matrix_sdk_ui/timeline/controller/mod.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeSet, fmt, sync::Arc};

use as_variant::as_variant;
use decryption_retry_task::DecryptionRetryTask;
use eyeball_im::{VectorDiff, VectorSubscriberStream};
use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
use futures_core::Stream;
use imbl::Vector;
#[cfg(test)]
use matrix_sdk::Result;
use matrix_sdk::{
    deserialized_responses::TimelineEvent,
    event_cache::{RoomEventCache, RoomPaginationStatus},
    paginators::{PaginationResult, PaginationToken, Paginator},
    send_queue::{
        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
    },
};
#[cfg(test)]
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
    events::{
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent, MessageLikeEventType,
        poll::unstable_start::UnstablePollStartEventContent,
        reaction::ReactionEventContent,
        receipt::{Receipt, ReceiptThread, ReceiptType},
        relation::Annotation,
        room::message::{MessageType, Relation},
    },
    room_version_rules::RoomVersionRules,
    serde::Raw,
};
use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};

pub(super) use self::{
    metadata::{RelativePosition, TimelineMetadata},
    observable_items::{
        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
        ObservableItemsTransactionEntry,
    },
    state::TimelineState,
    state_transaction::TimelineStateTransaction,
};
use super::{
    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
    algorithms::{rfind_event_by_id, rfind_event_item},
    event_item::{ReactionStatus, RemoteEventOrigin},
    item::TimelineUniqueId,
    subscriber::TimelineSubscriber,
    traits::RoomDataProvider,
};
use crate::{
    timeline::{
        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
        algorithms::rfind_event_by_item_id,
        date_dividers::DateDividerAdjuster,
        event_item::TimelineItemHandle,
        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
    },
    unable_to_decrypt_hook::UtdHookManager,
};

pub(in crate::timeline) mod aggregations;
mod decryption_retry_task;
mod metadata;
mod observable_items;
mod read_receipts;
mod state;
mod state_transaction;

pub(super) use aggregations::*;
pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
use matrix_sdk_common::serde_helpers::extract_thread_root;

/// Data associated with the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`]: an augmented version
/// of it, including extra state that is useful over the lifetime of a
/// timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The paginator instance.
        paginator: OnceCell<AnyPaginator<P>>,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// The timeline is focused on the room's pinned events.
    PinnedEvents {
        /// The loader in charge of fetching the pinned events.
        loader: PinnedEventsLoader,
    },
}

#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
    Unthreaded {
        /// The actual event paginator.
        paginator: Paginator<P>,
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },
    Threaded(ThreadedEventsLoader<P>),
}

impl<P: RoomDataProvider> AnyPaginator<P> {
    /// Runs a backward pagination, requesting `num_events` from the server,
    /// starting from the current state of the object.
    ///
    /// Will return immediately if we have already hit the start of the
    /// timeline.
    ///
    /// May return an error if a pagination is already in progress, or if the
    /// call to the homeserver endpoint failed.
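    ///
    /// A hedged usage sketch, not taken from this crate's tests: it assumes an
    /// `AnyPaginator` named `paginator` in scope and a hypothetical `process`
    /// helper, and drives backward pagination until the start of the timeline
    /// is reached.
    ///
    /// ```ignore
    /// loop {
    ///     let PaginationResult { events, hit_end_of_timeline } =
    ///         paginator.paginate_backwards(20).await?;
    ///     process(events); // hypothetical: hand the batch over to the timeline
    ///     if hit_end_of_timeline {
    ///         break;
    ///     }
    /// }
    /// ```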
    pub async fn paginate_backwards(
        &self,
        num_events: u16,
    ) -> Result<PaginationResult, PaginatorError> {
        match self {
            Self::Unthreaded { paginator, .. } => {
                paginator.paginate_backward(num_events.into()).await
            }
            Self::Threaded(threaded_paginator) => {
                threaded_paginator.paginate_backwards(num_events.into()).await
            }
        }
    }

    /// Runs a forward pagination, requesting `num_events` from the server,
    /// starting from the current state of the object.
    ///
    /// Will return immediately if we have already hit the end of the timeline.
    ///
    /// May return an error if a pagination is already in progress, or if the
    /// call to the homeserver endpoint failed.
    pub async fn paginate_forwards(
        &self,
        num_events: u16,
    ) -> Result<PaginationResult, PaginatorError> {
        match self {
            Self::Unthreaded { paginator, .. } => {
                paginator.paginate_forward(num_events.into()).await
            }
            Self::Threaded(threaded_paginator) => {
                threaded_paginator.paginate_forwards(num_events.into()).await
            }
        }
    }

    /// Whether to hide in-thread events from the timeline.
    pub fn hide_threaded_events(&self) -> bool {
        match self {
            Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
            Self::Threaded(_) => false,
        }
    }

    /// Returns the root event ID of the thread, if the paginator is
    /// [`AnyPaginator::Threaded`].
    pub fn thread_root(&self) -> Option<&EventId> {
        match self {
            Self::Unthreaded { .. } => None,
            Self::Threaded(thread_events_loader) => {
                Some(thread_events_loader.thread_root_event_id())
            }
        }
    }
}

impl<P: RoomDataProvider> TimelineFocusKind<P> {
    /// Returns the [`ReceiptThread`] that should be used for the current
    /// timeline focus.
    ///
    /// Live and event timelines will use the unthreaded read receipt type in
    /// general, unless they hide in-thread events, in which case they will
    /// use the main thread.
    pub(super) fn receipt_thread(&self) -> ReceiptThread {
        if let Some(thread_root) = self.thread_root() {
            ReceiptThread::Thread(thread_root.to_owned())
        } else if self.hide_threaded_events() {
            ReceiptThread::Main
        } else {
            ReceiptThread::Unthreaded
        }
    }

    /// Whether to hide in-thread events from the timeline.
    fn hide_threaded_events(&self) -> bool {
        match self {
            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
            TimelineFocusKind::Event { paginator } => {
                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
            }
            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
        }
    }

    /// Whether the focus is on a thread (from a live thread or a thread
    /// permalink).
    fn is_thread(&self) -> bool {
        self.thread_root().is_some()
    }

    /// If the focus is a thread, returns its root event ID.
    fn thread_root(&self) -> Option<&EventId> {
        match self {
            TimelineFocusKind::Event { paginator, .. } => {
                paginator.get().and_then(|paginator| paginator.thread_root())
            }
            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
        }
    }
}
247
248#[derive(Clone, Debug)]
249pub(super) struct TimelineController<P: RoomDataProvider = Room> {
250    /// Inner mutable state.
251    state: Arc<RwLock<TimelineState<P>>>,
252
253    /// Focus data.
254    focus: Arc<TimelineFocusKind<P>>,
255
256    /// A [`RoomDataProvider`] implementation, providing data.
257    ///
258    /// The type is a `RoomDataProvider` to allow testing. In the real world,
259    /// this would normally be a [`Room`].
260    pub(crate) room_data_provider: P,
261
262    /// Settings applied to this timeline.
263    pub(super) settings: TimelineSettings,
264
265    /// Long-running task used to retry decryption of timeline items without
266    /// blocking main processing.
267    decryption_retry_task: DecryptionRetryTask<P, P>,
268}
269
270#[derive(Clone)]
271pub(super) struct TimelineSettings {
272    /// Should the read receipts and read markers be handled?
273    pub(super) track_read_receipts: bool,
274
275    /// Event filter that controls what's rendered as a timeline item (and thus
276    /// what can carry read receipts).
277    pub(super) event_filter: Arc<TimelineEventFilterFn>,
278
279    /// Are unparsable events added as timeline items of their own kind?
280    pub(super) add_failed_to_parse: bool,
281
282    /// Should the timeline items be grouped by day or month?
283    pub(super) date_divider_mode: DateDividerMode,
284}
285
286#[cfg(not(tarpaulin_include))]
287impl fmt::Debug for TimelineSettings {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("TimelineSettings")
290            .field("track_read_receipts", &self.track_read_receipts)
291            .field("add_failed_to_parse", &self.add_failed_to_parse)
292            .finish_non_exhaustive()
293    }
294}
295
296impl Default for TimelineSettings {
297    fn default() -> Self {
298        Self {
299            track_read_receipts: false,
300            event_filter: Arc::new(default_event_filter),
301            add_failed_to_parse: true,
302            date_divider_mode: DateDividerMode::Daily,
303        }
304    }
305}
306
/// The default event filter for
/// [`crate::timeline::TimelineBuilder::event_filter`].
///
/// It filters out events that are not rendered by the timeline, including but
/// not limited to: reactions, edits, and redactions of existing messages.
///
/// If you have a custom filter, it's best to chain it with this one, to avoid
/// situations where a read receipt is invisible because it's attached to an
/// event that doesn't have a matching timeline item.
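///
/// A hedged sketch of such chaining (`is_sticker` is a hypothetical helper;
/// the closure shape matches [`TimelineEventFilterFn`]):
///
/// ```ignore
/// let filter = |event: &AnySyncTimelineEvent, rules: &RoomVersionRules| {
///     // Keep everything the default filter keeps, except stickers.
///     default_event_filter(event, rules) && !is_sticker(event)
/// };
/// builder.event_filter(filter);
/// ```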
pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
    match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            if ev.redacts(&rules.redaction).is_some() {
                // This is a redaction of an existing message, we'll only update the previous
                // message and not render a new entry.
                false
            } else {
                // This is a redacted entry, that we'll show only if the redacted entity wasn't
                // a reaction.
                ev.event_type() != MessageLikeEventType::Reaction
            }
        }

        AnySyncTimelineEvent::MessageLike(msg) => {
            match msg.original_content() {
                None => {
                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
                    // a reaction.
                    msg.event_type() != MessageLikeEventType::Reaction
                }

                Some(original_content) => {
                    match original_content {
                        AnyMessageLikeEventContent::RoomMessage(content) => {
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                // Edits aren't visible by default.
                                return false;
                            }

                            match content.msgtype {
                                MessageType::Audio(_)
                                | MessageType::Emote(_)
                                | MessageType::File(_)
                                | MessageType::Image(_)
                                | MessageType::Location(_)
                                | MessageType::Notice(_)
                                | MessageType::ServerNotice(_)
                                | MessageType::Text(_)
                                | MessageType::Video(_)
                                | MessageType::VerificationRequest(_) => true,
                                #[cfg(feature = "unstable-msc4274")]
                                MessageType::Gallery(_) => true,
                                _ => false,
                            }
                        }

                        AnyMessageLikeEventContent::Sticker(_)
                        | AnyMessageLikeEventContent::UnstablePollStart(
                            UnstablePollStartEventContent::New(_),
                        )
                        | AnyMessageLikeEventContent::CallInvite(_)
                        | AnyMessageLikeEventContent::RtcNotification(_)
                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,

                        _ => false,
                    }
                }
            }
        }

        AnySyncTimelineEvent::State(_) => {
            // All the state events may get displayed by default.
            true
        }
    }
}

impl<P: RoomDataProvider> TimelineController<P> {
    pub(super) fn new(
        room_data_provider: P,
        focus: TimelineFocus,
        internal_id_prefix: Option<String>,
        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
        is_room_encrypted: bool,
        settings: TimelineSettings,
    ) -> Self {
        let focus = match focus {
            TimelineFocus::Live { hide_threaded_events } => {
                TimelineFocusKind::Live { hide_threaded_events }
            }

            TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },

            TimelineFocus::Thread { root_event_id, .. } => {
                TimelineFocusKind::Thread { root_event_id }
            }

            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
                TimelineFocusKind::PinnedEvents {
                    loader: PinnedEventsLoader::new(
                        Arc::new(room_data_provider.clone()),
                        max_events_to_load as usize,
                        max_concurrent_requests as usize,
                    ),
                }
            }
        };

        let focus = Arc::new(focus);
        let state = Arc::new(RwLock::new(TimelineState::new(
            focus.clone(),
            room_data_provider.own_user_id().to_owned(),
            room_data_provider.room_version_rules(),
            internal_id_prefix,
            unable_to_decrypt_hook,
            is_room_encrypted,
        )));

        let decryption_retry_task =
            DecryptionRetryTask::new(state.clone(), room_data_provider.clone());

        Self { state, focus, room_data_provider, settings, decryption_retry_task }
    }

    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once, right after creation of the
    /// [`TimelineController`], with all its fields set.
    ///
    /// Returns whether any events were added to the timeline.
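    ///
    /// A hedged sketch of the expected call pattern, assuming the controller
    /// was created with the same `focus` value (the surrounding bindings are
    /// assumed to be in scope):
    ///
    /// ```ignore
    /// let controller =
    ///     TimelineController::new(provider, focus.clone(), None, None, false, settings);
    /// let has_events = controller.init_focus(&focus, &room_event_cache).await?;
    /// ```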
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    RoomPaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    RoomPaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let event_paginator = Paginator::new(self.room_data_provider.clone());

                // Start a /context request so we can know if the event is in a thread or not,
                // and know which kind of pagination we'll be using then.
                let start_from_result = event_paginator
                    .start_from(event_id, (*num_context_events).into())
                    .await
                    .map_err(PaginationError::Paginator)?;

                // Find the target event, and see if it's part of a thread.
                let thread_root_event_id = start_from_result
                    .events
                    .iter()
                    .find(
                        |event| {
                            if let Some(id) = event.event_id() { id == *event_id } else { false }
                        },
                    )
                    .and_then(|event| extract_thread_root(event.raw()));

                let _ = paginator.set(match thread_root_event_id {
                    Some(root_id) => {
                        let mut tokens = event_paginator.tokens();

                        // Look if the thread root event is part of the /context response. This
                        // allows us to spare some backwards pagination with
                        // /relations.
                        let includes_root_event = start_from_result.events.iter().any(|event| {
                            if let Some(id) = event.event_id() { id == root_id } else { false }
                        });

                        if includes_root_event {
                            // If we have the root event, there's no need to do back-paginations
                            // with /relations, since we are at the start of the thread.
                            tokens.previous = PaginationToken::HitEnd;
                        }

                        AnyPaginator::Threaded(ThreadedEventsLoader::new(
                            self.room_data_provider.clone(),
                            root_id,
                            tokens,
                        ))
                    }

                    None => AnyPaginator::Unthreaded {
                        paginator: event_paginator,
                        hide_threaded_events: *hide_threaded_events,
                    },
                });

                let has_events = !start_from_result.events.is_empty();
                let events = start_from_result.events;

                match paginator.get().expect("Paginator was not instantiated") {
                    AnyPaginator::Unthreaded { .. } => {
                        self.replace_with_initial_remote_events(
                            events,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }

                    AnyPaginator::Threaded(threaded_events_loader) => {
                        // We filter only events that are part of the thread (including the root),
                        // since /context will return adjacent events without filters.
                        let thread_root = threaded_events_loader.thread_root_event_id();
                        let events_in_thread = events.into_iter().filter(|event| {
                            extract_thread_root(event.raw())
                                .is_some_and(|event_thread_root| event_thread_root == thread_root)
                                || event.event_id().as_deref() == Some(thread_root)
                        });

                        self.replace_with_initial_remote_events(
                            events_in_thread,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }
                }

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
                let has_events = !events.is_empty();

                // For each event, we also need to find the related events: since
                // related events don't carry the thread relationship themselves,
                // they won't be included in the initial list of events.
                let mut related_events = Vector::new();
                for event_id in events.iter().filter_map(|event| event.event_id()) {
                    if let Some((_original, related)) =
                        room_event_cache.find_event_with_relations(&event_id, None).await
                    {
                        related_events.extend(related);
                    }
                }

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                // Now that we've inserted the thread events, add the aggregations too.
                if !related_events.is_empty() {
                    self.handle_remote_aggregations(
                        vec![VectorDiff::Append { values: related_events }],
                        RemoteEventOrigin::Cache,
                    )
                    .await;
                }

                Ok(has_events)
            }

            TimelineFocus::PinnedEvents { .. } => {
                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let Some(loaded_events) =
                    loader.load_events().await.map_err(Error::PinnedEventsError)?
                else {
                    // There weren't any events.
                    return Ok(false);
                };

                let has_events = !loaded_events.is_empty();

                self.replace_with_initial_remote_events(
                    loaded_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }

    /// Listens to encryption state changes for the room in
    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
    /// existing timeline items. This will then cause a refresh of those
    /// timeline items.
    pub async fn handle_encryption_state_changes(&self) {
        let mut room_info = self.room_data_provider.room_info();

        // Small helper to mark the room as encrypted.
        let mark_encrypted = || async {
            let mut state = self.state.write().await;
            state.meta.is_room_encrypted = true;
            state.mark_all_events_as_encrypted();
        };

        if room_info.get().encryption_state().is_encrypted() {
            // If the room was already encrypted, it won't toggle to unencrypted, so we can
            // shut down this task early.
            mark_encrypted().await;
            return;
        }

        while let Some(info) = room_info.next().await {
            if info.encryption_state().is_encrypted() {
                mark_encrypted().await;
                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
                // here is done.
                break;
            }
        }
    }

    pub(crate) async fn reload_pinned_events(
        &self,
    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
            loader.load_events().await
        } else {
            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
        }
    }

    /// Run a lazy backwards pagination (in live mode).
    ///
    /// It adjusts the `count` value of the `Skip` higher-order stream so that
    /// more items are pushed to the front of the timeline.
    ///
    /// If no more items are available (i.e. if the `count` is zero), this
    /// method returns `Some(needs)`, where `needs` is the number of events
    /// that must actually be paginated backwards (non-lazily).
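    ///
    /// A hedged sketch of the intended call pattern (`back_paginate` is a
    /// hypothetical helper standing in for the actual back-pagination entry
    /// point, e.g. in the event cache):
    ///
    /// ```ignore
    /// if let Some(needs) = controller.live_lazy_paginate_backwards(20).await {
    ///     // The lazy window is exhausted: actually back-paginate `needs` events.
    ///     back_paginate(needs).await?;
    /// }
    /// ```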
    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
        let state = self.state.read().await;

        let (count, needs) = state
            .meta
            .subscriber_skip_count
            .compute_next_when_paginating_backwards(num_events.into());

        // This always happens on a live timeline.
        let is_live_timeline = true;
        state.meta.subscriber_skip_count.update(count, is_live_timeline);

        needs
    }

    /// Run a backwards pagination (in focused mode) and append the results to
    /// the timeline.
    ///
    /// Returns whether we hit the start of the timeline.
    pub(super) async fn focused_paginate_backwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => {
                return Err(PaginationError::NotSupported);
            }
            TimelineFocusKind::Event { paginator, .. } => paginator
                .get()
                .expect("Paginator was not instantiated")
                .paginate_backwards(num_events)
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in reverse topological order.
        // We can push each event individually to the front.
        self.handle_remote_events_with_diffs(
            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Run a forwards pagination (in focused mode) and append the results to
    /// the timeline.
    ///
    /// Returns whether we hit the end of the timeline.
    pub(super) async fn focused_paginate_forwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),

            TimelineFocusKind::Event { paginator, .. } => paginator
                .get()
                .expect("Paginator was not instantiated")
                .paginate_forwards(num_events)
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in topological order.
        // We can append all events with no transformation.
        self.handle_remote_events_with_diffs(
            vec![VectorDiff::Append { values: events.into() }],
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Is this timeline receiving events from sync (aka has a live focus)?
    pub(super) fn is_live(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Live { .. })
    }

    /// Is this timeline focused on a thread?
    pub(super) fn is_threaded(&self) -> bool {
        self.focus.is_thread()
    }

    /// The root of the current thread, for a live thread timeline or a
    /// permalink to a thread message.
    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
        self.focus.thread_root().map(ToOwned::to_owned)
    }

    /// Get a copy of the current items in the list.
    ///
    /// Cheap because `imbl::Vector` is cheap to clone.
    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
        self.state.read().await.items.clone_items()
    }

    #[cfg(test)]
    pub(super) async fn subscribe_raw(
        &self,
    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
        self.state.read().await.items.subscribe().into_values_and_stream()
    }

    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
        let state = self.state.read().await;

        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
    }

    pub(super) async fn subscribe_filter_map<U, F>(
        &self,
        f: F,
    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
    where
        U: Clone,
        F: Fn(Arc<TimelineItem>) -> Option<U>,
    {
        self.state.read().await.items.subscribe().filter_map(f)
    }

    /// Toggle a reaction locally.
    ///
    /// Returns true if the reaction was added, false if it was removed.
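    ///
    /// A hedged usage sketch (`controller` and `item_id` are assumed to be in
    /// scope; the reaction key is arbitrary):
    ///
    /// ```ignore
    /// let added = controller.toggle_reaction_local(&item_id, "👍").await?;
    /// if added {
    ///     // A reaction was added (or its local echo created).
    /// } else {
    ///     // An existing reaction was removed (or its send aborted).
    /// }
    /// ```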
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction moved from local echo to remote echo
                        // under our feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction moved from local echo to remote echo
                        // under our feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it doesn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }

    /// Handle updates on events as [`VectorDiff`]s.
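    ///
    /// A hedged sketch of how call sites in this module build the diffs (see
    /// `focused_paginate_forwards` above for a real call):
    ///
    /// ```ignore
    /// controller
    ///     .handle_remote_events_with_diffs(
    ///         vec![VectorDiff::Append { values: events.into() }],
    ///         RemoteEventOrigin::Pagination,
    ///     )
    ///     .await;
    /// ```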
    pub(super) async fn handle_remote_events_with_diffs(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_events_with_diffs(
                diffs,
                origin,
                &self.room_data_provider,
                &self.settings,
            )
            .await
    }

    /// Only handle aggregations received as [`VectorDiff`]s.
    pub(super) async fn handle_remote_aggregations(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
            .await
    }

    pub(super) async fn clear(&self) {
        self.state.write().await.clear();
    }

    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        let track_read_markers = self.settings.track_read_receipts;
        if track_read_markers {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one
        // is non-empty.
        // Previously, checking that the new list wasn't empty was enough, since a
        // clear operation had always run beforehand, leaving the current list
        // empty; but now we may want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers {
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }

    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
    }

    pub(super) async fn handle_ephemeral_events(
        &self,
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ) {
        let mut state = self.state.write().await;
        state.handle_ephemeral_events(events, &self.room_data_provider).await;
    }

    /// Creates the local echo for an event we're sending.
    #[instrument(skip_all)]
    pub(super) async fn handle_local_event(
        &self,
        txn_id: OwnedTransactionId,
        content: AnyMessageLikeEventContent,
        send_handle: Option<SendHandle>,
    ) {
        let sender = self.room_data_provider.own_user_id().to_owned();
        let profile = self.room_data_provider.profile_from_user_id(&sender).await;

        let date_divider_mode = self.settings.date_divider_mode.clone();

        let mut state = self.state.write().await;
        state
            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
            .await;
    }

    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
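    ///
    /// A hedged sketch (the transaction ID and event ID values are assumed to
    /// be in scope):
    ///
    /// ```ignore
    /// controller
    ///     .update_event_send_state(
    ///         &txn_id,
    ///         EventSendState::Sent { event_id: event_id.clone() },
    ///     )
    ///     .await;
    /// ```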
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline; we must first make
        // sure the remote echo hasn't shown up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent: that's a broken state. Emit an
        // error, but also override with the given send state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }

    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
        // dereferencing it once.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }

    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soon™ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned to remote??");
1237                return false;
1238            };
1239            // If the local echo had an upload progress, retain it.
1240            let progress = as_variant!(&prev_local_item.send_state,
1241                EventSendState::NotSentYet { progress } => progress.clone())
1242            .flatten();
1243            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1244        };
1245
1246        // Replace the local-related state (kind) and the content state.
1247        let new_item = TimelineItem::new(
1248            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1249                content.msgtype,
1250                content.mentions,
1251                prev_item.content().reactions().cloned().unwrap_or_default(),
1252                prev_item.content().thread_root(),
1253                prev_item.content().in_reply_to(),
1254                prev_item.content().thread_summary(),
1255            )),
1256            prev_item.internal_id.to_owned(),
1257        );
1258
1259        txn.items.replace(idx, new_item);
1260
1261        // This doesn't change the original sending time, so there's no need to adjust
1262        // date dividers.
1263
1264        txn.commit();
1265
1266        debug!("Replaced local echo");
1267        true
1268    }
1269
1270    pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
1271        self.decryption_retry_task
1272            .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
1273            .await;
1274    }
1275
1276    pub(super) async fn set_sender_profiles_pending(&self) {
1277        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1278    }
1279
1280    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1281        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1282    }
1283
1284    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1285        self.state.write().await.items.for_each(|mut entry| {
1286            let Some(event_item) = entry.as_event() else { return };
1287            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1288                let new_item = entry.with_kind(TimelineItemKind::Event(
1289                    event_item.with_sender_profile(profile_state.clone()),
1290                ));
1291                ObservableItemsEntry::replace(&mut entry, new_item);
1292            }
1293        });
1294    }
1295
1296    pub(super) async fn update_missing_sender_profiles(&self) {
1297        trace!("Updating missing sender profiles");
1298
1299        let mut state = self.state.write().await;
1300        let mut entries = state.items.entries();
1301        while let Some(mut entry) = entries.next() {
1302            let Some(event_item) = entry.as_event() else { continue };
1303            let event_id = event_item.event_id().map(debug);
1304            let transaction_id = event_item.transaction_id().map(debug);
1305
1306            if event_item.sender_profile().is_ready() {
1307                trace!(event_id, transaction_id, "Profile already set");
1308                continue;
1309            }
1310
1311            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1312                Some(profile) => {
1313                    trace!(event_id, transaction_id, "Adding profile");
1314                    let updated_item =
1315                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1316                    let new_item = entry.with_kind(updated_item);
1317                    ObservableItemsEntry::replace(&mut entry, new_item);
1318                }
1319                None => {
1320                    if !event_item.sender_profile().is_unavailable() {
1321                        trace!(event_id, transaction_id, "Marking profile unavailable");
1322                        let updated_item =
1323                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1324                        let new_item = entry.with_kind(updated_item);
1325                        ObservableItemsEntry::replace(&mut entry, new_item);
1326                    } else {
1327                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1328                    }
1329                }
1330            }
1331        }
1332
1333        trace!("Done updating missing sender profiles");
1334    }

    /// Update the profiles of the given senders, even if they are ready.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }

    #[cfg(test)]
    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
        let own_user_id = self.room_data_provider.own_user_id();
        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
    }

    /// Get the latest read receipt for the given user.
    ///
    /// Considers both private and public read receipts, and returns whichever
    /// is the most recent.
    pub(super) async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        let receipt_thread = self.focus.receipt_thread();

        self.state
            .read()
            .await
            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
            .await
    }

    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    pub(super) async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
    }

    /// Subscribe to changes in the read receipts of our own user.
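    ///
    /// # Example
    ///
    /// A minimal consumption sketch (illustrative only; the `controller`
    /// handle and the surrounding async context are assumed):
    ///
    /// ```ignore
    /// use futures_util::{pin_mut, StreamExt as _};
    ///
    /// let receipts_changed = controller.subscribe_own_user_read_receipts_changed().await;
    /// pin_mut!(receipts_changed);
    /// while receipts_changed.next().await.is_some() {
    ///     // Our own read receipts changed; refresh anything derived from them.
    /// }
    /// ```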
    pub async fn subscribe_own_user_read_receipts_changed(
        &self,
    ) -> impl Stream<Item = ()> + use<P> {
        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
    }

    /// Handle a room send update that's a new local echo.
    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
        match echo.content {
            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
                let content = match serialized_event.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo: {err}");
                        return;
                    }
                };

                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
                    .await;

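                // The echo may already carry a send error; if so, reflect it
                // immediately as a failed (non-recoverable) send state.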
                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }

            LocalEchoContent::React { key, send_handle, applies_to } => {
                self.handle_local_reaction(key, send_handle, applies_to).await;
            }
        }
    }

    /// Adds a reaction (local echo) to a local echo.
    #[instrument(skip(self, send_handle))]
    async fn handle_local_reaction(
        &self,
        reaction_key: String,
        send_handle: SendReactionHandle,
        applies_to: OwnedTransactionId,
    ) {
        let mut state = self.state.write().await;
        let mut tr = state.transaction();

        let target = TimelineEventItemId::TransactionId(applies_to);

        let reaction_txn_id = send_handle.transaction_id().to_owned();
        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
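        // Model the pending reaction as an aggregation: it targets the local
        // echo's transaction ID and is itself identified by the reaction's own
        // transaction ID (both sides are still local, hence `LocalToLocal`).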
        let aggregation = Aggregation::new(
            TimelineEventItemId::TransactionId(reaction_txn_id),
            AggregationKind::Reaction {
                key: reaction_key.clone(),
                sender: self.room_data_provider.own_user_id().to_owned(),
                timestamp: MilliSecondsSinceUnixEpoch::now(),
                reaction_status,
            },
        );

        tr.meta.aggregations.add(target.clone(), aggregation.clone());
        find_item_and_apply_aggregation(
            &tr.meta.aggregations,
            &mut tr.items,
            &target,
            aggregation,
            &tr.meta.room_version_rules,
        );

        tr.commit();
    }

    /// Handle a single room send queue update.
    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
        match update {
            RoomSendQueueUpdate::NewLocalEvent(echo) => {
                self.handle_local_echo(echo).await;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                if !self.discard_local_echo(&transaction_id).await {
                    warn!("couldn't find the local echo to discard");
                }
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                let content = match new_content.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo (upon edit): {err}");
                        return;
                    }
                };

                if !self.replace_local_echo(&transaction_id, content).await {
                    warn!("couldn't find the local echo to replace");
                }
            }

            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::SendingFailed { error, is_recoverable },
                )
                .await;
            }

            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::NotSentYet { progress: None },
                )
                .await;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
                    .await;
            }

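            // A media upload keeps the event in the "not sent yet" state while
            // exposing upload progress; `index` identifies which media within
            // the event is currently being uploaded.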
            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
                self.update_event_send_state(
                    &related_to,
                    EventSendState::NotSentYet {
                        progress: Some(MediaUploadProgress { index, progress }),
                    },
                )
                .await;
            }
        }
    }

    /// Insert a timeline start item at the beginning of the timeline, if it's
    /// missing.
    pub async fn insert_timeline_start_if_missing(&self) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();
        txn.items.push_timeline_start_if_missing(
            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
        );
        txn.commit();
    }

    /// Create an [`EmbeddedEvent`] from an arbitrary event, be it in the
    /// timeline or not.
    ///
    /// Returns `Ok(None)` if the event cannot be represented as a standalone
    /// item, because it's an aggregation.
    pub(super) async fn make_replied_to(
        &self,
        event: TimelineEvent,
    ) -> Result<Option<EmbeddedEvent>, Error> {
        let state = self.state.read().await;
        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
    }
}

impl TimelineController {
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }

    /// Given an event identifier, fetch the details of the event it's
    /// replying to, if applicable.
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }

    /// Returns the thread that should be used for a read receipt based on the
    /// current focus of the timeline and the receipt type.
    ///
    /// A `SendReceiptType::FullyRead` will always use
    /// `ReceiptThread::Unthreaded`.
    pub(super) fn infer_thread_for_read_receipt(
        &self,
        receipt_type: &SendReceiptType,
    ) -> ReceiptThread {
        if matches!(receipt_type, SendReceiptType::FullyRead) {
            ReceiptThread::Unthreaded
        } else {
            self.focus.receipt_thread()
        }
    }

    /// Check whether the given receipt should be sent.
    ///
    /// Returns `false` if the given receipt is older than the current one.
    pub(super) async fn should_send_receipt(
        &self,
        receipt_type: &SendReceiptType,
        receipt_thread: &ReceiptThread,
        event_id: &EventId,
    ) -> bool {
        let own_user_id = self.room().own_user_id();
        let state = self.state.read().await;
        let room = self.room();

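        // In each known case below, a receipt is only worth sending if it
        // refers to an event positioned strictly after the event targeted by
        // our current receipt of the same kind.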
        match receipt_type {
            SendReceiptType::Read => {
                if let Some((old_pub_read, _)) = state
                    .meta
                    .user_receipt(
                        own_user_id,
                        ReceiptType::Read,
                        receipt_thread.clone(),
                        room,
                        state.items.all_remote_events(),
                    )
                    .await
                {
                    trace!(%old_pub_read, "found a previous public receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_pub_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            // Implicit read receipts are saved as public read receipts, so get the latest. It also
            // doesn't make sense to have a private read receipt behind a public one.
            SendReceiptType::ReadPrivate => {
                if let Some((old_priv_read, _)) =
                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
                {
                    trace!(%old_priv_read, "found a previous private receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_priv_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            SendReceiptType::FullyRead => {
                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &prev_event_id,
                        event_id,
                        state.items.all_remote_events(),
                    )
                {
                    return relative_pos == RelativePosition::After;
                }
            }

            _ => {}
        }

        // Let the server handle unknown receipts.
        true
    }

    /// Returns the latest event identifier, even if it's not visible, or if
    /// it's folded into another timeline item.
    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
        let state = self.state.read().await;
        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
    }

    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
        self.retry_event_decryption_inner(session_ids).await
    }

    /// Combine the global (event cache) pagination status with the local state
    /// of the timeline.
    ///
    /// This only changes the global pagination status of this room, in one
    /// case: if the timeline has a skip count greater than 0, it will
    /// ensure that the pagination status says that we haven't reached the
    /// timeline start yet.
    pub(super) async fn map_pagination_status(
        &self,
        status: RoomPaginationStatus,
    ) -> RoomPaginationStatus {
        match status {
            RoomPaginationStatus::Idle { hit_timeline_start } => {
                if hit_timeline_start {
                    let state = self.state.read().await;
                    // If the skip count is greater than 0, it means that a subsequent pagination
                    // could return more items, so pretend we didn't get the information that the
                    // timeline start was hit.
                    if state.meta.subscriber_skip_count.get() > 0 {
                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
                    }
                }
            }
            RoomPaginationStatus::Paginating => {}
        }

        // You're perfect, just the way you are.
        status
    }
}

impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
        &self.focus
    }
}

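/// Fetch the event that `item` is replying to, first by looking for it in the
/// local timeline items, then by loading it from the event cache or the
/// network. While the fetch is in flight, the item's reply details are marked
/// as pending.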
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status of the
    // replied-to event set to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}