matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::VectorDiff;
19use eyeball_im_util::vector::VectorObserverExt;
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
24use matrix_sdk::{
25    deserialized_responses::{TimelineEvent, TimelineEventKind as SdkTimelineEventKind},
26    event_cache::{
27        paginator::{PaginationResult, Paginator},
28        RoomEventCache,
29    },
30    send_queue::{
31        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
32    },
33    Result, Room,
34};
35use ruma::{
36    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
37    events::{
38        poll::unstable_start::UnstablePollStartEventContent,
39        reaction::ReactionEventContent,
40        receipt::{Receipt, ReceiptThread, ReceiptType},
41        relation::Annotation,
42        room::message::{MessageType, Relation},
43        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
44        AnySyncTimelineEvent, MessageLikeEventType,
45    },
46    serde::Raw,
47    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
48    TransactionId, UserId,
49};
50#[cfg(test)]
51use ruma::{events::receipt::ReceiptEventContent, RoomId};
52use tokio::sync::{RwLock, RwLockWriteGuard};
53use tracing::{
54    debug, error, field, field::debug, info, info_span, instrument, trace, warn, Instrument as _,
55};
56
57pub(super) use self::{
58    metadata::{RelativePosition, TimelineMetadata},
59    observable_items::{
60        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
61        ObservableItemsTransactionEntry,
62    },
63    state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineState},
64    state_transaction::TimelineStateTransaction,
65};
66use super::{
67    algorithms::{rfind_event_by_id, rfind_event_item},
68    event_handler::TimelineEventKind,
69    event_item::{ReactionStatus, RemoteEventOrigin},
70    item::TimelineUniqueId,
71    subscriber::TimelineSubscriber,
72    traits::{Decryptor, RoomDataProvider},
73    DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message,
74    PaginationError, Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus,
75    TimelineItem, TimelineItemContent, TimelineItemKind,
76};
77use crate::{
78    timeline::{
79        algorithms::rfind_event_by_item_id,
80        date_dividers::DateDividerAdjuster,
81        event_item::EventTimelineItemKind,
82        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
83        TimelineEventFilterFn,
84    },
85    unable_to_decrypt_hook::UtdHookManager,
86};
87
88mod aggregations;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
97/// Data associated to the current timeline focus.
98#[derive(Debug)]
99enum TimelineFocusData<P: RoomDataProvider> {
100    /// The timeline receives live events from the sync.
101    Live,
102
103    /// The timeline is focused on a single event, and it can expand in one
104    /// direction or another.
105    Event {
106        /// The event id we've started to focus on.
107        event_id: OwnedEventId,
108        /// The paginator instance.
109        paginator: Paginator<P>,
110        /// Number of context events to request for the first request.
111        num_context_events: u16,
112    },
113
114    PinnedEvents {
115        loader: PinnedEventsLoader,
116    },
117}
118
119#[derive(Clone, Debug)]
120pub(super) struct TimelineController<P: RoomDataProvider = Room> {
121    /// Inner mutable state.
122    state: Arc<RwLock<TimelineState>>,
123
124    /// Inner mutable focus state.
125    focus: Arc<RwLock<TimelineFocusData<P>>>,
126
127    /// A [`RoomDataProvider`] implementation, providing data.
128    ///
129    /// Useful for testing only; in the real world, it's just a [`Room`].
130    pub(crate) room_data_provider: P,
131
132    /// Settings applied to this timeline.
133    pub(super) settings: TimelineSettings,
134}
135
136#[derive(Clone)]
137pub(super) struct TimelineSettings {
138    /// Should the read receipts and read markers be handled?
139    pub(super) track_read_receipts: bool,
140
141    /// Event filter that controls what's rendered as a timeline item (and thus
142    /// what can carry read receipts).
143    pub(super) event_filter: Arc<TimelineEventFilterFn>,
144
145    /// Are unparsable events added as timeline items of their own kind?
146    pub(super) add_failed_to_parse: bool,
147
148    /// Should the timeline items be grouped by day or month?
149    pub(super) date_divider_mode: DateDividerMode,
150}
151
152#[cfg(not(tarpaulin_include))]
153impl fmt::Debug for TimelineSettings {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        f.debug_struct("TimelineSettings")
156            .field("track_read_receipts", &self.track_read_receipts)
157            .field("add_failed_to_parse", &self.add_failed_to_parse)
158            .finish_non_exhaustive()
159    }
160}
161
162impl Default for TimelineSettings {
163    fn default() -> Self {
164        Self {
165            track_read_receipts: false,
166            event_filter: Arc::new(default_event_filter),
167            add_failed_to_parse: true,
168            date_divider_mode: DateDividerMode::Daily,
169        }
170    }
171}
172
173#[derive(Debug, Clone, Copy)]
174pub(super) enum TimelineFocusKind {
175    Live,
176    Event,
177    PinnedEvents,
178}
179
180/// The default event filter for
181/// [`crate::timeline::TimelineBuilder::event_filter`].
182///
183/// It filters out events that are not rendered by the timeline, including but
184/// not limited to: reactions, edits, redactions on existing messages.
185///
186/// If you have a custom filter, it may be best to chain yours with this one if
187/// you do not want to run into situations where a read receipt is not visible
188/// because it's living on an event that doesn't have a matching timeline item.
189pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
190    match event {
191        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
192            if ev.redacts(room_version).is_some() {
193                // This is a redaction of an existing message, we'll only update the previous
194                // message and not render a new entry.
195                false
196            } else {
197                // This is a redacted entry, that we'll show only if the redacted entity wasn't
198                // a reaction.
199                ev.event_type() != MessageLikeEventType::Reaction
200            }
201        }
202
203        AnySyncTimelineEvent::MessageLike(msg) => {
204            match msg.original_content() {
205                None => {
206                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
207                    // a reaction.
208                    msg.event_type() != MessageLikeEventType::Reaction
209                }
210
211                Some(original_content) => {
212                    match original_content {
213                        AnyMessageLikeEventContent::RoomMessage(content) => {
214                            if content
215                                .relates_to
216                                .as_ref()
217                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
218                            {
219                                // Edits aren't visible by default.
220                                return false;
221                            }
222
223                            matches!(
224                                content.msgtype,
225                                MessageType::Audio(_)
226                                    | MessageType::Emote(_)
227                                    | MessageType::File(_)
228                                    | MessageType::Image(_)
229                                    | MessageType::Location(_)
230                                    | MessageType::Notice(_)
231                                    | MessageType::ServerNotice(_)
232                                    | MessageType::Text(_)
233                                    | MessageType::Video(_)
234                                    | MessageType::VerificationRequest(_)
235                            )
236                        }
237
238                        AnyMessageLikeEventContent::Sticker(_)
239                        | AnyMessageLikeEventContent::UnstablePollStart(
240                            UnstablePollStartEventContent::New(_),
241                        )
242                        | AnyMessageLikeEventContent::CallInvite(_)
243                        | AnyMessageLikeEventContent::CallNotify(_)
244                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
245
246                        _ => false,
247                    }
248                }
249            }
250        }
251
252        AnySyncTimelineEvent::State(_) => {
253            // All the state events may get displayed by default.
254            true
255        }
256    }
257}
258
259impl<P: RoomDataProvider> TimelineController<P> {
260    pub(super) fn new(
261        room_data_provider: P,
262        focus: TimelineFocus,
263        internal_id_prefix: Option<String>,
264        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
265        is_room_encrypted: bool,
266    ) -> Self {
267        let (focus_data, focus_kind) = match focus {
268            TimelineFocus::Live => (TimelineFocusData::Live, TimelineFocusKind::Live),
269
270            TimelineFocus::Event { target, num_context_events } => {
271                let paginator = Paginator::new(room_data_provider.clone());
272                (
273                    TimelineFocusData::Event { paginator, event_id: target, num_context_events },
274                    TimelineFocusKind::Event,
275                )
276            }
277
278            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
279                TimelineFocusData::PinnedEvents {
280                    loader: PinnedEventsLoader::new(
281                        Arc::new(room_data_provider.clone()),
282                        max_events_to_load as usize,
283                        max_concurrent_requests as usize,
284                    ),
285                },
286                TimelineFocusKind::PinnedEvents,
287            ),
288        };
289
290        let state = TimelineState::new(
291            focus_kind,
292            room_data_provider.own_user_id().to_owned(),
293            room_data_provider.room_version(),
294            internal_id_prefix,
295            unable_to_decrypt_hook,
296            is_room_encrypted,
297        );
298
299        Self {
300            state: Arc::new(RwLock::new(state)),
301            focus: Arc::new(RwLock::new(focus_data)),
302            room_data_provider,
303            settings: Default::default(),
304        }
305    }
306
307    /// Initializes the configured focus with appropriate data.
308    ///
309    /// Should be called only once after creation of the [`TimelineInner`], with
310    /// all its fields set.
311    ///
312    /// Returns whether there were any events added to the timeline.
313    pub(super) async fn init_focus(
314        &self,
315        room_event_cache: &RoomEventCache,
316    ) -> Result<bool, Error> {
317        let focus_guard = self.focus.read().await;
318
319        match &*focus_guard {
320            TimelineFocusData::Live => {
321                // Retrieve the cached events, and add them to the timeline.
322                let (events, _stream) = room_event_cache.subscribe().await;
323
324                let has_events = !events.is_empty();
325
326                self.replace_with_initial_remote_events(
327                    events.into_iter(),
328                    RemoteEventOrigin::Cache,
329                )
330                .await;
331
332                Ok(has_events)
333            }
334
335            TimelineFocusData::Event { event_id, paginator, num_context_events } => {
336                // Start a /context request, and append the results (in order) to the timeline.
337                let start_from_result = paginator
338                    .start_from(event_id, (*num_context_events).into())
339                    .await
340                    .map_err(PaginationError::Paginator)?;
341
342                drop(focus_guard);
343
344                let has_events = !start_from_result.events.is_empty();
345
346                self.replace_with_initial_remote_events(
347                    start_from_result.events.into_iter(),
348                    RemoteEventOrigin::Pagination,
349                )
350                .await;
351
352                Ok(has_events)
353            }
354
355            TimelineFocusData::PinnedEvents { loader } => {
356                let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
357
358                drop(focus_guard);
359
360                let has_events = !loaded_events.is_empty();
361
362                self.replace_with_initial_remote_events(
363                    loaded_events.into_iter(),
364                    RemoteEventOrigin::Pagination,
365                )
366                .await;
367
368                Ok(has_events)
369            }
370        }
371    }
372
373    /// Listens to encryption state changes for the room in
374    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
375    /// existing timeline items. This will then cause a refresh of those
376    /// timeline items.
377    pub async fn handle_encryption_state_changes(&self) {
378        let mut room_info = self.room_data_provider.room_info();
379
380        // Small function helper to help mark as encrypted.
381        let mark_encrypted = || async {
382            let mut state = self.state.write().await;
383            state.meta.is_room_encrypted = true;
384            state.mark_all_events_as_encrypted();
385        };
386
387        if room_info.get().is_encrypted() {
388            // If the room was already encrypted, it won't toggle to unencrypted, so we can
389            // shut down this task early.
390            mark_encrypted().await;
391            return;
392        }
393
394        while let Some(info) = room_info.next().await {
395            if info.is_encrypted() {
396                mark_encrypted().await;
397                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
398                // here is done.
399                break;
400            }
401        }
402    }
403
404    pub(crate) async fn reload_pinned_events(
405        &self,
406    ) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
407        let focus_guard = self.focus.read().await;
408
409        if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
410            loader.load_events().await
411        } else {
412            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
413        }
414    }
415
416    /// Run a lazy backwards pagination (in live mode).
417    ///
418    /// It adjusts the `count` value of the `Skip` higher-order stream so that
419    /// more items are pushed front in the timeline.
420    ///
421    /// If no more items are available (i.e. if the `count` is zero), this
422    /// method returns `Some(needs)` where `needs` is the number of events that
423    /// must be unlazily backwards paginated.
424    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
425        let state = self.state.read().await;
426
427        let (count, needs) = state
428            .meta
429            .subscriber_skip_count
430            .compute_next_when_paginating_backwards(num_events.into());
431
432        state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
433
434        needs
435    }
436
437    /// Run a backwards pagination (in focused mode) and append the results to
438    /// the timeline.
439    ///
440    /// Returns whether we hit the start of the timeline.
441    pub(super) async fn focused_paginate_backwards(
442        &self,
443        num_events: u16,
444    ) -> Result<bool, PaginationError> {
445        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
446            TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
447                return Err(PaginationError::NotEventFocusMode)
448            }
449            TimelineFocusData::Event { paginator, .. } => paginator
450                .paginate_backward(num_events.into())
451                .await
452                .map_err(PaginationError::Paginator)?,
453        };
454
455        // Events are in reverse topological order.
456        // We can push front each event individually.
457        self.handle_remote_events_with_diffs(
458            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
459            RemoteEventOrigin::Pagination,
460        )
461        .await;
462
463        Ok(hit_end_of_timeline)
464    }
465
466    /// Run a forwards pagination (in focused mode) and append the results to
467    /// the timeline.
468    ///
469    /// Returns whether we hit the end of the timeline.
470    pub(super) async fn focused_paginate_forwards(
471        &self,
472        num_events: u16,
473    ) -> Result<bool, PaginationError> {
474        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
475            TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
476                return Err(PaginationError::NotEventFocusMode)
477            }
478            TimelineFocusData::Event { paginator, .. } => paginator
479                .paginate_forward(num_events.into())
480                .await
481                .map_err(PaginationError::Paginator)?,
482        };
483
484        // Events are in topological order.
485        // We can append all events with no transformation.
486        self.handle_remote_events_with_diffs(
487            vec![VectorDiff::Append { values: events.into() }],
488            RemoteEventOrigin::Pagination,
489        )
490        .await;
491
492        Ok(hit_end_of_timeline)
493    }
494
495    /// Is this timeline receiving events from sync (aka has a live focus)?
496    pub(super) async fn is_live(&self) -> bool {
497        matches!(&*self.focus.read().await, TimelineFocusData::Live)
498    }
499
500    pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
501        self.settings = settings;
502        self
503    }
504
505    /// Get a copy of the current items in the list.
506    ///
507    /// Cheap because `im::Vector` is cheap to clone.
508    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
509        self.state.read().await.items.clone_items()
510    }
511
512    #[cfg(test)]
513    pub(super) async fn subscribe_raw(
514        &self,
515    ) -> (
516        Vector<Arc<TimelineItem>>,
517        impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
518    ) {
519        let state = self.state.read().await;
520
521        state.items.subscribe().into_values_and_stream()
522    }
523
524    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
525        let state = self.state.read().await;
526
527        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
528    }
529
530    pub(super) async fn subscribe_filter_map<U, F>(
531        &self,
532        f: F,
533    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
534    where
535        U: Clone,
536        F: Fn(Arc<TimelineItem>) -> Option<U>,
537    {
538        self.state.read().await.items.subscribe().filter_map(f)
539    }
540
541    /// Toggle a reaction locally.
542    ///
543    /// Returns true if the reaction was added, false if it was removed.
544    #[instrument(skip_all)]
545    pub(super) async fn toggle_reaction_local(
546        &self,
547        item_id: &TimelineEventItemId,
548        key: &str,
549    ) -> Result<bool, Error> {
550        let mut state = self.state.write().await;
551
552        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
553            warn!("Timeline item not found, can't add reaction");
554            return Err(Error::FailedToToggleReaction);
555        };
556
557        let user_id = self.room_data_provider.own_user_id();
558        let prev_status = item
559            .content()
560            .reactions()
561            .get(key)
562            .and_then(|group| group.get(user_id))
563            .map(|reaction_info| reaction_info.status.clone());
564
565        let Some(prev_status) = prev_status else {
566            match &item.kind {
567                EventTimelineItemKind::Local(local) => {
568                    if let Some(send_handle) = &local.send_handle {
569                        if send_handle
570                            .react(key.to_owned())
571                            .await
572                            .map_err(|err| Error::SendQueueError(err.into()))?
573                            .is_some()
574                        {
575                            trace!("adding a reaction to a local echo");
576                            return Ok(true);
577                        }
578
579                        warn!("couldn't toggle reaction for local echo");
580                        return Ok(false);
581                    }
582
583                    warn!("missing send handle for local echo; is this a test?");
584                    return Ok(false);
585                }
586
587                EventTimelineItemKind::Remote(remote) => {
588                    // Add a reaction through the room data provider.
589                    // No need to reflect the effect locally, since the local echo handling will
590                    // take care of it.
591                    trace!("adding a reaction to a remote echo");
592                    let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
593                    self.room_data_provider
594                        .send(ReactionEventContent::from(annotation).into())
595                        .await?;
596                    return Ok(true);
597                }
598            }
599        };
600
601        trace!("removing a previous reaction");
602        match prev_status {
603            ReactionStatus::LocalToLocal(send_reaction_handle) => {
604                if let Some(handle) = send_reaction_handle {
605                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
606                        // Impossible state: the reaction has moved from local to echo under our
607                        // feet, but the timeline was supposed to be locked!
608                        warn!("unexpectedly unable to abort sending of local reaction");
609                    }
610                } else {
611                    warn!("no send reaction handle (this should only happen in testing contexts)");
612                }
613            }
614
615            ReactionStatus::LocalToRemote(send_handle) => {
616                // No need to reflect the change ourselves, since handling the discard of the
617                // local echo will take care of it.
618                trace!("aborting send of the previous reaction that was a local echo");
619                if let Some(handle) = send_handle {
620                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
621                        // Impossible state: the reaction has moved from local to echo under our
622                        // feet, but the timeline was supposed to be locked!
623                        warn!("unexpectedly unable to abort sending of local reaction");
624                    }
625                } else {
626                    warn!("no send handle (this should only happen in testing contexts)");
627                }
628            }
629
630            ReactionStatus::RemoteToRemote(event_id) => {
631                // Assume the redaction will work; we'll re-add the reaction if it didn't.
632                let Some(annotated_event_id) =
633                    item.as_remote().map(|event_item| event_item.event_id.clone())
634                else {
635                    warn!("remote reaction to remote event, but the associated item isn't remote");
636                    return Ok(false);
637                };
638
639                let mut reactions = item.content().reactions().clone();
640                let reaction_info = reactions.remove_reaction(user_id, key);
641
642                if reaction_info.is_some() {
643                    let new_item = item.with_reactions(reactions);
644                    state.items.replace(item_pos, new_item);
645                } else {
646                    warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
647                }
648
649                // Release the lock before running the request.
650                drop(state);
651
652                trace!("sending redact for a previous reaction");
653                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
654                    if let Some(reaction_info) = reaction_info {
655                        debug!("sending redact failed, adding the reaction back to the list");
656
657                        let mut state = self.state.write().await;
658                        if let Some((item_pos, item)) =
659                            rfind_event_by_id(&state.items, &annotated_event_id)
660                        {
661                            // Re-add the reaction to the mapping.
662                            let mut reactions = item.content().reactions();
663                            reactions
664                                .entry(key.to_owned())
665                                .or_default()
666                                .insert(user_id.to_owned(), reaction_info);
667                            let new_item = item.with_reactions(reactions);
668                            state.items.replace(item_pos, new_item);
669                        } else {
670                            warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
671                        }
672                    }
673
674                    return Err(err);
675                }
676            }
677        }
678
679        Ok(false)
680    }
681
682    /// Handle updates on events as [`VectorDiff`]s.
683    pub(super) async fn handle_remote_events_with_diffs(
684        &self,
685        diffs: Vec<VectorDiff<TimelineEvent>>,
686        origin: RemoteEventOrigin,
687    ) {
688        if diffs.is_empty() {
689            return;
690        }
691
692        let mut state = self.state.write().await;
693        state
694            .handle_remote_events_with_diffs(
695                diffs,
696                origin,
697                &self.room_data_provider,
698                &self.settings,
699            )
700            .await
701    }
702
703    pub(super) async fn clear(&self) {
704        self.state.write().await.clear();
705    }
706
707    /// Replaces the content of the current timeline with initial events.
708    ///
709    /// Also sets up read receipts and the read marker for a live timeline of a
710    /// room.
711    ///
712    /// This is all done with a single lock guard, since we don't want the state
713    /// to be modified between the clear and re-insertion of new events.
714    pub(super) async fn replace_with_initial_remote_events<Events>(
715        &self,
716        events: Events,
717        origin: RemoteEventOrigin,
718    ) where
719        Events: IntoIterator + ExactSizeIterator,
720        <Events as IntoIterator>::Item: Into<TimelineEvent>,
721    {
722        let mut state = self.state.write().await;
723
724        let track_read_markers = self.settings.track_read_receipts;
725        if track_read_markers {
726            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
727            state
728                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
729                .await;
730        }
731
732        // Replace the events if either the current event list or the new one aren't
733        // empty.
734        // Previously we just had to check the new one wasn't empty because
735        // we did a clear operation before so the current one would always be empty, but
736        // now we may want to replace a populated timeline with an empty one.
737        if !state.items.is_empty() || events.len() > 0 {
738            state
739                .replace_with_remote_events(
740                    events,
741                    origin,
742                    &self.room_data_provider,
743                    &self.settings,
744                )
745                .await;
746        }
747
748        if track_read_markers {
749            if let Some(fully_read_event_id) =
750                self.room_data_provider.load_fully_read_marker().await
751            {
752                state.handle_fully_read_marker(fully_read_event_id);
753            }
754        }
755    }
756
757    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
758        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
759    }
760
761    pub(super) async fn handle_ephemeral_events(
762        &self,
763        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
764    ) {
765        let mut state = self.state.write().await;
766        state.handle_ephemeral_events(events, &self.room_data_provider).await;
767    }
768
769    /// Creates the local echo for an event we're sending.
770    #[instrument(skip_all)]
771    pub(super) async fn handle_local_event(
772        &self,
773        txn_id: OwnedTransactionId,
774        content: TimelineEventKind,
775        send_handle: Option<SendHandle>,
776    ) {
777        let sender = self.room_data_provider.own_user_id().to_owned();
778        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
779
780        // Only add new items if the timeline is live.
781        let should_add_new_items = self.is_live().await;
782
783        let date_divider_mode = self.settings.date_divider_mode.clone();
784
785        let mut state = self.state.write().await;
786        state
787            .handle_local_event(
788                sender,
789                profile,
790                should_add_new_items,
791                date_divider_mode,
792                txn_id,
793                send_handle,
794                content,
795            )
796            .await;
797    }
798
799    /// Update the send state of a local event represented by a transaction ID.
800    ///
801    /// If the corresponding local timeline item is missing, a warning is
802    /// raised.
803    #[instrument(skip(self))]
804    pub(super) async fn update_event_send_state(
805        &self,
806        txn_id: &TransactionId,
807        send_state: EventSendState,
808    ) {
809        let mut state = self.state.write().await;
810        let mut txn = state.transaction();
811
812        let new_event_id: Option<&EventId> =
813            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
814
815        // The local echoes are always at the end of the timeline, we must first make
816        // sure the remote echo hasn't showed up yet.
817        if rfind_event_item(&txn.items, |it| {
818            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
819        })
820        .is_some()
821        {
822            // Remote echo already received. This is very unlikely.
823            trace!("Remote echo received before send-event response");
824
825            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
826
827            // If there's both the remote echo and a local echo, that means the
828            // remote echo was received before the response *and* contained no
829            // transaction ID (and thus duplicated the local echo).
830            if let Some((idx, _)) = local_echo {
831                warn!("Message echo got duplicated, removing the local one");
832                txn.items.remove(idx);
833
834                // Adjust the date dividers, if needs be.
835                let mut adjuster =
836                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
837                adjuster.run(&mut txn.items, &mut txn.meta);
838            }
839
840            txn.commit();
841            return;
842        }
843
844        // Look for the local event by the transaction ID or event ID.
845        let result = rfind_event_item(&txn.items, |it| {
846            it.transaction_id() == Some(txn_id)
847                || new_event_id.is_some()
848                    && it.event_id() == new_event_id
849                    && it.as_local().is_some()
850        });
851
852        let Some((idx, item)) = result else {
853            // Event wasn't found as a standalone item.
854            //
855            // If it was just sent, try to find if it matches a corresponding aggregation,
856            // and mark it as sent in that case.
857            if let Some(new_event_id) = new_event_id {
858                match txn
859                    .meta
860                    .aggregations
861                    .mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned())
862                {
863                    MarkAggregationSentResult::MarkedSent { update } => {
864                        trace!("marked aggregation as sent");
865
866                        if let Some((target, aggregation)) = update {
867                            if let Some((item_pos, item)) =
868                                rfind_event_by_item_id(&txn.items, &target)
869                            {
870                                let mut content = item.content().clone();
871                                match aggregation.apply(&mut content) {
872                                    ApplyAggregationResult::UpdatedItem => {
873                                        trace!("reapplied aggregation in the event");
874                                        let internal_id = item.internal_id.to_owned();
875                                        let new_item = item.with_content(content);
876                                        txn.items.replace(
877                                            item_pos,
878                                            TimelineItem::new(new_item, internal_id),
879                                        );
880                                        txn.commit();
881                                    }
882                                    ApplyAggregationResult::LeftItemIntact => {}
883                                    ApplyAggregationResult::Error(err) => {
884                                        warn!("when reapplying aggregation just marked as sent: {err}");
885                                    }
886                                }
887                            }
888                        }
889
890                        // Early return: we've found the event to mark as sent, it was an
891                        // aggregation.
892                        return;
893                    }
894
895                    MarkAggregationSentResult::NotFound => {}
896                }
897            }
898
899            warn!("Timeline item not found, can't update send state");
900            return;
901        };
902
903        let Some(local_item) = item.as_local() else {
904            warn!("We looked for a local item, but it transitioned to remote.");
905            return;
906        };
907
908        // The event was already marked as sent, that's a broken state, let's
909        // emit an error but also override to the given sent state.
910        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
911            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
912        }
913
914        // If the event has just been marked as sent, update the aggregations mapping to
915        // take that into account.
916        if let Some(new_event_id) = new_event_id {
917            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
918        }
919
920        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
921        txn.items.replace(idx, new_item);
922
923        txn.commit();
924    }
925
926    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
927        let mut state = self.state.write().await;
928
929        if let Some((idx, _)) =
930            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
931        {
932            let mut txn = state.transaction();
933
934            txn.items.remove(idx);
935
936            // A read marker or a date divider may have been inserted before the local echo.
937            // Ensure both are up to date.
938            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
939            adjuster.run(&mut txn.items, &mut txn.meta);
940
941            txn.meta.update_read_marker(&mut txn.items);
942
943            txn.commit();
944
945            debug!("discarded local echo");
946            return true;
947        }
948
949        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
950        // dereferencing it once.
951        let state = &mut *state;
952
953        // Look if this was a local aggregation.
954        if let Some((target, aggregation)) = state
955            .meta
956            .aggregations
957            .try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
958        {
959            let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else {
960                warn!("missing target item for a local aggregation");
961                return false;
962            };
963
964            let mut content = item.content().clone();
965            match aggregation.unapply(&mut content) {
966                ApplyAggregationResult::UpdatedItem => {
967                    trace!("removed local reaction to local echo");
968                    let internal_id = item.internal_id.clone();
969                    let new_item = item.with_content(content);
970                    state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
971                }
972                ApplyAggregationResult::LeftItemIntact => {}
973                ApplyAggregationResult::Error(err) => {
974                    warn!("when undoing local aggregation: {err}");
975                }
976            }
977
978            return true;
979        }
980
981        false
982    }
983
984    pub(super) async fn replace_local_echo(
985        &self,
986        txn_id: &TransactionId,
987        content: AnyMessageLikeEventContent,
988    ) -> bool {
989        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
990            // Ideally, we'd support replacing local echoes for a reaction, etc., but
991            // handling RoomMessage should be sufficient in most cases. Worst
992            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
993            // editing the event then.
994            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
995            return false;
996        };
997
998        let mut state = self.state.write().await;
999        let mut txn = state.transaction();
1000
1001        let Some((idx, prev_item)) =
1002            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1003        else {
1004            debug!("Can't find local echo to replace");
1005            return false;
1006        };
1007
1008        // Reuse the previous local echo's state, but reset the send state to not sent
1009        // (per API contract).
1010        let ti_kind = {
1011            let Some(prev_local_item) = prev_item.as_local() else {
1012                warn!("We looked for a local item, but it transitioned as remote??");
1013                return false;
1014            };
1015            prev_local_item.with_send_state(EventSendState::NotSentYet)
1016        };
1017
1018        // Replace the local-related state (kind) and the content state.
1019        let prev_reactions = prev_item.content().reactions();
1020        let new_item = TimelineItem::new(
1021            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1022                content,
1023                None,
1024                &txn.items,
1025                prev_reactions,
1026            )),
1027            prev_item.internal_id.to_owned(),
1028        );
1029
1030        txn.items.replace(idx, new_item);
1031
1032        // This doesn't change the original sending time, so there's no need to adjust
1033        // date dividers.
1034
1035        txn.commit();
1036
1037        debug!("Replaced local echo");
1038        true
1039    }
1040
1041    #[instrument(skip(self, room), fields(room_id = ?room.room_id()))]
1042    pub(super) async fn retry_event_decryption(
1043        &self,
1044        room: &Room,
1045        session_ids: Option<BTreeSet<String>>,
1046    ) {
1047        self.retry_event_decryption_inner(room.to_owned(), session_ids).await
1048    }
1049
1050    #[cfg(test)]
1051    pub(super) async fn retry_event_decryption_test(
1052        &self,
1053        room_id: &RoomId,
1054        olm_machine: OlmMachine,
1055        session_ids: Option<BTreeSet<String>>,
1056    ) {
1057        self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1058    }
1059
1060    async fn retry_event_decryption_inner(
1061        &self,
1062        decryptor: impl Decryptor,
1063        session_ids: Option<BTreeSet<String>>,
1064    ) {
1065        use super::EncryptedMessage;
1066
1067        let mut state = self.state.clone().write_owned().await;
1068
1069        let should_retry = move |session_id: &str| {
1070            if let Some(session_ids) = &session_ids {
1071                session_ids.contains(session_id)
1072            } else {
1073                true
1074            }
1075        };
1076
1077        let retry_indices: Vec<_> = state
1078            .items
1079            .iter()
1080            .enumerate()
1081            .filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
1082                EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
1083                    if should_retry(session_id) =>
1084                {
1085                    Some(idx)
1086                }
1087                EncryptedMessage::MegolmV1AesSha2 { .. }
1088                | EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
1089                | EncryptedMessage::Unknown => None,
1090            })
1091            .collect();
1092
1093        if retry_indices.is_empty() {
1094            return;
1095        }
1096
1097        debug!("Retrying decryption");
1098
1099        let settings = self.settings.clone();
1100        let room_data_provider = self.room_data_provider.clone();
1101        let push_rules_context = room_data_provider.push_rules_and_context().await;
1102        let unable_to_decrypt_hook = state.meta.unable_to_decrypt_hook.clone();
1103
1104        matrix_sdk::executor::spawn(async move {
1105            let retry_one = |item: Arc<TimelineItem>| {
1106                let decryptor = decryptor.clone();
1107                let should_retry = &should_retry;
1108                let unable_to_decrypt_hook = unable_to_decrypt_hook.clone();
1109                async move {
1110                    let event_item = item.as_event()?;
1111
1112                    let session_id = match event_item.content().as_unable_to_decrypt()? {
1113                        EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
1114                            if should_retry(session_id) =>
1115                        {
1116                            session_id
1117                        }
1118                        EncryptedMessage::MegolmV1AesSha2 { .. }
1119                        | EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
1120                        | EncryptedMessage::Unknown => return None,
1121                    };
1122
1123                    tracing::Span::current().record("session_id", session_id);
1124
1125                    let Some(remote_event) = event_item.as_remote() else {
1126                        error!("Key for unable-to-decrypt timeline item is not an event ID");
1127                        return None;
1128                    };
1129
1130                    tracing::Span::current().record("event_id", debug(&remote_event.event_id));
1131
1132                    let Some(original_json) = &remote_event.original_json else {
1133                        error!("UTD item must contain original JSON");
1134                        return None;
1135                    };
1136
1137                    match decryptor.decrypt_event_impl(original_json).await {
1138                        Ok(event) => {
1139                            if let SdkTimelineEventKind::UnableToDecrypt { utd_info, .. } =
1140                                event.kind
1141                            {
1142                                info!(
1143                                    "Failed to decrypt event after receiving room key: {:?}",
1144                                    utd_info.reason
1145                                );
1146                                None
1147                            } else {
1148                                // Notify observers that we managed to eventually decrypt an event.
1149                                if let Some(hook) = unable_to_decrypt_hook {
1150                                    hook.on_late_decrypt(&remote_event.event_id).await;
1151                                }
1152
1153                                Some(event)
1154                            }
1155                        }
1156                        Err(e) => {
1157                            info!("Failed to decrypt event after receiving room key: {e}");
1158                            None
1159                        }
1160                    }
1161                }
1162                .instrument(info_span!(
1163                    "retry_one",
1164                    session_id = field::Empty,
1165                    event_id = field::Empty
1166                ))
1167            };
1168
1169            state
1170                .retry_event_decryption(
1171                    retry_one,
1172                    retry_indices,
1173                    push_rules_context,
1174                    &room_data_provider,
1175                    &settings,
1176                )
1177                .await;
1178        });
1179    }
1180
1181    pub(super) async fn set_sender_profiles_pending(&self) {
1182        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1183    }
1184
1185    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1186        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1187    }
1188
1189    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1190        self.state.write().await.items.for_each(|mut entry| {
1191            let Some(event_item) = entry.as_event() else { return };
1192            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1193                let new_item = entry.with_kind(TimelineItemKind::Event(
1194                    event_item.with_sender_profile(profile_state.clone()),
1195                ));
1196                ObservableItemsEntry::replace(&mut entry, new_item);
1197            }
1198        });
1199    }
1200
1201    pub(super) async fn update_missing_sender_profiles(&self) {
1202        trace!("Updating missing sender profiles");
1203
1204        let mut state = self.state.write().await;
1205        let mut entries = state.items.entries();
1206        while let Some(mut entry) = entries.next() {
1207            let Some(event_item) = entry.as_event() else { continue };
1208            let event_id = event_item.event_id().map(debug);
1209            let transaction_id = event_item.transaction_id().map(debug);
1210
1211            if event_item.sender_profile().is_ready() {
1212                trace!(event_id, transaction_id, "Profile already set");
1213                continue;
1214            }
1215
1216            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1217                Some(profile) => {
1218                    trace!(event_id, transaction_id, "Adding profile");
1219                    let updated_item =
1220                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1221                    let new_item = entry.with_kind(updated_item);
1222                    ObservableItemsEntry::replace(&mut entry, new_item);
1223                }
1224                None => {
1225                    if !event_item.sender_profile().is_unavailable() {
1226                        trace!(event_id, transaction_id, "Marking profile unavailable");
1227                        let updated_item =
1228                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1229                        let new_item = entry.with_kind(updated_item);
1230                        ObservableItemsEntry::replace(&mut entry, new_item);
1231                    } else {
1232                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1233                    }
1234                }
1235            }
1236        }
1237
1238        trace!("Done updating missing sender profiles");
1239    }
1240
1241    /// Update the profiles of the given senders, even if they are ready.
1242    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1243        trace!("Forcing update of sender profiles: {sender_ids:?}");
1244
1245        let mut state = self.state.write().await;
1246        let mut entries = state.items.entries();
1247        while let Some(mut entry) = entries.next() {
1248            let Some(event_item) = entry.as_event() else { continue };
1249            if !sender_ids.contains(event_item.sender()) {
1250                continue;
1251            }
1252
1253            let event_id = event_item.event_id().map(debug);
1254            let transaction_id = event_item.transaction_id().map(debug);
1255
1256            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1257                Some(profile) => {
1258                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1259                    {
1260                        debug!(event_id, transaction_id, "Profile already up-to-date");
1261                    } else {
1262                        trace!(event_id, transaction_id, "Updating profile");
1263                        let updated_item =
1264                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1265                        let new_item = entry.with_kind(updated_item);
1266                        ObservableItemsEntry::replace(&mut entry, new_item);
1267                    }
1268                }
1269                None => {
1270                    if !event_item.sender_profile().is_unavailable() {
1271                        trace!(event_id, transaction_id, "Marking profile unavailable");
1272                        let updated_item =
1273                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1274                        let new_item = entry.with_kind(updated_item);
1275                        ObservableItemsEntry::replace(&mut entry, new_item);
1276                    } else {
1277                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1278                    }
1279                }
1280            }
1281        }
1282
1283        trace!("Done forcing update of sender profiles");
1284    }
1285
1286    #[cfg(test)]
1287    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1288        let own_user_id = self.room_data_provider.own_user_id();
1289        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1290    }
1291
1292    /// Get the latest read receipt for the given user.
1293    ///
1294    /// Useful to get the latest read receipt, whether it's private or public.
1295    pub(super) async fn latest_user_read_receipt(
1296        &self,
1297        user_id: &UserId,
1298    ) -> Option<(OwnedEventId, Receipt)> {
1299        self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1300    }
1301
1302    /// Get the ID of the timeline event with the latest read receipt for the
1303    /// given user.
1304    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1305        &self,
1306        user_id: &UserId,
1307    ) -> Option<OwnedEventId> {
1308        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1309    }
1310
1311    /// Subscribe to changes in the read receipts of our own user.
1312    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1313        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1314    }
1315
1316    /// Handle a room send update that's a new local echo.
1317    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1318        match echo.content {
1319            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1320                let content = match serialized_event.deserialize() {
1321                    Ok(d) => d,
1322                    Err(err) => {
1323                        warn!("error deserializing local echo: {err}");
1324                        return;
1325                    }
1326                };
1327
1328                self.handle_local_event(
1329                    echo.transaction_id.clone(),
1330                    TimelineEventKind::Message { content, relations: Default::default() },
1331                    Some(send_handle),
1332                )
1333                .await;
1334
1335                if let Some(send_error) = send_error {
1336                    self.update_event_send_state(
1337                        &echo.transaction_id,
1338                        EventSendState::SendingFailed {
1339                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
1340                            is_recoverable: false,
1341                        },
1342                    )
1343                    .await;
1344                }
1345            }
1346
1347            LocalEchoContent::React { key, send_handle, applies_to } => {
1348                self.handle_local_reaction(key, send_handle, applies_to).await;
1349            }
1350        }
1351    }
1352
1353    /// Adds a reaction (local echo) to a local echo.
1354    #[instrument(skip(self, send_handle))]
1355    async fn handle_local_reaction(
1356        &self,
1357        reaction_key: String,
1358        send_handle: SendReactionHandle,
1359        applies_to: OwnedTransactionId,
1360    ) {
1361        let mut state = self.state.write().await;
1362        let mut tr = state.transaction();
1363
1364        let target = TimelineEventItemId::TransactionId(applies_to);
1365
1366        let reaction_txn_id = send_handle.transaction_id().to_owned();
1367        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1368        let aggregation = Aggregation::new(
1369            TimelineEventItemId::TransactionId(reaction_txn_id),
1370            AggregationKind::Reaction {
1371                key: reaction_key.clone(),
1372                sender: self.room_data_provider.own_user_id().to_owned(),
1373                timestamp: MilliSecondsSinceUnixEpoch::now(),
1374                reaction_status,
1375            },
1376        );
1377
1378        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1379        find_item_and_apply_aggregation(&mut tr.items, &target, aggregation);
1380
1381        tr.commit();
1382    }
1383
1384    /// Handle a single room send queue update.
1385    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1386        match update {
1387            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1388                self.handle_local_echo(echo).await;
1389            }
1390
1391            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1392                if !self.discard_local_echo(&transaction_id).await {
1393                    warn!("couldn't find the local echo to discard");
1394                }
1395            }
1396
1397            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1398                let content = match new_content.deserialize() {
1399                    Ok(d) => d,
1400                    Err(err) => {
1401                        warn!("error deserializing local echo (upon edit): {err}");
1402                        return;
1403                    }
1404                };
1405
1406                if !self.replace_local_echo(&transaction_id, content).await {
1407                    warn!("couldn't find the local echo to replace");
1408                }
1409            }
1410
1411            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1412                self.update_event_send_state(
1413                    &transaction_id,
1414                    EventSendState::SendingFailed { error, is_recoverable },
1415                )
1416                .await;
1417            }
1418
1419            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1420                self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1421            }
1422
1423            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1424                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1425                    .await;
1426            }
1427
1428            RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1429                // TODO(bnjbvr): Do something else?
1430                info!(txn_id = %related_to, "some media for a media event has been uploaded");
1431            }
1432        }
1433    }
1434}
1435
1436impl TimelineController {
1437    pub(super) fn room(&self) -> &Room {
1438        &self.room_data_provider
1439    }
1440
1441    /// Given an event identifier, will fetch the details for the event it's
1442    /// replying to, if applicable.
1443    #[instrument(skip(self))]
1444    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1445        let state = self.state.write().await;
1446        let (index, item) = rfind_event_by_id(&state.items, event_id)
1447            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1448        let remote_item = item
1449            .as_remote()
1450            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1451            .clone();
1452
1453        let TimelineItemContent::Message(message) = item.content().clone() else {
1454            debug!("Event is not a message");
1455            return Ok(());
1456        };
1457        let Some(in_reply_to) = message.in_reply_to() else {
1458            debug!("Event is not a reply");
1459            return Ok(());
1460        };
1461        if let TimelineDetails::Pending = &in_reply_to.event {
1462            debug!("Replied-to event is already being fetched");
1463            return Ok(());
1464        }
1465        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1466            debug!("Replied-to event has already been fetched");
1467            return Ok(());
1468        }
1469
1470        let internal_id = item.internal_id.to_owned();
1471        let item = item.clone();
1472        let event = fetch_replied_to_event(
1473            state,
1474            index,
1475            &item,
1476            internal_id,
1477            &message,
1478            &in_reply_to.event_id,
1479            self.room(),
1480        )
1481        .await?;
1482
1483        // We need to be sure to have the latest position of the event as it might have
1484        // changed while waiting for the request.
1485        let mut state = self.state.write().await;
1486        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1487            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1488
1489        // Check the state of the event again, it might have been redacted while
1490        // the request was in-flight.
1491        let TimelineItemContent::Message(message) = item.content().clone() else {
1492            info!("Event is no longer a message (redacted?)");
1493            return Ok(());
1494        };
1495        let Some(in_reply_to) = message.in_reply_to() else {
1496            warn!("Event no longer has a reply (bug?)");
1497            return Ok(());
1498        };
1499
1500        // Now that we've received the content of the replied-to event, replace the
1501        // replied-to content in the item with it.
1502        trace!("Updating in-reply-to details");
1503        let internal_id = item.internal_id.to_owned();
1504        let mut item = item.clone();
1505        item.set_content(TimelineItemContent::Message(
1506            message.with_in_reply_to(InReplyToDetails {
1507                event_id: in_reply_to.event_id.clone(),
1508                event,
1509            }),
1510        ));
1511        state.items.replace(index, TimelineItem::new(item, internal_id));
1512
1513        Ok(())
1514    }
1515
1516    /// Check whether the given receipt should be sent.
1517    ///
1518    /// Returns `false` if the given receipt is older than the current one.
1519    pub(super) async fn should_send_receipt(
1520        &self,
1521        receipt_type: &SendReceiptType,
1522        thread: &ReceiptThread,
1523        event_id: &EventId,
1524    ) -> bool {
1525        // We don't support threaded receipts yet.
1526        if *thread != ReceiptThread::Unthreaded {
1527            return true;
1528        }
1529
1530        let own_user_id = self.room().own_user_id();
1531        let state = self.state.read().await;
1532        let room = self.room();
1533
1534        match receipt_type {
1535            SendReceiptType::Read => {
1536                if let Some((old_pub_read, _)) = state
1537                    .meta
1538                    .user_receipt(
1539                        own_user_id,
1540                        ReceiptType::Read,
1541                        room,
1542                        state.items.all_remote_events(),
1543                    )
1544                    .await
1545                {
1546                    trace!(%old_pub_read, "found a previous public receipt");
1547                    if let Some(relative_pos) = state.meta.compare_events_positions(
1548                        &old_pub_read,
1549                        event_id,
1550                        state.items.all_remote_events(),
1551                    ) {
1552                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1553                        return relative_pos == RelativePosition::After;
1554                    }
1555                }
1556            }
1557            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1558            // doesn't make sense to have a private read receipt behind a public one.
1559            SendReceiptType::ReadPrivate => {
1560                if let Some((old_priv_read, _)) =
1561                    state.latest_user_read_receipt(own_user_id, room).await
1562                {
1563                    trace!(%old_priv_read, "found a previous private receipt");
1564                    if let Some(relative_pos) = state.meta.compare_events_positions(
1565                        &old_priv_read,
1566                        event_id,
1567                        state.items.all_remote_events(),
1568                    ) {
1569                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1570                        return relative_pos == RelativePosition::After;
1571                    }
1572                }
1573            }
1574            SendReceiptType::FullyRead => {
1575                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1576                {
1577                    if let Some(relative_pos) = state.meta.compare_events_positions(
1578                        &prev_event_id,
1579                        event_id,
1580                        state.items.all_remote_events(),
1581                    ) {
1582                        return relative_pos == RelativePosition::After;
1583                    }
1584                }
1585            }
1586            _ => {}
1587        }
1588
1589        // Let the server handle unknown receipts.
1590        true
1591    }
1592
1593    /// Returns the latest event identifier, even if it's not visible, or if
1594    /// it's folded into another timeline item.
1595    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1596        let state = self.state.read().await;
1597        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1598    }
1599}
1600
1601async fn fetch_replied_to_event(
1602    mut state: RwLockWriteGuard<'_, TimelineState>,
1603    index: usize,
1604    item: &EventTimelineItem,
1605    internal_id: TimelineUniqueId,
1606    message: &Message,
1607    in_reply_to: &EventId,
1608    room: &Room,
1609) -> Result<TimelineDetails<Box<RepliedToEvent>>, Error> {
1610    if let Some((_, item)) = rfind_event_by_id(&state.items, in_reply_to) {
1611        let details = TimelineDetails::Ready(Box::new(RepliedToEvent {
1612            content: item.content.clone(),
1613            sender: item.sender().to_owned(),
1614            sender_profile: item.sender_profile().clone(),
1615        }));
1616
1617        trace!("Found replied-to event locally");
1618        return Ok(details);
1619    };
1620
1621    // Replace the item with a new timeline item that has the fetching status of the
1622    // replied-to event to pending.
1623    trace!("Setting in-reply-to details to pending");
1624    let reply = message.with_in_reply_to(InReplyToDetails {
1625        event_id: in_reply_to.to_owned(),
1626        event: TimelineDetails::Pending,
1627    });
1628    let event_item = item.with_content(TimelineItemContent::Message(reply));
1629
1630    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1631    state.items.replace(index, new_timeline_item);
1632
1633    // Don't hold the state lock while the network request is made
1634    drop(state);
1635
1636    trace!("Fetching replied-to event");
1637    let res = match room.event(in_reply_to, None).await {
1638        Ok(timeline_event) => TimelineDetails::Ready(Box::new(
1639            RepliedToEvent::try_from_timeline_event(timeline_event, room).await?,
1640        )),
1641        Err(e) => TimelineDetails::Error(Arc::new(e)),
1642    };
1643    Ok(res)
1644}