matrix_sdk_ui/timeline/controller/mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::{VectorDiff, VectorSubscriberStream};
20use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::Result;
25use matrix_sdk::{
26    deserialized_responses::TimelineEvent,
27    event_cache::{RoomEventCache, RoomPaginationStatus},
28    paginators::{PaginationResult, Paginator},
29    send_queue::{
30        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31    },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38    events::{
39        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40        AnySyncTimelineEvent, MessageLikeEventType,
41        poll::unstable_start::UnstablePollStartEventContent,
42        reaction::ReactionEventContent,
43        receipt::{Receipt, ReceiptThread, ReceiptType},
44        relation::Annotation,
45        room::message::{MessageType, Relation},
46    },
47    room_version_rules::RoomVersionRules,
48    serde::Raw,
49};
50use tokio::sync::{RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54    metadata::{RelativePosition, TimelineMetadata},
55    observable_items::{
56        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57        ObservableItemsTransactionEntry,
58    },
59    state::TimelineState,
60    state_transaction::TimelineStateTransaction,
61};
62use super::{
63    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
66    algorithms::{rfind_event_by_id, rfind_event_item},
67    event_item::{ReactionStatus, RemoteEventOrigin},
68    item::TimelineUniqueId,
69    subscriber::TimelineSubscriber,
70    traits::RoomDataProvider,
71};
72use crate::{
73    timeline::{
74        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75        algorithms::rfind_event_by_item_id,
76        date_dividers::DateDividerAdjuster,
77        event_item::TimelineItemHandle,
78        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79    },
80    unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93
/// Data associated with the current timeline focus.
95///
96/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
97/// version of it, including extra state that makes it useful over the lifetime
98/// of a timeline.
99#[derive(Debug)]
100pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
101    /// The timeline receives live events from the sync.
102    Live {
103        /// Whether to hide in-thread events from the timeline.
104        hide_threaded_events: bool,
105    },
106
107    /// The timeline is focused on a single event, and it can expand in one
108    /// direction or another.
109    Event {
110        /// The paginator instance.
111        paginator: Paginator<P>,
112
113        /// Whether to hide in-thread events from the timeline.
114        hide_threaded_events: bool,
115    },
116
117    /// A live timeline for a thread.
118    Thread {
119        /// The root event for the current thread.
120        root_event_id: OwnedEventId,
121    },
122
123    PinnedEvents {
124        loader: PinnedEventsLoader,
125    },
126}
127
128impl<P: RoomDataProvider> TimelineFocusKind<P> {
129    /// Returns the [`ReceiptThread`] that should be used for the current
130    /// timeline focus.
131    ///
132    /// Live and event timelines will use the unthreaded read receipt type in
133    /// general, unless they hide in-thread events, in which case they will
134    /// use the main thread.
135    pub(super) fn receipt_thread(&self) -> ReceiptThread {
136        if let Some(thread_root) = self.thread_root() {
137            ReceiptThread::Thread(thread_root.to_owned())
138        } else if self.hide_threaded_events() {
139            ReceiptThread::Main
140        } else {
141            ReceiptThread::Unthreaded
142        }
143    }
144
145    /// Whether to hide in-thread events from the timeline.
146    fn hide_threaded_events(&self) -> bool {
147        match self {
148            TimelineFocusKind::Live { hide_threaded_events }
149            | TimelineFocusKind::Event { hide_threaded_events, .. } => *hide_threaded_events,
150            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
151        }
152    }
153
154    /// Whether the focus is on a thread (from a live thread or a thread
155    /// permalink).
156    fn is_thread(&self) -> bool {
157        self.thread_root().is_some()
158    }
159
160    /// If the focus is a thread, returns its root event ID.
161    fn thread_root(&self) -> Option<&EventId> {
162        match self {
163            TimelineFocusKind::Live { .. }
164            | TimelineFocusKind::Event { paginator: _, hide_threaded_events: _ }
165            | TimelineFocusKind::PinnedEvents { .. } => None,
166            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
167        }
168    }
169}
170
171#[derive(Clone, Debug)]
172pub(super) struct TimelineController<P: RoomDataProvider = Room> {
173    /// Inner mutable state.
174    state: Arc<RwLock<TimelineState<P>>>,
175
176    /// Focus data.
177    focus: Arc<TimelineFocusKind<P>>,
178
    /// A [`RoomDataProvider`] implementation, providing the room data that backs this timeline.
180    ///
181    /// The type is a `RoomDataProvider` to allow testing. In the real world,
182    /// this would normally be a [`Room`].
183    pub(crate) room_data_provider: P,
184
185    /// Settings applied to this timeline.
186    pub(super) settings: TimelineSettings,
187
188    /// Long-running task used to retry decryption of timeline items without
189    /// blocking main processing.
190    decryption_retry_task: DecryptionRetryTask<P, P>,
191}
192
193#[derive(Clone)]
194pub(super) struct TimelineSettings {
195    /// Should the read receipts and read markers be handled?
196    pub(super) track_read_receipts: bool,
197
198    /// Event filter that controls what's rendered as a timeline item (and thus
199    /// what can carry read receipts).
200    pub(super) event_filter: Arc<TimelineEventFilterFn>,
201
202    /// Are unparsable events added as timeline items of their own kind?
203    pub(super) add_failed_to_parse: bool,
204
205    /// Should the timeline items be grouped by day or month?
206    pub(super) date_divider_mode: DateDividerMode,
207}
208
209#[cfg(not(tarpaulin_include))]
210impl fmt::Debug for TimelineSettings {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        f.debug_struct("TimelineSettings")
213            .field("track_read_receipts", &self.track_read_receipts)
214            .field("add_failed_to_parse", &self.add_failed_to_parse)
215            .finish_non_exhaustive()
216    }
217}
218
219impl Default for TimelineSettings {
220    fn default() -> Self {
221        Self {
222            track_read_receipts: false,
223            event_filter: Arc::new(default_event_filter),
224            add_failed_to_parse: true,
225            date_divider_mode: DateDividerMode::Daily,
226        }
227    }
228}
229
230/// The default event filter for
231/// [`crate::timeline::TimelineBuilder::event_filter`].
232///
233/// It filters out events that are not rendered by the timeline, including but
234/// not limited to: reactions, edits, redactions on existing messages.
235///
236/// If you have a custom filter, it may be best to chain yours with this one if
237/// you do not want to run into situations where a read receipt is not visible
238/// because it's living on an event that doesn't have a matching timeline item.
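///
/// As a minimal, illustrative sketch (not part of this module), a custom filter
/// could be chained with this one as follows; `is_interesting` stands for a
/// hypothetical predicate of your own:
///
/// ```ignore
/// let combined_filter = |event: &AnySyncTimelineEvent, rules: &RoomVersionRules| {
///     // Keep an event only if the default filter would render it *and* the
///     // custom predicate accepts it.
///     default_event_filter(event, rules) && is_interesting(event)
/// };
/// ```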
239pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
240    match event {
241        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
242            if ev.redacts(&rules.redaction).is_some() {
243                // This is a redaction of an existing message, we'll only update the previous
244                // message and not render a new entry.
245                false
246            } else {
                // This is a redacted entry, which we'll only show if the redacted entity wasn't
248                // a reaction.
249                ev.event_type() != MessageLikeEventType::Reaction
250            }
251        }
252
253        AnySyncTimelineEvent::MessageLike(msg) => {
254            match msg.original_content() {
255                None => {
                    // This is a redacted entry, which we'll only show if the redacted entity wasn't
257                    // a reaction.
258                    msg.event_type() != MessageLikeEventType::Reaction
259                }
260
261                Some(original_content) => {
262                    match original_content {
263                        AnyMessageLikeEventContent::RoomMessage(content) => {
264                            if content
265                                .relates_to
266                                .as_ref()
267                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
268                            {
269                                // Edits aren't visible by default.
270                                return false;
271                            }
272
273                            match content.msgtype {
274                                MessageType::Audio(_)
275                                | MessageType::Emote(_)
276                                | MessageType::File(_)
277                                | MessageType::Image(_)
278                                | MessageType::Location(_)
279                                | MessageType::Notice(_)
280                                | MessageType::ServerNotice(_)
281                                | MessageType::Text(_)
282                                | MessageType::Video(_)
283                                | MessageType::VerificationRequest(_) => true,
284                                #[cfg(feature = "unstable-msc4274")]
285                                MessageType::Gallery(_) => true,
286                                _ => false,
287                            }
288                        }
289
290                        AnyMessageLikeEventContent::Sticker(_)
291                        | AnyMessageLikeEventContent::UnstablePollStart(
292                            UnstablePollStartEventContent::New(_),
293                        )
294                        | AnyMessageLikeEventContent::CallInvite(_)
295                        | AnyMessageLikeEventContent::RtcNotification(_)
296                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
297
298                        _ => false,
299                    }
300                }
301            }
302        }
303
304        AnySyncTimelineEvent::State(_) => {
305            // All the state events may get displayed by default.
306            true
307        }
308    }
309}
310
311impl<P: RoomDataProvider> TimelineController<P> {
312    pub(super) fn new(
313        room_data_provider: P,
314        focus: TimelineFocus,
315        internal_id_prefix: Option<String>,
316        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
317        is_room_encrypted: bool,
318        settings: TimelineSettings,
319    ) -> Self {
320        let focus = match focus {
321            TimelineFocus::Live { hide_threaded_events } => {
322                TimelineFocusKind::Live { hide_threaded_events }
323            }
324
325            TimelineFocus::Event { hide_threaded_events, .. } => {
326                let paginator = Paginator::new(room_data_provider.clone());
327                TimelineFocusKind::Event { paginator, hide_threaded_events }
328            }
329
330            TimelineFocus::Thread { root_event_id, .. } => {
331                TimelineFocusKind::Thread { root_event_id }
332            }
333
334            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
335                TimelineFocusKind::PinnedEvents {
336                    loader: PinnedEventsLoader::new(
337                        Arc::new(room_data_provider.clone()),
338                        max_events_to_load as usize,
339                        max_concurrent_requests as usize,
340                    ),
341                }
342            }
343        };
344
345        let focus = Arc::new(focus);
346        let state = Arc::new(RwLock::new(TimelineState::new(
347            focus.clone(),
348            room_data_provider.own_user_id().to_owned(),
349            room_data_provider.room_version_rules(),
350            internal_id_prefix,
351            unable_to_decrypt_hook,
352            is_room_encrypted,
353        )));
354
355        let decryption_retry_task =
356            DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
357
358        Self { state, focus, room_data_provider, settings, decryption_retry_task }
359    }
360
361    /// Initializes the configured focus with appropriate data.
362    ///
    /// Should be called only once after creation of the [`TimelineController`], with
364    /// all its fields set.
365    ///
366    /// Returns whether there were any events added to the timeline.
367    pub(super) async fn init_focus(
368        &self,
369        focus: &TimelineFocus,
370        room_event_cache: &RoomEventCache,
371    ) -> Result<bool, Error> {
372        match focus {
373            TimelineFocus::Live { .. } => {
374                // Retrieve the cached events, and add them to the timeline.
375                let events = room_event_cache.events().await;
376
377                let has_events = !events.is_empty();
378
379                self.replace_with_initial_remote_events(
380                    events.into_iter(),
381                    RemoteEventOrigin::Cache,
382                )
383                .await;
384
385                match room_event_cache.pagination().status().get() {
386                    RoomPaginationStatus::Idle { hit_timeline_start } => {
387                        if hit_timeline_start {
388                            // Eagerly insert the timeline start item, since pagination claims
389                            // we've already hit the timeline start.
390                            self.insert_timeline_start_if_missing().await;
391                        }
392                    }
393                    RoomPaginationStatus::Paginating => {}
394                }
395
396                Ok(has_events)
397            }
398
399            TimelineFocus::Event { target: event_id, num_context_events, .. } => {
400                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
401                    // Note: this is sync'd with code in the ctor.
402                    unreachable!();
403                };
404
405                // Start a /context request, and append the results (in order) to the timeline.
406                let start_from_result = paginator
407                    .start_from(event_id, (*num_context_events).into())
408                    .await
409                    .map_err(PaginationError::Paginator)?;
410
411                let has_events = !start_from_result.events.is_empty();
412
413                self.replace_with_initial_remote_events(
414                    start_from_result.events.into_iter(),
415                    RemoteEventOrigin::Pagination,
416                )
417                .await;
418
419                Ok(has_events)
420            }
421
422            TimelineFocus::Thread { root_event_id, .. } => {
423                let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
424                let has_events = !events.is_empty();
425
                // For each event, we also need to find the related events: since they
                // don't include the thread relationship, they won't be part of the
                // initial list of events.
429                let mut related_events = Vector::new();
430                for event_id in events.iter().filter_map(|event| event.event_id()) {
431                    if let Some((_original, related)) =
432                        room_event_cache.find_event_with_relations(&event_id, None).await
433                    {
434                        related_events.extend(related);
435                    }
436                }
437
438                self.replace_with_initial_remote_events(
439                    events.into_iter(),
440                    RemoteEventOrigin::Cache,
441                )
442                .await;
443
444                // Now that we've inserted the thread events, add the aggregations too.
445                if !related_events.is_empty() {
446                    self.handle_remote_aggregations(
447                        vec![VectorDiff::Append { values: related_events }],
448                        RemoteEventOrigin::Cache,
449                    )
450                    .await;
451                }
452
453                Ok(has_events)
454            }
455
456            TimelineFocus::PinnedEvents { .. } => {
457                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
458                    // Note: this is sync'd with code in the ctor.
459                    unreachable!();
460                };
461
462                let Some(loaded_events) =
463                    loader.load_events().await.map_err(Error::PinnedEventsError)?
464                else {
                    // There weren't any events.
466                    return Ok(false);
467                };
468
469                let has_events = !loaded_events.is_empty();
470
471                self.replace_with_initial_remote_events(
472                    loaded_events.into_iter(),
473                    RemoteEventOrigin::Pagination,
474                )
475                .await;
476
477                Ok(has_events)
478            }
479        }
480    }
481
482    /// Listens to encryption state changes for the room in
483    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
484    /// existing timeline items. This will then cause a refresh of those
485    /// timeline items.
486    pub async fn handle_encryption_state_changes(&self) {
487        let mut room_info = self.room_data_provider.room_info();
488
        // Small helper to mark the room as encrypted.
490        let mark_encrypted = || async {
491            let mut state = self.state.write().await;
492            state.meta.is_room_encrypted = true;
493            state.mark_all_events_as_encrypted();
494        };
495
496        if room_info.get().encryption_state().is_encrypted() {
497            // If the room was already encrypted, it won't toggle to unencrypted, so we can
498            // shut down this task early.
499            mark_encrypted().await;
500            return;
501        }
502
503        while let Some(info) = room_info.next().await {
504            if info.encryption_state().is_encrypted() {
505                mark_encrypted().await;
506                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
507                // here is done.
508                break;
509            }
510        }
511    }
512
513    pub(crate) async fn reload_pinned_events(
514        &self,
515    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
516        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
517            loader.load_events().await
518        } else {
519            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
520        }
521    }
522
523    /// Run a lazy backwards pagination (in live mode).
524    ///
525    /// It adjusts the `count` value of the `Skip` higher-order stream so that
    /// more items are pushed to the front of the timeline.
527    ///
528    /// If no more items are available (i.e. if the `count` is zero), this
529    /// method returns `Some(needs)` where `needs` is the number of events that
    /// must be fetched through an actual (non-lazy) backwards pagination.
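    ///
    /// A rough sketch of how a caller could react to the return value, where
    /// `controller` is a [`TimelineController`] and
    /// `run_actual_backwards_pagination` is a hypothetical fallback:
    ///
    /// ```ignore
    /// if let Some(needs) = controller.live_lazy_paginate_backwards(20).await {
    ///     // The lazily-loaded buffer is exhausted: fall back to a real
    ///     // backwards pagination fetching `needs` events.
    ///     run_actual_backwards_pagination(needs).await;
    /// }
    /// ```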
531    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
532        let state = self.state.read().await;
533
534        let (count, needs) = state
535            .meta
536            .subscriber_skip_count
537            .compute_next_when_paginating_backwards(num_events.into());
538
539        // This always happens on a live timeline.
540        let is_live_timeline = true;
541        state.meta.subscriber_skip_count.update(count, is_live_timeline);
542
543        needs
544    }
545
546    /// Run a backwards pagination (in focused mode) and append the results to
547    /// the timeline.
548    ///
549    /// Returns whether we hit the start of the timeline.
550    pub(super) async fn focused_paginate_backwards(
551        &self,
552        num_events: u16,
553    ) -> Result<bool, PaginationError> {
554        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
555            TimelineFocusKind::Live { .. }
556            | TimelineFocusKind::PinnedEvents { .. }
557            | TimelineFocusKind::Thread { .. } => {
558                return Err(PaginationError::NotSupported);
559            }
560            TimelineFocusKind::Event { paginator, .. } => paginator
561                .paginate_backward(num_events.into())
562                .await
563                .map_err(PaginationError::Paginator)?,
564        };
565
566        // Events are in reverse topological order.
        // We can push each event to the front, one at a time.
568        self.handle_remote_events_with_diffs(
569            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
570            RemoteEventOrigin::Pagination,
571        )
572        .await;
573
574        Ok(hit_end_of_timeline)
575    }
576
577    /// Run a forwards pagination (in focused mode) and append the results to
578    /// the timeline.
579    ///
580    /// Returns whether we hit the end of the timeline.
581    pub(super) async fn focused_paginate_forwards(
582        &self,
583        num_events: u16,
584    ) -> Result<bool, PaginationError> {
585        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
586            TimelineFocusKind::Live { .. }
587            | TimelineFocusKind::PinnedEvents { .. }
588            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
589
590            TimelineFocusKind::Event { paginator, .. } => paginator
591                .paginate_forward(num_events.into())
592                .await
593                .map_err(PaginationError::Paginator)?,
594        };
595
596        // Events are in topological order.
597        // We can append all events with no transformation.
598        self.handle_remote_events_with_diffs(
599            vec![VectorDiff::Append { values: events.into() }],
600            RemoteEventOrigin::Pagination,
601        )
602        .await;
603
604        Ok(hit_end_of_timeline)
605    }
606
607    /// Is this timeline receiving events from sync (aka has a live focus)?
608    pub(super) fn is_live(&self) -> bool {
609        matches!(&*self.focus, TimelineFocusKind::Live { .. })
610    }
611
612    /// Is this timeline focused on a thread?
613    pub(super) fn is_threaded(&self) -> bool {
614        self.focus.is_thread()
615    }
616
617    /// The root of the current thread, for a live thread timeline or a
618    /// permalink to a thread message.
619    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
620        self.focus.thread_root().map(ToOwned::to_owned)
621    }
622
623    /// Get a copy of the current items in the list.
624    ///
    /// Cheap because `imbl::Vector` is cheap to clone.
626    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
627        self.state.read().await.items.clone_items()
628    }
629
630    #[cfg(test)]
631    pub(super) async fn subscribe_raw(
632        &self,
633    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
634        self.state.read().await.items.subscribe().into_values_and_stream()
635    }
636
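    /// Subscribe to the timeline items, returning the current items along with a
    /// subscriber that applies the lazy-loading skip count.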
637    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
638        let state = self.state.read().await;
639
640        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
641    }
642
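    /// Subscribe to the timeline items, mapping each item through `f` and
    /// filtering out those for which it returns `None`.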
643    pub(super) async fn subscribe_filter_map<U, F>(
644        &self,
645        f: F,
646    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
647    where
648        U: Clone,
649        F: Fn(Arc<TimelineItem>) -> Option<U>,
650    {
651        self.state.read().await.items.subscribe().filter_map(f)
652    }
653
654    /// Toggle a reaction locally.
655    ///
656    /// Returns true if the reaction was added, false if it was removed.
657    #[instrument(skip_all)]
658    pub(super) async fn toggle_reaction_local(
659        &self,
660        item_id: &TimelineEventItemId,
661        key: &str,
662    ) -> Result<bool, Error> {
663        let mut state = self.state.write().await;
664
665        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't toggle reaction");
667            return Err(Error::FailedToToggleReaction);
668        };
669
670        let user_id = self.room_data_provider.own_user_id();
671        let prev_status = item
672            .content()
673            .reactions()
674            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
675
676        let Some(prev_status) = prev_status else {
677            // Adding the new reaction.
678            match item.handle() {
679                TimelineItemHandle::Local(send_handle) => {
680                    if send_handle
681                        .react(key.to_owned())
682                        .await
683                        .map_err(|err| Error::SendQueueError(err.into()))?
684                        .is_some()
685                    {
686                        trace!("adding a reaction to a local echo");
687                        return Ok(true);
688                    }
689
690                    warn!("couldn't toggle reaction for local echo");
691                    return Ok(false);
692                }
693
694                TimelineItemHandle::Remote(event_id) => {
695                    // Add a reaction through the room data provider.
696                    // No need to reflect the effect locally, since the local echo handling will
697                    // take care of it.
698                    trace!("adding a reaction to a remote echo");
699                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
700                    self.room_data_provider
701                        .send(ReactionEventContent::from(annotation).into())
702                        .await?;
703                    return Ok(true);
704                }
705            }
706        };
707
708        trace!("removing a previous reaction");
709        match prev_status {
710            ReactionStatus::LocalToLocal(send_reaction_handle) => {
711                if let Some(handle) = send_reaction_handle {
712                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to remote echo
                        // under our feet, but the timeline was supposed to be locked!
715                        warn!("unexpectedly unable to abort sending of local reaction");
716                    }
717                } else {
718                    warn!("no send reaction handle (this should only happen in testing contexts)");
719                }
720            }
721
722            ReactionStatus::LocalToRemote(send_handle) => {
723                // No need to reflect the change ourselves, since handling the discard of the
724                // local echo will take care of it.
725                trace!("aborting send of the previous reaction that was a local echo");
726                if let Some(handle) = send_handle {
727                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to remote echo
                        // under our feet, but the timeline was supposed to be locked!
730                        warn!("unexpectedly unable to abort sending of local reaction");
731                    }
732                } else {
733                    warn!("no send handle (this should only happen in testing contexts)");
734                }
735            }
736
737            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it fails.
739                let Some(annotated_event_id) =
740                    item.as_remote().map(|event_item| event_item.event_id.clone())
741                else {
742                    warn!("remote reaction to remote event, but the associated item isn't remote");
743                    return Ok(false);
744                };
745
746                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
747                let reaction_info = reactions.remove_reaction(user_id, key);
748
749                if reaction_info.is_some() {
750                    let new_item = item.with_reactions(reactions);
751                    state.items.replace(item_pos, new_item);
752                } else {
753                    warn!(
754                        "reaction is missing on the item, not removing it locally, \
755                         but sending redaction."
756                    );
757                }
758
759                // Release the lock before running the request.
760                drop(state);
761
762                trace!("sending redact for a previous reaction");
763                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
764                    if let Some(reaction_info) = reaction_info {
765                        debug!("sending redact failed, adding the reaction back to the list");
766
767                        let mut state = self.state.write().await;
768                        if let Some((item_pos, item)) =
769                            rfind_event_by_id(&state.items, &annotated_event_id)
770                        {
771                            // Re-add the reaction to the mapping.
772                            let mut reactions =
773                                item.content().reactions().cloned().unwrap_or_default();
774                            reactions
775                                .entry(key.to_owned())
776                                .or_default()
777                                .insert(user_id.to_owned(), reaction_info);
778                            let new_item = item.with_reactions(reactions);
779                            state.items.replace(item_pos, new_item);
780                        } else {
781                            warn!(
782                                "couldn't find item to re-add reaction anymore; \
783                                 maybe it's been redacted?"
784                            );
785                        }
786                    }
787
788                    return Err(err);
789                }
790            }
791        }
792
793        Ok(false)
794    }
795
796    /// Handle updates on events as [`VectorDiff`]s.
797    pub(super) async fn handle_remote_events_with_diffs(
798        &self,
799        diffs: Vec<VectorDiff<TimelineEvent>>,
800        origin: RemoteEventOrigin,
801    ) {
802        if diffs.is_empty() {
803            return;
804        }
805
806        let mut state = self.state.write().await;
807        state
808            .handle_remote_events_with_diffs(
809                diffs,
810                origin,
811                &self.room_data_provider,
812                &self.settings,
813            )
814            .await
815    }
816
817    /// Only handle aggregations received as [`VectorDiff`]s.
818    pub(super) async fn handle_remote_aggregations(
819        &self,
820        diffs: Vec<VectorDiff<TimelineEvent>>,
821        origin: RemoteEventOrigin,
822    ) {
823        if diffs.is_empty() {
824            return;
825        }
826
827        let mut state = self.state.write().await;
828        state
829            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
830            .await
831    }
832
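    /// Clear all the timeline items.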
833    pub(super) async fn clear(&self) {
834        self.state.write().await.clear();
835    }
836
837    /// Replaces the content of the current timeline with initial events.
838    ///
839    /// Also sets up read receipts and the read marker for a live timeline of a
840    /// room.
841    ///
842    /// This is all done with a single lock guard, since we don't want the state
843    /// to be modified between the clear and re-insertion of new events.
844    pub(super) async fn replace_with_initial_remote_events<Events>(
845        &self,
846        events: Events,
847        origin: RemoteEventOrigin,
848    ) where
849        Events: IntoIterator + ExactSizeIterator,
850        <Events as IntoIterator>::Item: Into<TimelineEvent>,
851    {
852        let mut state = self.state.write().await;
853
854        let track_read_markers = self.settings.track_read_receipts;
855        if track_read_markers {
856            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
857            state
858                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
859                .await;
860        }
861
        // Replace the events if either the current event list or the new one isn't
        // empty.
        // Previously we only had to check that the new list wasn't empty, because a
        // clear operation ran beforehand so the current one was always empty; but now
        // we may want to replace a populated timeline with an empty one.
867        if !state.items.is_empty() || events.len() > 0 {
868            state
869                .replace_with_remote_events(
870                    events,
871                    origin,
872                    &self.room_data_provider,
873                    &self.settings,
874                )
875                .await;
876        }
877
878        if track_read_markers
879            && let Some(fully_read_event_id) =
880                self.room_data_provider.load_fully_read_marker().await
881        {
882            state.handle_fully_read_marker(fully_read_event_id);
883        }
884    }
885
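    /// Apply an updated fully-read marker (the event ID it points to) to the
    /// timeline state.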
886    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
887        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
888    }
889
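    /// Handle ephemeral room events received from sync, e.g. read receipts.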
890    pub(super) async fn handle_ephemeral_events(
891        &self,
892        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
893    ) {
894        let mut state = self.state.write().await;
895        state.handle_ephemeral_events(events, &self.room_data_provider).await;
896    }
897
898    /// Creates the local echo for an event we're sending.
899    #[instrument(skip_all)]
900    pub(super) async fn handle_local_event(
901        &self,
902        txn_id: OwnedTransactionId,
903        content: AnyMessageLikeEventContent,
904        send_handle: Option<SendHandle>,
905    ) {
906        let sender = self.room_data_provider.own_user_id().to_owned();
907        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
908
909        let date_divider_mode = self.settings.date_divider_mode.clone();
910
911        let mut state = self.state.write().await;
912        state
913            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
914            .await;
915    }
916
917    /// Update the send state of a local event represented by a transaction ID.
918    ///
919    /// If the corresponding local timeline item is missing, a warning is
920    /// raised.
921    #[instrument(skip(self))]
922    pub(super) async fn update_event_send_state(
923        &self,
924        txn_id: &TransactionId,
925        send_state: EventSendState,
926    ) {
927        let mut state = self.state.write().await;
928        let mut txn = state.transaction();
929
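        // If the new send state is `Sent`, extract the remote event ID it carries.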
930        let new_event_id: Option<&EventId> =
931            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
932
        // The local echoes are always at the end of the timeline, so we must first
        // make sure the remote echo hasn't shown up yet.
935        if rfind_event_item(&txn.items, |it| {
936            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
937        })
938        .is_some()
939        {
940            // Remote echo already received. This is very unlikely.
941            trace!("Remote echo received before send-event response");
942
943            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
944
945            // If there's both the remote echo and a local echo, that means the
946            // remote echo was received before the response *and* contained no
947            // transaction ID (and thus duplicated the local echo).
948            if let Some((idx, _)) = local_echo {
949                warn!("Message echo got duplicated, removing the local one");
950                txn.items.remove(idx);
951
                // Adjust the date dividers, if need be.
953                let mut adjuster =
954                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
955                adjuster.run(&mut txn.items, &mut txn.meta);
956            }
957
958            txn.commit();
959            return;
960        }
961
962        // Look for the local event by the transaction ID or event ID.
963        let result = rfind_event_item(&txn.items, |it| {
964            it.transaction_id() == Some(txn_id)
965                || new_event_id.is_some()
966                    && it.event_id() == new_event_id
967                    && it.as_local().is_some()
968        });
969
970        let Some((idx, item)) = result else {
971            // Event wasn't found as a standalone item.
972            //
            // If it was just sent, check whether it matches a corresponding aggregation,
974            // and mark it as sent in that case.
975            if let Some(new_event_id) = new_event_id {
976                if txn.meta.aggregations.mark_aggregation_as_sent(
977                    txn_id.to_owned(),
978                    new_event_id.to_owned(),
979                    &mut txn.items,
980                    &txn.meta.room_version_rules,
981                ) {
982                    trace!("Aggregation marked as sent");
983                    txn.commit();
984                    return;
985                }
986
987                trace!("Sent aggregation was not found");
988            }
989
990            warn!("Timeline item not found, can't update send state");
991            return;
992        };
993
994        let Some(local_item) = item.as_local() else {
995            warn!("We looked for a local item, but it transitioned to remote.");
996            return;
997        };
998
        // If the event was already marked as sent, that's a broken state; emit an
        // error, but still override it with the given sent state.
1001        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1002            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1003        }
1004
1005        // If the event has just been marked as sent, update the aggregations mapping to
1006        // take that into account.
1007        if let Some(new_event_id) = new_event_id {
1008            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1009        }
1010
1011        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1012        txn.items.replace(idx, new_item);
1013
1014        txn.commit();
1015    }
1016
1017    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1018        let mut state = self.state.write().await;
1019
1020        if let Some((idx, _)) =
1021            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1022        {
1023            let mut txn = state.transaction();
1024
1025            txn.items.remove(idx);
1026
1027            // A read marker or a date divider may have been inserted before the local echo.
1028            // Ensure both are up to date.
1029            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1030            adjuster.run(&mut txn.items, &mut txn.meta);
1031
1032            txn.meta.update_read_marker(&mut txn.items);
1033
1034            txn.commit();
1035
1036            debug!("discarded local echo");
1037            return true;
1038        }
1039
1040        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
1041        // dereferencing it once.
1042        let mut txn = state.transaction();
1043
        // Check whether this was a local aggregation.
1045        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1046            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1047            &mut txn.items,
1048        ) {
1049            Ok(val) => val,
1050            Err(err) => {
1051                warn!("error when discarding local echo for an aggregation: {err}");
1052                // The aggregation has been found, it's just that we couldn't discard it.
1053                true
1054            }
1055        };
1056
1057        if found_aggregation {
1058            txn.commit();
1059        }
1060
1061        found_aggregation
1062    }
1063
1064    pub(super) async fn replace_local_echo(
1065        &self,
1066        txn_id: &TransactionId,
1067        content: AnyMessageLikeEventContent,
1068    ) -> bool {
1069        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1070            // Ideally, we'd support replacing local echoes for a reaction, etc., but
1071            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soon™ and we'll get another chance at
1073            // editing the event then.
1074            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1075            return false;
1076        };
1077
1078        let mut state = self.state.write().await;
1079        let mut txn = state.transaction();
1080
1081        let Some((idx, prev_item)) =
1082            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1083        else {
1084            debug!("Can't find local echo to replace");
1085            return false;
1086        };
1087
1088        // Reuse the previous local echo's state, but reset the send state to not sent
1089        // (per API contract).
1090        let ti_kind = {
1091            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned to remote??");
1093                return false;
1094            };
1095            // If the local echo had an upload progress, retain it.
1096            let progress = as_variant!(&prev_local_item.send_state,
1097                EventSendState::NotSentYet { progress } => progress.clone())
1098            .flatten();
1099            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1100        };
1101
1102        // Replace the local-related state (kind) and the content state.
1103        let new_item = TimelineItem::new(
1104            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1105                content.msgtype,
1106                content.mentions,
1107                prev_item.content().reactions().cloned().unwrap_or_default(),
1108                prev_item.content().thread_root(),
1109                prev_item.content().in_reply_to(),
1110                prev_item.content().thread_summary(),
1111            )),
1112            prev_item.internal_id.to_owned(),
1113        );
1114
1115        txn.items.replace(idx, new_item);
1116
1117        // This doesn't change the original sending time, so there's no need to adjust
1118        // date dividers.
1119
1120        txn.commit();
1121
1122        debug!("Replaced local echo");
1123        true
1124    }
1125
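    /// Retry decrypting timeline events, optionally restricted to the given
    /// session IDs, by delegating to the long-running decryption retry task.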
1126    pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
1127        self.decryption_retry_task
1128            .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
1129            .await;
1130    }
1131
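    /// Mark every sender profile that isn't ready yet as pending.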
1132    pub(super) async fn set_sender_profiles_pending(&self) {
1133        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1134    }
1135
1136    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1137        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1138    }
1139
1140    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1141        self.state.write().await.items.for_each(|mut entry| {
1142            let Some(event_item) = entry.as_event() else { return };
1143            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1144                let new_item = entry.with_kind(TimelineItemKind::Event(
1145                    event_item.with_sender_profile(profile_state.clone()),
1146                ));
1147                ObservableItemsEntry::replace(&mut entry, new_item);
1148            }
1149        });
1150    }
1151
1152    pub(super) async fn update_missing_sender_profiles(&self) {
1153        trace!("Updating missing sender profiles");
1154
1155        let mut state = self.state.write().await;
1156        let mut entries = state.items.entries();
1157        while let Some(mut entry) = entries.next() {
1158            let Some(event_item) = entry.as_event() else { continue };
1159            let event_id = event_item.event_id().map(debug);
1160            let transaction_id = event_item.transaction_id().map(debug);
1161
1162            if event_item.sender_profile().is_ready() {
1163                trace!(event_id, transaction_id, "Profile already set");
1164                continue;
1165            }
1166
1167            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1168                Some(profile) => {
1169                    trace!(event_id, transaction_id, "Adding profile");
1170                    let updated_item =
1171                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1172                    let new_item = entry.with_kind(updated_item);
1173                    ObservableItemsEntry::replace(&mut entry, new_item);
1174                }
1175                None => {
1176                    if !event_item.sender_profile().is_unavailable() {
1177                        trace!(event_id, transaction_id, "Marking profile unavailable");
1178                        let updated_item =
1179                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1180                        let new_item = entry.with_kind(updated_item);
1181                        ObservableItemsEntry::replace(&mut entry, new_item);
1182                    } else {
1183                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1184                    }
1185                }
1186            }
1187        }
1188
1189        trace!("Done updating missing sender profiles");
1190    }
1191
1192    /// Update the profiles of the given senders, even if they are ready.
1193    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1194        trace!("Forcing update of sender profiles: {sender_ids:?}");
1195
1196        let mut state = self.state.write().await;
1197        let mut entries = state.items.entries();
1198        while let Some(mut entry) = entries.next() {
1199            let Some(event_item) = entry.as_event() else { continue };
1200            if !sender_ids.contains(event_item.sender()) {
1201                continue;
1202            }
1203
1204            let event_id = event_item.event_id().map(debug);
1205            let transaction_id = event_item.transaction_id().map(debug);
1206
1207            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1208                Some(profile) => {
1209                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1210                    {
1211                        debug!(event_id, transaction_id, "Profile already up-to-date");
1212                    } else {
1213                        trace!(event_id, transaction_id, "Updating profile");
1214                        let updated_item =
1215                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1216                        let new_item = entry.with_kind(updated_item);
1217                        ObservableItemsEntry::replace(&mut entry, new_item);
1218                    }
1219                }
1220                None => {
1221                    if !event_item.sender_profile().is_unavailable() {
1222                        trace!(event_id, transaction_id, "Marking profile unavailable");
1223                        let updated_item =
1224                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1225                        let new_item = entry.with_kind(updated_item);
1226                        ObservableItemsEntry::replace(&mut entry, new_item);
1227                    } else {
1228                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1229                    }
1230                }
1231            }
1232        }
1233
1234        trace!("Done forcing update of sender profiles");
1235    }
1236
1237    #[cfg(test)]
1238    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1239        let own_user_id = self.room_data_provider.own_user_id();
1240        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1241    }
1242
1243    /// Get the latest read receipt for the given user.
1244    ///
1245    /// Useful to get the latest read receipt, whether it's private or public.
1246    pub(super) async fn latest_user_read_receipt(
1247        &self,
1248        user_id: &UserId,
1249    ) -> Option<(OwnedEventId, Receipt)> {
1250        let receipt_thread = self.focus.receipt_thread();
1251
1252        self.state
1253            .read()
1254            .await
1255            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1256            .await
1257    }
1258
1259    /// Get the ID of the timeline event with the latest read receipt for the
1260    /// given user.
1261    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1262        &self,
1263        user_id: &UserId,
1264    ) -> Option<OwnedEventId> {
1265        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1266    }
1267
1268    /// Subscribe to changes in the read receipts of our own user.
1269    pub async fn subscribe_own_user_read_receipts_changed(
1270        &self,
1271    ) -> impl Stream<Item = ()> + use<P> {
1272        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1273    }
1274
1275    /// Handle a room send update that's a new local echo.
1276    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1277        match echo.content {
1278            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1279                let content = match serialized_event.deserialize() {
1280                    Ok(d) => d,
1281                    Err(err) => {
1282                        warn!("error deserializing local echo: {err}");
1283                        return;
1284                    }
1285                };
1286
1287                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1288                    .await;
1289
1290                if let Some(send_error) = send_error {
1291                    self.update_event_send_state(
1292                        &echo.transaction_id,
1293                        EventSendState::SendingFailed {
1294                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1295                                send_error,
1296                            ))),
1297                            is_recoverable: false,
1298                        },
1299                    )
1300                    .await;
1301                }
1302            }
1303
1304            LocalEchoContent::React { key, send_handle, applies_to } => {
1305                self.handle_local_reaction(key, send_handle, applies_to).await;
1306            }
1307        }
1308    }
1309
    /// Adds a reaction, as a local echo, to an event that is itself still a local echo.
1311    #[instrument(skip(self, send_handle))]
1312    async fn handle_local_reaction(
1313        &self,
1314        reaction_key: String,
1315        send_handle: SendReactionHandle,
1316        applies_to: OwnedTransactionId,
1317    ) {
1318        let mut state = self.state.write().await;
1319        let mut tr = state.transaction();
1320
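        // The reaction target is itself a local echo, so it's identified by its
        // transaction ID.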
1321        let target = TimelineEventItemId::TransactionId(applies_to);
1322
1323        let reaction_txn_id = send_handle.transaction_id().to_owned();
1324        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1325        let aggregation = Aggregation::new(
1326            TimelineEventItemId::TransactionId(reaction_txn_id),
1327            AggregationKind::Reaction {
1328                key: reaction_key.clone(),
1329                sender: self.room_data_provider.own_user_id().to_owned(),
1330                timestamp: MilliSecondsSinceUnixEpoch::now(),
1331                reaction_status,
1332            },
1333        );
1334
1335        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1336        find_item_and_apply_aggregation(
1337            &tr.meta.aggregations,
1338            &mut tr.items,
1339            &target,
1340            aggregation,
1341            &tr.meta.room_version_rules,
1342        );
1343
1344        tr.commit();
1345    }
1346
1347    /// Handle a single room send queue update.
1348    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1349        match update {
1350            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1351                self.handle_local_echo(echo).await;
1352            }
1353
1354            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1355                if !self.discard_local_echo(&transaction_id).await {
1356                    warn!("couldn't find the local echo to discard");
1357                }
1358            }
1359
1360            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1361                let content = match new_content.deserialize() {
1362                    Ok(d) => d,
1363                    Err(err) => {
1364                        warn!("error deserializing local echo (upon edit): {err}");
1365                        return;
1366                    }
1367                };
1368
1369                if !self.replace_local_echo(&transaction_id, content).await {
1370                    warn!("couldn't find the local echo to replace");
1371                }
1372            }
1373
1374            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1375                self.update_event_send_state(
1376                    &transaction_id,
1377                    EventSendState::SendingFailed { error, is_recoverable },
1378                )
1379                .await;
1380            }
1381
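            // The event will be retried: move it back to the not-sent-yet state,
            // clearing any previous send error.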
1382            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1383                self.update_event_send_state(
1384                    &transaction_id,
1385                    EventSendState::NotSentYet { progress: None },
1386                )
1387                .await;
1388            }
1389
1390            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1391                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1392                    .await;
1393            }
1394
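            // A media upload made progress: the event is still pending, but attach the
            // upload progress so it can be reported.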
1395            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1396                self.update_event_send_state(
1397                    &related_to,
1398                    EventSendState::NotSentYet {
1399                        progress: Some(MediaUploadProgress { index, progress }),
1400                    },
1401                )
1402                .await;
1403            }
1404        }
1405    }
1406
1407    /// Insert a timeline start item at the beginning of the timeline, if
1408    /// it's missing.
1409    pub async fn insert_timeline_start_if_missing(&self) {
1410        let mut state = self.state.write().await;
1411        let mut txn = state.transaction();
1412        txn.items.push_timeline_start_if_missing(
1413            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1414        );
1415        txn.commit();
1416    }
1417
1418    /// Create an [`EmbeddedEvent`] from an arbitrary event, be it in the
1419    /// timeline or not.
1420    ///
1421    /// Returns `None` if the event cannot be represented as a standalone item
1422    /// because it's an aggregation.
1423    pub(super) async fn make_replied_to(
1424        &self,
1425        event: TimelineEvent,
1426    ) -> Result<Option<EmbeddedEvent>, Error> {
1427        let state = self.state.read().await;
1428        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1429    }
1430}
1431
1432impl TimelineController {
1433    pub(super) fn room(&self) -> &Room {
1434        &self.room_data_provider
1435    }
1436
1437    /// Given an event identifier, fetch the details of the event it's
1438    /// replying to, if applicable.
1439    #[instrument(skip(self))]
1440    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1441        let state_guard = self.state.write().await;
1442        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1443            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1444        let remote_item = item
1445            .as_remote()
1446            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1447            .clone();
1448
1449        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1450            debug!("Event is not a message");
1451            return Ok(());
1452        };
1453        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1454            debug!("Event is not a reply");
1455            return Ok(());
1456        };
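        // Nothing to do if the replied-to details are already being fetched, or have
        // already been fetched.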
1457        if let TimelineDetails::Pending = &in_reply_to.event {
1458            debug!("Replied-to event is already being fetched");
1459            return Ok(());
1460        }
1461        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1462            debug!("Replied-to event has already been fetched");
1463            return Ok(());
1464        }
1465
1466        let internal_id = item.internal_id.to_owned();
1467        let item = item.clone();
1468        let event = fetch_replied_to_event(
1469            state_guard,
1470            &self.state,
1471            index,
1472            &item,
1473            internal_id,
1474            &msglike,
1475            &in_reply_to.event_id,
1476            self.room(),
1477        )
1478        .await?;
1479
1480        // We need to be sure to have the latest position of the event as it might have
1481        // changed while waiting for the request.
1482        let mut state = self.state.write().await;
1483        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1484            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1485
1486        // Check the state of the event again, it might have been redacted while
1487        // the request was in-flight.
1488        let TimelineItemContent::MsgLike(MsgLikeContent {
1489            kind: MsgLikeKind::Message(message),
1490            reactions,
1491            thread_root,
1492            in_reply_to,
1493            thread_summary,
1494        }) = item.content().clone()
1495        else {
1496            info!("Event is no longer a message (redacted?)");
1497            return Ok(());
1498        };
1499        let Some(in_reply_to) = in_reply_to else {
1500            warn!("Event no longer has a reply (bug?)");
1501            return Ok(());
1502        };
1503
1504        // Now that we've received the content of the replied-to event, replace the
1505        // replied-to content in the item with it.
1506        trace!("Updating in-reply-to details");
1507        let internal_id = item.internal_id.to_owned();
1508        let mut item = item.clone();
1509        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1510            kind: MsgLikeKind::Message(message),
1511            reactions,
1512            thread_root,
1513            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1514            thread_summary,
1515        }));
1516        state.items.replace(index, TimelineItem::new(item, internal_id));
1517
1518        Ok(())
1519    }
1520
1521    /// Returns the thread that should be used for a read receipt based on the
1522    /// current focus of the timeline and the receipt type.
1523    ///
1524    /// A `SendReceiptType::FullyRead` will always use
1525    /// `ReceiptThread::Unthreaded`.
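    ///
    /// Any other receipt type falls back to the receipt thread implied by the
    /// timeline's current focus.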
1526    pub(super) fn infer_thread_for_read_receipt(
1527        &self,
1528        receipt_type: &SendReceiptType,
1529    ) -> ReceiptThread {
1530        if matches!(receipt_type, SendReceiptType::FullyRead) {
1531            ReceiptThread::Unthreaded
1532        } else {
1533            self.focus.receipt_thread()
1534        }
1535    }
1536
1537    /// Check whether the given receipt should be sent.
1538    ///
1539    /// Returns `false` if the given receipt is older than the current one.
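    ///
    /// Concretely, the receipt is only refused when a previous receipt is
    /// known and the target event does not come after it in the timeline.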
1540    pub(super) async fn should_send_receipt(
1541        &self,
1542        receipt_type: &SendReceiptType,
1543        receipt_thread: &ReceiptThread,
1544        event_id: &EventId,
1545    ) -> bool {
1546        let own_user_id = self.room().own_user_id();
1547        let state = self.state.read().await;
1548        let room = self.room();
1549
1550        match receipt_type {
1551            SendReceiptType::Read => {
1552                if let Some((old_pub_read, _)) = state
1553                    .meta
1554                    .user_receipt(
1555                        own_user_id,
1556                        ReceiptType::Read,
1557                        receipt_thread.clone(),
1558                        room,
1559                        state.items.all_remote_events(),
1560                    )
1561                    .await
1562                {
1563                    trace!(%old_pub_read, "found a previous public receipt");
1564                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1565                        &old_pub_read,
1566                        event_id,
1567                        state.items.all_remote_events(),
1568                    ) {
1569                        trace!(
1570                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
1571                        );
1572                        return relative_pos == RelativePosition::After;
1573                    }
1574                }
1575            }
1576
1577            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1578            // doesn't make sense to have a private read receipt behind a public one.
1579            SendReceiptType::ReadPrivate => {
1580                if let Some((old_priv_read, _)) =
1581                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1582                {
1583                    trace!(%old_priv_read, "found a previous private receipt");
1584                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1585                        &old_priv_read,
1586                        event_id,
1587                        state.items.all_remote_events(),
1588                    ) {
1589                        trace!(
1590                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
1591                        );
1592                        return relative_pos == RelativePosition::After;
1593                    }
1594                }
1595            }
1596
1597            SendReceiptType::FullyRead => {
1598                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1599                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1600                        &prev_event_id,
1601                        event_id,
1602                        state.items.all_remote_events(),
1603                    )
1604                {
1605                    return relative_pos == RelativePosition::After;
1606                }
1607            }
1608
1609            _ => {}
1610        }
1611
1612        // Let the server handle unknown receipts.
1613        true
1614    }
1615
1616    /// Returns the latest event identifier, even if it's not visible, or if
1617    /// it's folded into another timeline item.
1618    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1619        let state = self.state.read().await;
1620        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1621    }
1622
1623    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1624    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1625        self.retry_event_decryption_inner(session_ids).await
1626    }
1627
1628    /// Combine the global (event cache) pagination status with the local state
1629    /// of the timeline.
1630    ///
1631    /// This only changes the global pagination status of this room in one
1632    /// case: if the timeline has a skip count greater than 0, it will
1633    /// ensure that the pagination status says that we haven't reached the
1634    /// timeline start yet.
1635    pub(super) async fn map_pagination_status(
1636        &self,
1637        status: RoomPaginationStatus,
1638    ) -> RoomPaginationStatus {
1639        match status {
1640            RoomPaginationStatus::Idle { hit_timeline_start } => {
1641                if hit_timeline_start {
1642                    let state = self.state.read().await;
1643                    // If the skip count is greater than 0, it means that a subsequent pagination
1644                    // could return more items, so pretend we didn't get the information that the
1645                    // timeline start was hit.
1646                    if state.meta.subscriber_skip_count.get() > 0 {
1647                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
1648                    }
1649                }
1650            }
1651            RoomPaginationStatus::Paginating => {}
1652        }
1653
1654        // You're perfect, just the way you are.
1655        status
1656    }
1657}
1658
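/// Fetch the event a reply refers to, either from the items already in the
/// timeline or by loading/fetching it through the room, marking the reply's
/// details as pending while the request is in flight.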
1659#[allow(clippy::too_many_arguments)]
1660async fn fetch_replied_to_event<P: RoomDataProvider>(
1661    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1662    state_lock: &RwLock<TimelineState<P>>,
1663    index: usize,
1664    item: &EventTimelineItem,
1665    internal_id: TimelineUniqueId,
1666    msglike: &MsgLikeContent,
1667    in_reply_to: &EventId,
1668    room: &Room,
1669) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1670    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1671        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1672        trace!("Found replied-to event locally");
1673        return Ok(details);
1674    }
1675
1676    // Replace the item with a new timeline item that marks the fetching status of
1677    // the replied-to event as pending.
1678    trace!("Setting in-reply-to details to pending");
1679    let in_reply_to_details =
1680        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1681
1682    let event_item = item
1683        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1684
1685    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1686    state_guard.items.replace(index, new_timeline_item);
1687
1688    // Don't hold the state lock while the network request is made.
1689    drop(state_guard);
1690
1691    trace!("Fetching replied-to event");
1692    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1693        Ok(timeline_event) => {
1694            let state = state_lock.read().await;
1695
1696            let replied_to_item =
1697                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1698
1699            if let Some(item) = replied_to_item {
1700                TimelineDetails::Ready(Box::new(item))
1701            } else {
1702                // The replied-to item is an aggregation, not a standalone item.
1703                return Err(Error::UnsupportedEvent);
1704            }
1705        }
1706
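        // If loading the replied-to event failed, store the error as the reply
        // details instead.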
1707        Err(e) => TimelineDetails::Error(Arc::new(e)),
1708    };
1709
1710    Ok(res)
1711}