matrix_sdk_ui/timeline/controller/mod.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeSet, fmt, sync::Arc};

use as_variant::as_variant;
use eyeball_im::{VectorDiff, VectorSubscriberStream};
use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
use futures_core::Stream;
use imbl::Vector;
#[cfg(test)]
use matrix_sdk::Result;
use matrix_sdk::{
    deserialized_responses::TimelineEvent,
    event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
    paginators::{PaginationResult, PaginationToken, Paginator},
    send_queue::{
        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
    },
};
#[cfg(test)]
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
    events::{
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent, MessageLikeEventType,
        poll::unstable_start::UnstablePollStartEventContent,
        reaction::ReactionEventContent,
        receipt::{Receipt, ReceiptThread, ReceiptType},
        relation::Annotation,
        room::message::{MessageType, Relation},
    },
    room_version_rules::RoomVersionRules,
    serde::Raw,
};
use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};

pub(super) use self::{
    metadata::{RelativePosition, TimelineMetadata},
    observable_items::{
        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
        ObservableItemsTransactionEntry,
    },
    state::TimelineState,
    state_transaction::TimelineStateTransaction,
};
use super::{
    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
    algorithms::{rfind_event_by_id, rfind_event_item},
    event_item::{ReactionStatus, RemoteEventOrigin},
    item::TimelineUniqueId,
    subscriber::TimelineSubscriber,
    traits::RoomDataProvider,
};
use crate::{
    timeline::{
        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
        algorithms::rfind_event_by_item_id,
        controller::decryption_retry_task::compute_redecryption_candidates,
        date_dividers::DateDividerAdjuster,
        event_item::TimelineItemHandle,
        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
    },
    unable_to_decrypt_hook::UtdHookManager,
};

pub(in crate::timeline) mod aggregations;
mod decryption_retry_task;
mod metadata;
mod observable_items;
mod read_receipts;
mod state;
mod state_transaction;

pub(super) use aggregations::*;
pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
use matrix_sdk_common::serde_helpers::extract_thread_root;

/// Data associated with the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The paginator instance.
        paginator: OnceCell<AnyPaginator<P>>,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    PinnedEvents {
        loader: PinnedEventsLoader,
    },
}

#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
    Unthreaded {
        /// The actual event paginator.
        paginator: Paginator<P>,
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },
    Threaded(ThreadedEventsLoader<P>),
}

impl<P: RoomDataProvider> AnyPaginator<P> {
    /// Runs a backward pagination (requesting `num_events` from the server),
    /// starting from the current state of the object.
    ///
    /// Will return immediately if we have already hit the start of the
    /// timeline.
    ///
    /// May return an error if a pagination is already running, or if the call
    /// to the homeserver endpoints failed.
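    ///
    /// # Example
    ///
    /// A minimal usage sketch (the `paginator` value and the error handling
    /// are assumptions for illustration):
    ///
    /// ```rust,ignore
    /// // Request up to 20 older events, whatever the threading mode.
    /// let result = paginator.paginate_backwards(20).await?;
    /// if result.hit_end_of_timeline {
    ///     // We reached the start of the room (or thread): stop paginating.
    /// }
    /// ```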
    pub async fn paginate_backwards(
        &self,
        num_events: u16,
    ) -> Result<PaginationResult, PaginatorError> {
        match self {
            Self::Unthreaded { paginator, .. } => {
                paginator.paginate_backward(num_events.into()).await
            }
            Self::Threaded(threaded_paginator) => {
                threaded_paginator.paginate_backwards(num_events.into()).await
            }
        }
    }

    /// Runs a forward pagination (requesting `num_events` from the server),
    /// starting from the current state of the object.
    ///
    /// Will return immediately if we have already hit the end of the timeline.
    ///
    /// May return an error if a pagination is already running, or if the call
    /// to the homeserver endpoints failed.
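    ///
    /// # Example
    ///
    /// A minimal usage sketch, symmetric to [`Self::paginate_backwards`] (the
    /// `paginator` value is an assumption for illustration):
    ///
    /// ```rust,ignore
    /// let result = paginator.paginate_forwards(20).await?;
    /// if result.hit_end_of_timeline {
    ///     // We reached the end of the timeline: nothing newer to fetch.
    /// }
    /// ```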
    pub async fn paginate_forwards(
        &self,
        num_events: u16,
    ) -> Result<PaginationResult, PaginatorError> {
        match self {
            Self::Unthreaded { paginator, .. } => {
                paginator.paginate_forward(num_events.into()).await
            }
            Self::Threaded(threaded_paginator) => {
                threaded_paginator.paginate_forwards(num_events.into()).await
            }
        }
    }

    /// Whether to hide in-thread events from the timeline.
    pub fn hide_threaded_events(&self) -> bool {
        match self {
            Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
            Self::Threaded(_) => false,
        }
    }

    /// Returns the root event id of the thread, if the paginator is
    /// [`AnyPaginator::Threaded`].
    pub fn thread_root(&self) -> Option<&EventId> {
        match self {
            Self::Unthreaded { .. } => None,
            Self::Threaded(thread_events_loader) => {
                Some(thread_events_loader.thread_root_event_id())
            }
        }
    }
}

impl<P: RoomDataProvider> TimelineFocusKind<P> {
    /// Returns the [`ReceiptThread`] that should be used for the current
    /// timeline focus.
    ///
    /// Live and event timelines will use the unthreaded read receipt type in
    /// general, unless they hide in-thread events, in which case they will
    /// use the main thread.
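    ///
    /// # Example
    ///
    /// A minimal sketch of the mapping (the constructed focus value is an
    /// illustrative assumption):
    ///
    /// ```rust,ignore
    /// // A thread focus yields `ReceiptThread::Thread(root)`; a live focus
    /// // hiding in-thread events yields `ReceiptThread::Main`; anything else
    /// // yields `ReceiptThread::Unthreaded`.
    /// let focus = TimelineFocusKind::<Room>::Live { hide_threaded_events: true };
    /// assert_eq!(focus.receipt_thread(), ReceiptThread::Main);
    /// ```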
    pub(super) fn receipt_thread(&self) -> ReceiptThread {
        if let Some(thread_root) = self.thread_root() {
            ReceiptThread::Thread(thread_root.to_owned())
        } else if self.hide_threaded_events() {
            ReceiptThread::Main
        } else {
            ReceiptThread::Unthreaded
        }
    }

    /// Whether to hide in-thread events from the timeline.
    fn hide_threaded_events(&self) -> bool {
        match self {
            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
            TimelineFocusKind::Event { paginator } => {
                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
            }
            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
        }
    }

    /// Whether the focus is on a thread (from a live thread or a thread
    /// permalink).
    fn is_thread(&self) -> bool {
        self.thread_root().is_some()
    }

    /// If the focus is a thread, returns its root event ID.
    fn thread_root(&self) -> Option<&EventId> {
        match self {
            TimelineFocusKind::Event { paginator, .. } => {
                paginator.get().and_then(|paginator| paginator.thread_root())
            }
            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
        }
    }
}

#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    focus: Arc<TimelineFocusKind<P>>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}

#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled?
    pub(super) track_read_receipts: bool,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for TimelineSettings {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TimelineSettings")
            .field("track_read_receipts", &self.track_read_receipts)
            .field("add_failed_to_parse", &self.add_failed_to_parse)
            .finish_non_exhaustive()
    }
}

impl Default for TimelineSettings {
    fn default() -> Self {
        Self {
            track_read_receipts: false,
            event_filter: Arc::new(default_event_filter),
            add_failed_to_parse: true,
            date_divider_mode: DateDividerMode::Daily,
        }
    }
}

/// The default event filter for
/// [`crate::timeline::TimelineBuilder::event_filter`].
///
/// It filters out events that are not rendered by the timeline, including but
/// not limited to: reactions, edits, redactions on existing messages.
///
/// If you have a custom filter, it may be best to chain yours with this one if
/// you do not want to run into situations where a read receipt is not visible
/// because it's living on an event that doesn't have a matching timeline item.
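///
/// # Example
///
/// A minimal sketch of chaining a custom filter with this one (the builder
/// value and the extra predicate are assumptions for illustration):
///
/// ```rust,ignore
/// builder.event_filter(|event, rules| {
///     // Keep the default visibility rules, then apply our own predicate.
///     default_event_filter(event, rules) && my_extra_predicate(event)
/// });
/// ```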
pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
    match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            if ev.redacts(&rules.redaction).is_some() {
                // This is a redaction of an existing message, we'll only update the previous
                // message and not render a new entry.
                false
            } else {
                // This is a redacted entry, which we'll show only if the redacted entity
                // wasn't a reaction.
                ev.event_type() != MessageLikeEventType::Reaction
            }
        }

        AnySyncTimelineEvent::MessageLike(msg) => {
            match msg.original_content() {
                None => {
                    // This is a redacted entry, which we'll show only if the redacted
                    // entity wasn't a reaction.
                    msg.event_type() != MessageLikeEventType::Reaction
                }

                Some(original_content) => {
                    match original_content {
                        AnyMessageLikeEventContent::RoomMessage(content) => {
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                // Edits aren't visible by default.
                                return false;
                            }

                            match content.msgtype {
                                MessageType::Audio(_)
                                | MessageType::Emote(_)
                                | MessageType::File(_)
                                | MessageType::Image(_)
                                | MessageType::Location(_)
                                | MessageType::Notice(_)
                                | MessageType::ServerNotice(_)
                                | MessageType::Text(_)
                                | MessageType::Video(_)
                                | MessageType::VerificationRequest(_) => true,
                                #[cfg(feature = "unstable-msc4274")]
                                MessageType::Gallery(_) => true,
                                _ => false,
                            }
                        }

                        AnyMessageLikeEventContent::Sticker(_)
                        | AnyMessageLikeEventContent::UnstablePollStart(
                            UnstablePollStartEventContent::New(_),
                        )
                        | AnyMessageLikeEventContent::CallInvite(_)
                        | AnyMessageLikeEventContent::RtcNotification(_)
                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,

                        _ => false,
                    }
                }
            }
        }

        AnySyncTimelineEvent::State(_) => {
            // All the state events may get displayed by default.
            true
        }
    }
}

impl<P: RoomDataProvider> TimelineController<P> {
    pub(super) fn new(
        room_data_provider: P,
        focus: TimelineFocus,
        internal_id_prefix: Option<String>,
        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
        is_room_encrypted: bool,
        settings: TimelineSettings,
    ) -> Self {
        let focus = match focus {
            TimelineFocus::Live { hide_threaded_events } => {
                TimelineFocusKind::Live { hide_threaded_events }
            }

            TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },

            TimelineFocus::Thread { root_event_id, .. } => {
                TimelineFocusKind::Thread { root_event_id }
            }

            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
                TimelineFocusKind::PinnedEvents {
                    loader: PinnedEventsLoader::new(
                        Arc::new(room_data_provider.clone()),
                        max_events_to_load as usize,
                        max_concurrent_requests as usize,
                    ),
                }
            }
        };

        let focus = Arc::new(focus);
        let state = Arc::new(RwLock::new(TimelineState::new(
            focus.clone(),
            room_data_provider.own_user_id().to_owned(),
            room_data_provider.room_version_rules(),
            internal_id_prefix,
            unable_to_decrypt_hook,
            is_room_encrypted,
        )));

        Self { state, focus, room_data_provider, settings }
    }

    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once after creation of the [`TimelineController`],
    /// with all its fields set.
    ///
    /// Returns whether there were any events added to the timeline.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await?;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    RoomPaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    RoomPaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let event_paginator = Paginator::new(self.room_data_provider.clone());

                // Start a /context request so we can know whether the event is in a thread
                // or not, and therefore which kind of pagination we'll be using.
                let start_from_result = event_paginator
                    .start_from(event_id, (*num_context_events).into())
                    .await
                    .map_err(PaginationError::Paginator)?;

                // Find the target event, and see if it's part of a thread.
                let thread_root_event_id = start_from_result
                    .events
                    .iter()
                    .find(
                        |event| {
                            if let Some(id) = event.event_id() { id == *event_id } else { false }
                        },
                    )
                    .and_then(|event| extract_thread_root(event.raw()));

                let _ = paginator.set(match thread_root_event_id {
                    Some(root_id) => {
                        let mut tokens = event_paginator.tokens();

                        // Check whether the thread root event is part of the /context
                        // response. This allows us to spare some backwards pagination
                        // with /relations.
                        let includes_root_event = start_from_result.events.iter().any(|event| {
                            if let Some(id) = event.event_id() { id == root_id } else { false }
                        });

                        if includes_root_event {
                            // If we have the root event, there's no need to do back-paginations
                            // with /relations, since we are at the start of the thread.
                            tokens.previous = PaginationToken::HitEnd;
                        }

                        AnyPaginator::Threaded(ThreadedEventsLoader::new(
                            self.room_data_provider.clone(),
                            root_id,
                            tokens,
                        ))
                    }

                    None => AnyPaginator::Unthreaded {
                        paginator: event_paginator,
                        hide_threaded_events: *hide_threaded_events,
                    },
                });

                let has_events = !start_from_result.events.is_empty();
                let events = start_from_result.events;

                match paginator.get().expect("Paginator was not instantiated") {
                    AnyPaginator::Unthreaded { .. } => {
                        self.replace_with_initial_remote_events(
                            events,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }

                    AnyPaginator::Threaded(threaded_events_loader) => {
                        // Keep only events that are part of the thread (including the root),
                        // since /context will return adjacent events without filters.
                        let thread_root = threaded_events_loader.thread_root_event_id();
                        let events_in_thread = events.into_iter().filter(|event| {
                            extract_thread_root(event.raw())
                                .is_some_and(|event_thread_root| event_thread_root == thread_root)
                                || event.event_id().as_deref() == Some(thread_root)
                        });

                        self.replace_with_initial_remote_events(
                            events_in_thread,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }
                }

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                let (events, _) =
                    room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
                let has_events = !events.is_empty();

                // For each event, we also need to find the related events: since those
                // don't include the thread relationship, they won't be included in the
                // initial list of events.
                let mut related_events = Vector::new();
                for event_id in events.iter().filter_map(|event| event.event_id()) {
                    if let Some((_original, related)) =
                        room_event_cache.find_event_with_relations(&event_id, None).await?
                    {
                        related_events.extend(related);
                    }
                }

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                // Now that we've inserted the thread events, add the aggregations too.
                if !related_events.is_empty() {
                    self.handle_remote_aggregations(
                        vec![VectorDiff::Append { values: related_events }],
                        RemoteEventOrigin::Cache,
                    )
                    .await;
                }

                Ok(has_events)
            }

            TimelineFocus::PinnedEvents { .. } => {
                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let Some(loaded_events) =
                    loader.load_events().await.map_err(Error::PinnedEventsError)?
                else {
                    // There weren't any events.
                    return Ok(false);
                };

                let has_events = !loaded_events.is_empty();

                self.replace_with_initial_remote_events(
                    loaded_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }

    /// Listens to encryption state changes for the room in
    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
    /// existing timeline items. This will then cause a refresh of those
    /// timeline items.
    pub async fn handle_encryption_state_changes(&self) {
        let mut room_info = self.room_data_provider.room_info();

        // Small helper to mark the room as encrypted.
        let mark_encrypted = || async {
            let mut state = self.state.write().await;
            state.meta.is_room_encrypted = true;
            state.mark_all_events_as_encrypted();
        };

        if room_info.get().encryption_state().is_encrypted() {
            // If the room was already encrypted, it won't toggle to unencrypted, so we can
            // shut down this task early.
            mark_encrypted().await;
            return;
        }

        while let Some(info) = room_info.next().await {
            if info.encryption_state().is_encrypted() {
                mark_encrypted().await;
                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
                // here is done.
                break;
            }
        }
    }

    pub(crate) async fn reload_pinned_events(
        &self,
    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
            loader.load_events().await
        } else {
            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
        }
    }

    /// Run a lazy backwards pagination (in live mode).
    ///
    /// It adjusts the `count` value of the `Skip` higher-order stream so that
    /// more items are pushed to the front of the timeline.
    ///
    /// If no more items are available (i.e. if the `count` is zero), this
    /// method returns `Some(needs)`, where `needs` is the number of events
    /// that must be fetched with a real (non-lazy) backwards pagination.
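    ///
    /// # Example
    ///
    /// A minimal sketch (the `controller` value and the follow-up pagination
    /// are assumptions for illustration):
    ///
    /// ```rust,ignore
    /// if let Some(needs) = controller.live_lazy_paginate_backwards(20).await {
    ///     // The lazy window is exhausted: fetch at least `needs` events with
    ///     // a real back-pagination, e.g. through the event cache.
    /// }
    /// ```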
    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
        let state = self.state.read().await;

        let (count, needs) = state
            .meta
            .subscriber_skip_count
            .compute_next_when_paginating_backwards(num_events.into());

        // This always happens on a live timeline.
        let is_live_timeline = true;
        state.meta.subscriber_skip_count.update(count, is_live_timeline);

        needs
    }

    /// Run a backwards pagination (in focused mode) and append the results to
    /// the timeline.
    ///
    /// Returns whether we hit the start of the timeline.
    pub(super) async fn focused_paginate_backwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => {
                return Err(PaginationError::NotSupported);
            }
            TimelineFocusKind::Event { paginator, .. } => paginator
                .get()
                .expect("Paginator was not instantiated")
                .paginate_backwards(num_events)
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in reverse topological order,
        // so we can push each event to the front individually.
        self.handle_remote_events_with_diffs(
            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Run a forwards pagination (in focused mode) and append the results to
    /// the timeline.
    ///
    /// Returns whether we hit the end of the timeline.
    pub(super) async fn focused_paginate_forwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),

            TimelineFocusKind::Event { paginator, .. } => paginator
                .get()
                .expect("Paginator was not instantiated")
                .paginate_forwards(num_events)
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in topological order.
        // We can append all events with no transformation.
        self.handle_remote_events_with_diffs(
            vec![VectorDiff::Append { values: events.into() }],
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Is this timeline receiving events from sync (aka has a live focus)?
    pub(super) fn is_live(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Live { .. })
    }

    /// Is this timeline focused on a thread?
    pub(super) fn is_threaded(&self) -> bool {
        self.focus.is_thread()
    }

    /// The root of the current thread, for a live thread timeline or a
    /// permalink to a thread message.
    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
        self.focus.thread_root().map(ToOwned::to_owned)
    }

    /// Get a copy of the current items in the list.
    ///
    /// Cheap because `imbl::Vector` is cheap to clone.
    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
        self.state.read().await.items.clone_items()
    }

    #[cfg(test)]
    pub(super) async fn subscribe_raw(
        &self,
    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
        self.state.read().await.items.subscribe().into_values_and_stream()
    }

    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
        let state = self.state.read().await;

        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
    }

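    /// Subscribe to changes in the timeline items, mapping each item through
    /// `f` and keeping only the values for which it returns `Some(_)`.
    ///
    /// # Example
    ///
    /// A minimal sketch (the `controller` value is an assumption for
    /// illustration):
    ///
    /// ```rust,ignore
    /// // Observe only event items, dropping virtual items such as date dividers.
    /// let (items, stream) = controller
    ///     .subscribe_filter_map(|item| item.as_event().cloned())
    ///     .await;
    /// ```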
    pub(super) async fn subscribe_filter_map<U, F>(
        &self,
        f: F,
    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
    where
        U: Clone,
        F: Fn(Arc<TimelineItem>) -> Option<U>,
    {
        self.state.read().await.items.subscribe().filter_map(f)
    }

    /// Toggle a reaction locally.
    ///
    /// Returns true if the reaction was added, false if it was removed.
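    ///
    /// # Example
    ///
    /// A minimal sketch (the `controller` and `item_id` values are
    /// assumptions for illustration):
    ///
    /// ```rust,ignore
    /// let added = controller.toggle_reaction_local(&item_id, "👍").await?;
    /// assert!(added);
    /// // Toggling the same key again removes the reaction.
    /// let added = controller.toggle_reaction_local(&item_id, "👍").await?;
    /// assert!(!added);
    /// ```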
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it fails.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }

    /// Handle updates on events as [`VectorDiff`]s.
    pub(super) async fn handle_remote_events_with_diffs(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_events_with_diffs(
                diffs,
                origin,
                &self.room_data_provider,
                &self.settings,
            )
            .await
    }

    /// Only handle aggregations received as [`VectorDiff`]s.
    pub(super) async fn handle_remote_aggregations(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
            .await
    }

    pub(super) async fn clear(&self) {
        self.state.write().await.clear();
    }

    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        let track_read_markers = self.settings.track_read_receipts;
        if track_read_markers {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one is
        // non-empty.
        // Previously, we only had to check that the new list wasn't empty, because a
        // clear operation beforehand meant the current one always was; but now we may
        // want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers {
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }

    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
    }

    pub(super) async fn handle_ephemeral_events(
        &self,
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ) {
        let mut state = self.state.write().await;
        state.handle_ephemeral_events(events, &self.room_data_provider).await;
    }

    /// Creates the local echo for an event we're sending.
    #[instrument(skip_all)]
    pub(super) async fn handle_local_event(
        &self,
        txn_id: OwnedTransactionId,
        content: AnyMessageLikeEventContent,
        send_handle: Option<SendHandle>,
    ) {
        let sender = self.room_data_provider.own_user_id().to_owned();
        let profile = self.room_data_provider.profile_from_user_id(&sender).await;

        let date_divider_mode = self.settings.date_divider_mode.clone();

        let mut state = self.state.write().await;
        state
            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
            .await;
    }

    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, so we must first
        // make sure the remote echo hasn't shown up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if need be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent: that's a broken state, so let's
        // emit an error, but also override it with the given send state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }

    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
        // dereferencing it once.
        let mut txn = state.transaction();

        // Check whether this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }

    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soon™ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned to remote.");
1231                return false;
1232            };
1233            // If the local echo had an upload progress, retain it.
1234            let progress = as_variant!(&prev_local_item.send_state,
1235                EventSendState::NotSentYet { progress } => progress.clone())
1236            .flatten();
1237            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1238        };
1239
1240        // Replace the local-related state (kind) and the content state.
1241        let new_item = TimelineItem::new(
1242            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1243                content.msgtype,
1244                content.mentions,
1245                prev_item.content().reactions().cloned().unwrap_or_default(),
1246                prev_item.content().thread_root(),
1247                prev_item.content().in_reply_to(),
1248                prev_item.content().thread_summary(),
1249            )),
1250            prev_item.internal_id.to_owned(),
1251        );
1252
1253        txn.items.replace(idx, new_item);
1254
1255        // This doesn't change the original sending time, so there's no need to adjust
1256        // date dividers.
1257
1258        txn.commit();
1259
1260        debug!("Replaced local echo");
1261        true
1262    }
1263
1264    pub(super) async fn compute_redecryption_candidates(
1265        &self,
1266    ) -> (BTreeSet<String>, BTreeSet<String>) {
1267        let state = self.state.read().await;
1268        compute_redecryption_candidates(&state.items)
1269    }
1270
1271    pub(super) async fn set_sender_profiles_pending(&self) {
1272        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1273    }
1274
1275    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1276        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1277    }
1278
1279    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1280        self.state.write().await.items.for_each(|mut entry| {
1281            let Some(event_item) = entry.as_event() else { return };
1282            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1283                let new_item = entry.with_kind(TimelineItemKind::Event(
1284                    event_item.with_sender_profile(profile_state.clone()),
1285                ));
1286                ObservableItemsEntry::replace(&mut entry, new_item);
1287            }
1288        });
1289    }
1290
    pub(super) async fn update_missing_sender_profiles(&self) {
        trace!("Updating missing sender profiles");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            if event_item.sender_profile().is_ready() {
                trace!(event_id, transaction_id, "Profile already set");
                continue;
            }

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    trace!(event_id, transaction_id, "Adding profile");
                    let updated_item =
                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
                    let new_item = entry.with_kind(updated_item);
                    ObservableItemsEntry::replace(&mut entry, new_item);
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done updating missing sender profiles");
    }

    /// Update the profiles of the given senders, even if they are ready.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }

    #[cfg(test)]
    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
        let own_user_id = self.room_data_provider.own_user_id();
        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
    }

    /// Get the latest read receipt for the given user.
    ///
    /// Useful to get the latest read receipt, whether it's private or public.
    pub(super) async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        let receipt_thread = self.focus.receipt_thread();

        self.state
            .read()
            .await
            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
            .await
    }

    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    pub(super) async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
    }

    /// Subscribe to changes in the read receipts of our own user.
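    ///
    /// A sketch of how a caller might consume the stream (the `controller`
    /// binding and the UI update are illustrative, not part of this API):
    ///
    /// ```ignore
    /// use futures_util::StreamExt as _;
    ///
    /// let mut receipts_changed = controller.subscribe_own_user_read_receipts_changed().await;
    /// while receipts_changed.next().await.is_some() {
    ///     // Re-read the latest own receipt and refresh the read marker in the UI.
    /// }
    /// ```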
    pub async fn subscribe_own_user_read_receipts_changed(
        &self,
    ) -> impl Stream<Item = ()> + use<P> {
        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
    }

    /// Handle a room send update that's a new local echo.
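    ///
    /// If the serialized content can't be deserialized, the echo is logged
    /// and dropped instead of being surfaced as a timeline item.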
    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
        match echo.content {
            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
                let content = match serialized_event.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo: {err}");
                        return;
                    }
                };

                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
                    .await;

                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }

            LocalEchoContent::React { key, send_handle, applies_to } => {
                self.handle_local_reaction(key, send_handle, applies_to).await;
            }
        }
    }

    /// Adds a reaction (itself a local echo) to an event that is a local
    /// echo.
    #[instrument(skip(self, send_handle))]
    async fn handle_local_reaction(
        &self,
        reaction_key: String,
        send_handle: SendReactionHandle,
        applies_to: OwnedTransactionId,
    ) {
        let mut state = self.state.write().await;
        let mut tr = state.transaction();

        let target = TimelineEventItemId::TransactionId(applies_to);

        let reaction_txn_id = send_handle.transaction_id().to_owned();
        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
        let aggregation = Aggregation::new(
            TimelineEventItemId::TransactionId(reaction_txn_id),
            AggregationKind::Reaction {
                key: reaction_key.clone(),
                sender: self.room_data_provider.own_user_id().to_owned(),
                timestamp: MilliSecondsSinceUnixEpoch::now(),
                reaction_status,
            },
        );

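        // Record the aggregation in the metadata store, then apply it to the
        // target item if that item is already present in the timeline.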
        tr.meta.aggregations.add(target.clone(), aggregation.clone());
        find_item_and_apply_aggregation(
            &tr.meta.aggregations,
            &mut tr.items,
            &target,
            aggregation,
            &tr.meta.room_version_rules,
        );

        tr.commit();
    }

    /// Handle a single room send queue update.
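    ///
    /// Each update is translated into the matching local-echo operation:
    /// creating, discarding or replacing an echo, or updating its send state
    /// (including media upload progress).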
    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
        match update {
            RoomSendQueueUpdate::NewLocalEvent(echo) => {
                self.handle_local_echo(echo).await;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                if !self.discard_local_echo(&transaction_id).await {
                    warn!("couldn't find the local echo to discard");
                }
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                let content = match new_content.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo (upon edit): {err}");
                        return;
                    }
                };

                if !self.replace_local_echo(&transaction_id, content).await {
                    warn!("couldn't find the local echo to replace");
                }
            }

            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::SendingFailed { error, is_recoverable },
                )
                .await;
            }

            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::NotSentYet { progress: None },
                )
                .await;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
                    .await;
            }

            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
                self.update_event_send_state(
                    &related_to,
                    EventSendState::NotSentYet {
                        progress: Some(MediaUploadProgress { index, progress }),
                    },
                )
                .await;
            }
        }
    }

    /// Insert a timeline start item at the beginning of the timeline, if it's
    /// missing.
    pub async fn insert_timeline_start_if_missing(&self) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();
        txn.items.push_timeline_start_if_missing(
            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
        );
        txn.commit();
    }

    /// Create an [`EmbeddedEvent`] from an arbitrary event, be it in the
    /// timeline or not.
    ///
    /// Can be `None` if the event cannot be represented as a standalone item,
    /// because it's an aggregation.
    pub(super) async fn make_replied_to(
        &self,
        event: TimelineEvent,
    ) -> Result<Option<EmbeddedEvent>, Error> {
        let state = self.state.read().await;
        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
    }
}

impl TimelineController {
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }

    /// Given an event identifier, fetch the details of the event it's
    /// replying to, if applicable.
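    ///
    /// A sketch of the expected flow (the `controller` binding is
    /// illustrative):
    ///
    /// ```ignore
    /// // `event_id` identifies a reply that is currently in the timeline.
    /// controller.fetch_in_reply_to_details(event_id).await?;
    /// // Once this returns, the item's `InReplyToDetails` is `Ready` or
    /// // `Error` instead of `Unavailable`.
    /// ```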
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, as it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }

    /// Returns the thread that should be used for a read receipt based on the
    /// current focus of the timeline and the receipt type.
    ///
    /// A `SendReceiptType::FullyRead` receipt will always use
    /// `ReceiptThread::Unthreaded`.
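    ///
    /// Illustrative behaviour (the `controller` binding is hypothetical):
    ///
    /// ```ignore
    /// // Fully-read markers are always unthreaded...
    /// let t = controller.infer_thread_for_read_receipt(&SendReceiptType::FullyRead);
    /// assert_eq!(t, ReceiptThread::Unthreaded);
    /// // ...while other receipt types follow the timeline's focus.
    /// ```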
    pub(super) fn infer_thread_for_read_receipt(
        &self,
        receipt_type: &SendReceiptType,
    ) -> ReceiptThread {
        if matches!(receipt_type, SendReceiptType::FullyRead) {
            ReceiptThread::Unthreaded
        } else {
            self.focus.receipt_thread()
        }
    }

    /// Check whether the given receipt should be sent.
    ///
    /// Returns `false` if the given receipt is older than the current one.
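    ///
    /// For example, if the current receipt of the same type points at event
    /// `$a` and the new receipt targets event `$b`, and both appear in the
    /// list of remote events, the new receipt is only sent when `$b` comes
    /// after `$a`; if the two can't be compared, the server decides.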
    pub(super) async fn should_send_receipt(
        &self,
        receipt_type: &SendReceiptType,
        receipt_thread: &ReceiptThread,
        event_id: &EventId,
    ) -> bool {
        let own_user_id = self.room().own_user_id();
        let state = self.state.read().await;
        let room = self.room();

        match receipt_type {
            SendReceiptType::Read => {
                if let Some((old_pub_read, _)) = state
                    .meta
                    .user_receipt(
                        own_user_id,
                        ReceiptType::Read,
                        receipt_thread.clone(),
                        room,
                        state.items.all_remote_events(),
                    )
                    .await
                {
                    trace!(%old_pub_read, "found a previous public receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_pub_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            // Implicit read receipts are saved as public read receipts, so get the latest. It also
            // doesn't make sense to have a private read receipt behind a public one.
            SendReceiptType::ReadPrivate => {
                if let Some((old_priv_read, _)) =
                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
                {
                    trace!(%old_priv_read, "found a previous private receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_priv_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            SendReceiptType::FullyRead => {
                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &prev_event_id,
                        event_id,
                        state.items.all_remote_events(),
                    )
                {
                    return relative_pos == RelativePosition::After;
                }
            }

            _ => {}
        }

        // Let the server handle unknown receipts.
        true
    }

    /// Returns the latest event identifier, even if it's not visible, or if
    /// it's folded into another timeline item.
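    ///
    /// Threaded events may be skipped depending on the timeline focus, since
    /// they are tracked for read receipts but not visible in some timelines.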
    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
        let state = self.state.read().await;
        let filter_out_thread_events = match self.focus() {
            TimelineFocusKind::Thread { .. } => false,
            TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
            TimelineFocusKind::Event { paginator } => {
                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
            }
            _ => true,
        };

        // In some timelines, threaded events are added to the `AllRemoteEvents`
        // collection since they need to be taken into account to calculate read
        // receipts, but we don't want to take them into account when returning the
        // latest event id, since they're not visible in the timeline.
        state
            .items
            .all_remote_events()
            .iter()
            .rev()
            .filter_map(|item| {
                if !filter_out_thread_events || item.thread_root_id.is_none() {
                    Some(item.event_id.clone())
                } else {
                    None
                }
            })
            .next()
    }

    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
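        // `session_ids` is recorded in the tracing span by `#[instrument]`;
        // the retry request built below covers all candidate sessions
        // computed from the current timeline items.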
        let (utds, decrypted) = self.compute_redecryption_candidates().await;

        let request = DecryptionRetryRequest {
            room_id: self.room().room_id().to_owned(),
            utd_session_ids: utds,
            refresh_info_session_ids: decrypted,
        };

        self.room().client().event_cache().request_decryption(request);
    }

    /// Combine the global (event cache) pagination status with the local
    /// state of the timeline.
    ///
    /// This only changes the global pagination status of this room in one
    /// case: if the timeline has a skip count greater than 0, it ensures that
    /// the pagination status says we haven't reached the timeline start yet.
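    ///
    /// A sketch of the mapping (the `controller` binding and values are
    /// illustrative):
    ///
    /// ```ignore
    /// // With a non-zero subscriber skip count, "hit timeline start" is hidden:
    /// let input = RoomPaginationStatus::Idle { hit_timeline_start: true };
    /// let mapped = controller.map_pagination_status(input).await;
    /// // `mapped` is `Idle { hit_timeline_start: false }`.
    /// ```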
    pub(super) async fn map_pagination_status(
        &self,
        status: RoomPaginationStatus,
    ) -> RoomPaginationStatus {
        match status {
            RoomPaginationStatus::Idle { hit_timeline_start } => {
                if hit_timeline_start {
                    let state = self.state.read().await;
                    // If the skip count is greater than 0, it means that a subsequent pagination
                    // could return more items, so pretend we didn't get the information that the
                    // timeline start was hit.
                    if state.meta.subscriber_skip_count.get() > 0 {
                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
                    }
                }
            }
            RoomPaginationStatus::Paginating => {}
        }

        // You're perfect, just the way you are.
        status
    }
}

impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
        &self.focus
    }
}

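/// Fetch the event that `item` replies to, setting the item's in-reply-to
/// details to [`TimelineDetails::Pending`] while the request is in flight.
///
/// The write guard is dropped before hitting the network so that other tasks
/// can keep making progress on the timeline state.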
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item whose in-reply-to details are
    // set to pending while the replied-to event is being fetched.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}