Skip to main content

matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeSet,
17    fmt,
18    ops::Deref,
19    sync::{Arc, OnceLock},
20};
21
22use as_variant::as_variant;
23use eyeball_im::{VectorDiff, VectorSubscriberStream};
24use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
25use futures_core::Stream;
26use imbl::Vector;
27use matrix_sdk::{
28    deserialized_responses::TimelineEvent,
29    event_cache::{
30        DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, PinnedEventsCache,
31        RoomEventCache, ThreadEventCache, TimelineVectorDiffs,
32    },
33    send_queue::{
34        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
35    },
36    task_monitor::BackgroundTaskHandle,
37};
38#[cfg(test)]
39use ruma::events::receipt::ReceiptEventContent;
40use ruma::{
41    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
42    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
43    events::{
44        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45        AnySyncTimelineEvent, MessageLikeEventType,
46        poll::unstable_start::UnstablePollStartEventContent,
47        reaction::ReactionEventContent,
48        receipt::{Receipt, ReceiptThread, ReceiptType},
49        relation::Annotation,
50        room::message::{MessageType, Relation},
51    },
52    room_version_rules::RoomVersionRules,
53    serde::Raw,
54};
55use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
56use tracing::{
57    Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
58};
59
60pub(super) use self::{
61    metadata::{RelativePosition, TimelineMetadata},
62    observable_items::{
63        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
64        ObservableItemsTransactionEntry,
65    },
66    state::TimelineState,
67    state_transaction::TimelineStateTransaction,
68};
69use super::{
70    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
71    MediaUploadProgress, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus,
72    TimelineItem, TimelineItemContent, TimelineItemKind, TimelineReadReceiptTracking,
73    VirtualTimelineItem,
74    algorithms::{rfind_event_by_id, rfind_event_item},
75    event_item::{ReactionStatus, RemoteEventOrigin},
76    item::TimelineUniqueId,
77    subscriber::TimelineSubscriber,
78    traits::RoomDataProvider,
79};
80use crate::{
81    timeline::{
82        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
83        algorithms::rfind_event_by_item_id,
84        controller::decryption_retry_task::compute_redecryption_candidates,
85        date_dividers::DateDividerAdjuster,
86        event_item::TimelineItemHandle,
87        tasks::{event_focused_task, pinned_events_task, thread_updates_task},
88    },
89    unable_to_decrypt_hook::UtdHookManager,
90};
91
92pub(in crate::timeline) mod aggregations;
93mod decryption_retry_task;
94mod metadata;
95mod observable_items;
96mod read_receipts;
97mod state;
98mod state_transaction;
99
100pub(super) use aggregations::*;
101pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
102
103/// Data associated to the current timeline focus.
104///
105/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
106/// version of it, including extra state that makes it useful over the lifetime
107/// of a timeline.
108#[derive(Debug)]
109pub(in crate::timeline) enum TimelineFocusKind {
110    /// The timeline receives live events from the sync.
111    Live {
112        /// Whether to hide in-thread events from the timeline.
113        hide_threaded_events: bool,
114
115        /// The cache holding all the events for this focus.
116        event_cache: RoomEventCache,
117    },
118
119    /// The timeline is focused on a single event, and it can expand in one
120    /// direction or another.
121    Event {
122        /// The focused event ID.
123        focused_event_id: OwnedEventId,
124
125        /// If the focused event is part or the root of a thread, what's the
126        /// thread root?
127        ///
128        /// This is determined once when initializing the event-focused cache,
129        /// and then it won't change for the duration of this timeline.
130        thread_root: OnceLock<OwnedEventId>,
131
132        /// The thread mode to use for this event-focused timeline, which is
133        /// part of the key for the memoized event-focused cache.
134        thread_mode: TimelineEventFocusThreadMode,
135
136        /// The cache holding all the events for this focus.
137        event_cache: EventFocusedCache,
138    },
139
140    /// A live timeline for a thread.
141    Thread {
142        /// The root event for the current thread.
143        root_event_id: OwnedEventId,
144
145        /// The cache holding all the events for this focus.
146        event_cache: ThreadEventCache,
147    },
148
149    PinnedEvents {
150        /// The cache holding all the events for this focus.
151        event_cache: PinnedEventsCache,
152    },
153}
154
155impl TimelineFocusKind {
156    /// Returns the [`ReceiptThread`] that should be used for the current
157    /// timeline focus.
158    ///
159    /// Live and event timelines will use the unthreaded read receipt type in
160    /// general, unless they hide in-thread events, in which case they will
161    /// use the main thread.
162    pub(super) fn receipt_thread(&self) -> ReceiptThread {
163        if let Some(thread_root) = self.thread_root() {
164            ReceiptThread::Thread(thread_root.to_owned())
165        } else if self.hide_threaded_events() {
166            ReceiptThread::Main
167        } else {
168            ReceiptThread::Unthreaded
169        }
170    }
171
172    /// Whether to hide in-thread events from the timeline.
173    fn hide_threaded_events(&self) -> bool {
174        match self {
175            TimelineFocusKind::Live { hide_threaded_events, .. } => *hide_threaded_events,
176            TimelineFocusKind::Event { thread_mode, .. } => {
177                matches!(
178                    thread_mode,
179                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
180                )
181            }
182            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
183        }
184    }
185
186    /// Whether the focus is on a thread (from a live thread or a thread
187    /// permalink).
188    fn is_thread(&self) -> bool {
189        self.thread_root().is_some()
190    }
191
192    /// If the focus is a thread, returns its root event ID.
193    fn thread_root(&self) -> Option<&EventId> {
194        match self {
195            TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
196            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
197            TimelineFocusKind::Thread { root_event_id, .. } => Some(root_event_id),
198        }
199    }
200}
201
202#[derive(Clone, Debug)]
203pub(super) struct TimelineController<P: RoomDataProvider = Room> {
204    /// Inner mutable state.
205    state: Arc<RwLock<TimelineState<P>>>,
206
207    /// Focus data.
208    focus: Arc<TimelineFocusKind>,
209
210    /// A [`RoomDataProvider`] implementation, providing data.
211    ///
212    /// The type is a `RoomDataProvider` to allow testing. In the real world,
213    /// this would normally be a [`Room`].
214    pub(crate) room_data_provider: P,
215
216    /// Settings applied to this timeline.
217    pub(super) settings: TimelineSettings,
218}
219
220#[derive(Clone)]
221pub(super) struct TimelineSettings {
222    /// Should the read receipts and read markers be handled and on which event
223    /// types?
224    pub(super) track_read_receipts: TimelineReadReceiptTracking,
225
226    /// Event filter that controls what's rendered as a timeline item (and thus
227    /// what can carry read receipts).
228    pub(super) event_filter: Arc<TimelineEventFilterFn>,
229
230    /// Are unparsable events added as timeline items of their own kind?
231    pub(super) add_failed_to_parse: bool,
232
233    /// Should the timeline items be grouped by day or month?
234    pub(super) date_divider_mode: DateDividerMode,
235}
236
237#[cfg(not(tarpaulin_include))]
238impl fmt::Debug for TimelineSettings {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        f.debug_struct("TimelineSettings")
241            .field("track_read_receipts", &self.track_read_receipts)
242            .field("add_failed_to_parse", &self.add_failed_to_parse)
243            .finish_non_exhaustive()
244    }
245}
246
247impl Default for TimelineSettings {
248    fn default() -> Self {
249        Self {
250            track_read_receipts: TimelineReadReceiptTracking::Disabled,
251            event_filter: Arc::new(default_event_filter),
252            add_failed_to_parse: true,
253            date_divider_mode: DateDividerMode::Daily,
254        }
255    }
256}
257
258/// The default event filter for
259/// [`crate::timeline::TimelineBuilder::event_filter`].
260///
261/// It filters out events that are not rendered by the timeline, including but
262/// not limited to: reactions, edits, redactions on existing messages.
263///
264/// If you have a custom filter, it may be best to chain yours with this one if
265/// you do not want to run into situations where a read receipt is not visible
266/// because it's living on an event that doesn't have a matching timeline item.
267pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
268    match event {
269        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
270            if ev.redacts(&rules.redaction).is_some() {
271                // This is a redaction of an existing message, we'll only update the previous
272                // message and not render a new entry.
273                false
274            } else {
275                // This is a redacted entry, that we'll show only if the redacted entity wasn't
276                // a reaction.
277                ev.event_type() != MessageLikeEventType::Reaction
278            }
279        }
280
281        AnySyncTimelineEvent::MessageLike(msg) => {
282            match msg.original_content() {
283                None => {
284                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
285                    // a reaction.
286                    msg.event_type() != MessageLikeEventType::Reaction
287                }
288
289                Some(original_content) => {
290                    match original_content {
291                        AnyMessageLikeEventContent::RoomMessage(content) => {
292                            if content
293                                .relates_to
294                                .as_ref()
295                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
296                            {
297                                // Edits aren't visible by default.
298                                return false;
299                            }
300
301                            match content.msgtype {
302                                MessageType::Audio(_)
303                                | MessageType::Emote(_)
304                                | MessageType::File(_)
305                                | MessageType::Image(_)
306                                | MessageType::Location(_)
307                                | MessageType::Notice(_)
308                                | MessageType::ServerNotice(_)
309                                | MessageType::Text(_)
310                                | MessageType::Video(_)
311                                | MessageType::VerificationRequest(_) => true,
312                                #[cfg(feature = "unstable-msc4274")]
313                                MessageType::Gallery(_) => true,
314                                _ => false,
315                            }
316                        }
317
318                        AnyMessageLikeEventContent::Sticker(_)
319                        | AnyMessageLikeEventContent::UnstablePollStart(
320                            UnstablePollStartEventContent::New(_),
321                        )
322                        | AnyMessageLikeEventContent::CallInvite(_)
323                        | AnyMessageLikeEventContent::RtcNotification(_)
324                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
325
326                        // Beacon location-update events are aggregated onto
327                        // their parent `beacon_info` state event's timeline
328                        // item. They are never rendered as standalone items.
329                        AnyMessageLikeEventContent::Beacon(_) => false,
330                        // Ignore decline events, the matching RtcNotification event will be updated
331                        // to reflect the decline.
332                        AnyMessageLikeEventContent::RtcDecline(_) => false,
333
334                        _ => false,
335                    }
336                }
337            }
338        }
339
340        AnySyncTimelineEvent::State(_) => {
341            // All the state events may get displayed by default.
342            true
343        }
344    }
345}
346
347/// Result of calling [`TimelineController::init_focus`].
348pub(super) struct InitFocusResult {
349    /// Did the initialization result in having some events in the timeline?
350    pub has_events: bool,
351    /// If the timeline is a non-live timeline, an extra task that subscribes to
352    /// changes to the focus source.
353    pub focus_task: Option<BackgroundTaskHandle>,
354}
355
356impl<P: RoomDataProvider> TimelineController<P> {
357    pub(super) async fn new(
358        room_data_provider: P,
359        focus: &TimelineFocus,
360        event_cache: &EventCache,
361        internal_id_prefix: Option<String>,
362        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
363        is_room_encrypted: bool,
364        settings: TimelineSettings,
365    ) -> Result<Self, Error> {
366        let room_id = room_data_provider.room_id();
367
368        let focus = match focus {
369            TimelineFocus::Live { hide_threaded_events } => TimelineFocusKind::Live {
370                hide_threaded_events: *hide_threaded_events,
371                event_cache: event_cache.room(room_id).await?.0,
372            },
373
374            TimelineFocus::Event { target, thread_mode, num_context_events, .. } => {
375                TimelineFocusKind::Event {
376                    event_cache: event_cache
377                        .event_focused(room_id, target, (*thread_mode).into(), *num_context_events)
378                        .await?
379                        .0,
380                    focused_event_id: target.clone(),
381                    // This will be initialized in `Self::init_focus`.
382                    thread_root: OnceLock::new(),
383                    thread_mode: *thread_mode,
384                }
385            }
386
387            TimelineFocus::Thread { root_event_id, .. } => TimelineFocusKind::Thread {
388                event_cache: event_cache.thread(room_id, root_event_id).await?.0,
389                root_event_id: root_event_id.clone(),
390            },
391
392            TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents {
393                event_cache: event_cache.pinned_events(room_id).await?.0,
394            },
395        };
396
397        let focus = Arc::new(focus);
398        let state = Arc::new(RwLock::new(TimelineState::new(
399            focus.clone(),
400            room_data_provider.own_user_id().to_owned(),
401            room_data_provider.room_version_rules(),
402            internal_id_prefix,
403            unable_to_decrypt_hook,
404            is_room_encrypted,
405        )));
406
407        Ok(Self { state, focus, room_data_provider, settings })
408    }
409
410    /// Listens to encryption state changes for the room in
411    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
412    /// existing timeline items. This will then cause a refresh of those
413    /// timeline items.
414    pub async fn handle_encryption_state_changes(&self) {
415        let mut room_info = self.room_data_provider.room_info();
416
417        // Small function helper to help mark as encrypted.
418        let mark_encrypted = || async {
419            let mut state = self.state.write().await;
420            state.meta.is_room_encrypted = true;
421            state.mark_all_events_as_encrypted();
422        };
423
424        if room_info.get().encryption_state().is_encrypted() {
425            // If the room was already encrypted, it won't toggle to unencrypted, so we can
426            // shut down this task early.
427            mark_encrypted().await;
428            return;
429        }
430
431        while let Some(info) = room_info.next().await {
432            if info.encryption_state().is_encrypted() {
433                mark_encrypted().await;
434                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
435                // here is done.
436                break;
437            }
438        }
439    }
440
441    /// Run a lazy backwards pagination (in live mode).
442    ///
443    /// It adjusts the `count` value of the `Skip` higher-order stream so that
444    /// more items are pushed front in the timeline.
445    ///
446    /// If no more items are available (i.e. if the `count` is zero), this
447    /// method returns `Some(needs)` where `needs` is the number of events that
448    /// must be unlazily backwards paginated.
449    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
450        let state = self.state.read().await;
451
452        let (count, needs) = state
453            .meta
454            .subscriber_skip_count
455            .compute_next_when_paginating_backwards(num_events.into());
456
457        // This always happens on a live timeline.
458        let is_live_timeline = true;
459        state.meta.subscriber_skip_count.update(count, is_live_timeline);
460
461        needs
462    }
463
464    /// Is this timeline receiving events from sync (aka has a live focus)?
465    pub(super) fn is_live(&self) -> bool {
466        matches!(&*self.focus, TimelineFocusKind::Live { .. })
467    }
468
469    /// Is this timeline focused on a thread?
470    pub(super) fn is_threaded(&self) -> bool {
471        self.focus.is_thread()
472    }
473
474    /// The root of the current thread, for a live thread timeline or a
475    /// permalink to a thread message.
476    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
477        self.focus.thread_root().map(ToOwned::to_owned)
478    }
479
480    /// Get a copy of the current items in the list.
481    ///
482    /// Cheap because `im::Vector` is cheap to clone.
483    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
484        self.state.read().await.items.clone_items()
485    }
486
487    #[cfg(test)]
488    pub(super) async fn subscribe_raw(
489        &self,
490    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
491        self.state.read().await.items.subscribe().into_values_and_stream()
492    }
493
494    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
495        let state = self.state.read().await;
496
497        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
498    }
499
500    pub(super) async fn subscribe_filter_map<U, F>(
501        &self,
502        f: F,
503    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
504    where
505        U: Clone,
506        F: Fn(Arc<TimelineItem>) -> Option<U>,
507    {
508        self.state.read().await.items.subscribe().filter_map(f)
509    }
510
511    /// Toggle a reaction locally.
512    ///
513    /// Returns true if the reaction was added, false if it was removed.
514    #[instrument(skip_all)]
515    pub(super) async fn toggle_reaction_local(
516        &self,
517        item_id: &TimelineEventItemId,
518        key: &str,
519    ) -> Result<bool, Error> {
520        let mut state = self.state.write().await;
521
522        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
523            warn!("Timeline item not found, can't add reaction");
524            return Err(Error::FailedToToggleReaction);
525        };
526
527        let user_id = self.room_data_provider.own_user_id();
528        let prev_status = item
529            .content()
530            .reactions()
531            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
532
533        let Some(prev_status) = prev_status else {
534            // Adding the new reaction.
535            match item.handle() {
536                TimelineItemHandle::Local(send_handle) => {
537                    if send_handle
538                        .react(key.to_owned())
539                        .await
540                        .map_err(|err| Error::SendQueueError(err.into()))?
541                        .is_some()
542                    {
543                        trace!("adding a reaction to a local echo");
544                        return Ok(true);
545                    }
546
547                    warn!("couldn't toggle reaction for local echo");
548                    return Ok(false);
549                }
550
551                TimelineItemHandle::Remote(event_id) => {
552                    // Add a reaction through the room data provider.
553                    // No need to reflect the effect locally, since the local echo handling will
554                    // take care of it.
555                    trace!("adding a reaction to a remote echo");
556                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
557                    self.room_data_provider
558                        .send(ReactionEventContent::from(annotation).into())
559                        .await?;
560                    return Ok(true);
561                }
562            }
563        };
564
565        trace!("removing a previous reaction");
566        match prev_status {
567            ReactionStatus::LocalToLocal(send_reaction_handle) => {
568                if let Some(handle) = send_reaction_handle {
569                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
570                        // Impossible state: the reaction has moved from local to echo under our
571                        // feet, but the timeline was supposed to be locked!
572                        warn!("unexpectedly unable to abort sending of local reaction");
573                    }
574                } else {
575                    warn!("no send reaction handle (this should only happen in testing contexts)");
576                }
577            }
578
579            ReactionStatus::LocalToRemote(send_handle) => {
580                // No need to reflect the change ourselves, since handling the discard of the
581                // local echo will take care of it.
582                trace!("aborting send of the previous reaction that was a local echo");
583                if let Some(handle) = send_handle {
584                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
585                        // Impossible state: the reaction has moved from local to echo under our
586                        // feet, but the timeline was supposed to be locked!
587                        warn!("unexpectedly unable to abort sending of local reaction");
588                    }
589                } else {
590                    warn!("no send handle (this should only happen in testing contexts)");
591                }
592            }
593
594            ReactionStatus::RemoteToRemote(event_id) => {
595                // Assume the redaction will work; we'll re-add the reaction if it didn't.
596                let Some(annotated_event_id) =
597                    item.as_remote().map(|event_item| event_item.event_id.clone())
598                else {
599                    warn!("remote reaction to remote event, but the associated item isn't remote");
600                    return Ok(false);
601                };
602
603                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
604                let reaction_info = reactions.remove_reaction(user_id, key);
605
606                if reaction_info.is_some() {
607                    let new_item = item.with_reactions(reactions);
608                    state.items.replace(item_pos, new_item);
609                } else {
610                    warn!(
611                        "reaction is missing on the item, not removing it locally, \
612                         but sending redaction."
613                    );
614                }
615
616                // Release the lock before running the request.
617                drop(state);
618
619                trace!("sending redact for a previous reaction");
620                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
621                    if let Some(reaction_info) = reaction_info {
622                        debug!("sending redact failed, adding the reaction back to the list");
623
624                        let mut state = self.state.write().await;
625                        if let Some((item_pos, item)) =
626                            rfind_event_by_id(&state.items, &annotated_event_id)
627                        {
628                            // Re-add the reaction to the mapping.
629                            let mut reactions =
630                                item.content().reactions().cloned().unwrap_or_default();
631                            reactions
632                                .entry(key.to_owned())
633                                .or_default()
634                                .insert(user_id.to_owned(), reaction_info);
635                            let new_item = item.with_reactions(reactions);
636                            state.items.replace(item_pos, new_item);
637                        } else {
638                            warn!(
639                                "couldn't find item to re-add reaction anymore; \
640                                 maybe it's been redacted?"
641                            );
642                        }
643                    }
644
645                    return Err(err);
646                }
647            }
648        }
649
650        Ok(false)
651    }
652
653    /// Handle updates on events as [`VectorDiff`]s.
654    pub(super) async fn handle_remote_events_with_diffs(
655        &self,
656        diffs: Vec<VectorDiff<TimelineEvent>>,
657        origin: RemoteEventOrigin,
658    ) {
659        if diffs.is_empty() {
660            return;
661        }
662
663        let mut state = self.state.write().await;
664        state
665            .handle_remote_events_with_diffs(
666                diffs,
667                origin,
668                &self.room_data_provider,
669                &self.settings,
670            )
671            .await
672    }
673
674    /// Only handle aggregations received as [`VectorDiff`]s.
675    pub(super) async fn handle_remote_aggregations(
676        &self,
677        diffs: Vec<VectorDiff<TimelineEvent>>,
678        origin: RemoteEventOrigin,
679    ) {
680        if diffs.is_empty() {
681            return;
682        }
683
684        let mut state = self.state.write().await;
685        state
686            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
687            .await
688    }
689
690    pub(super) async fn clear(&self) {
691        self.state.write().await.clear();
692    }
693
694    /// Replaces the content of the current timeline with initial events.
695    ///
696    /// Also sets up read receipts and the read marker for a live timeline of a
697    /// room.
698    ///
699    /// This is all done with a single lock guard, since we don't want the state
700    /// to be modified between the clear and re-insertion of new events.
701    pub(super) async fn replace_with_initial_remote_events<Events>(
702        &self,
703        events: Events,
704        origin: RemoteEventOrigin,
705    ) where
706        Events: IntoIterator,
707        <Events as IntoIterator>::Item: Into<TimelineEvent>,
708    {
709        let mut state = self.state.write().await;
710
711        let track_read_markers = &self.settings.track_read_receipts;
712        if track_read_markers.is_enabled() {
713            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
714            state
715                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
716                .await;
717        }
718
719        // Replace the events if either the current event list or the new one aren't
720        // empty.
721        // Previously we just had to check the new one wasn't empty because
722        // we did a clear operation before so the current one would always be empty, but
723        // now we may want to replace a populated timeline with an empty one.
724        let mut events = events.into_iter().peekable();
725        if !state.items.is_empty() || events.peek().is_some() {
726            state
727                .replace_with_remote_events(
728                    events,
729                    origin,
730                    &self.room_data_provider,
731                    &self.settings,
732                )
733                .await;
734        }
735
736        if track_read_markers.is_enabled() {
737            if let Some(fully_read_event_id) =
738                self.room_data_provider.load_fully_read_marker().await
739            {
740                state.handle_fully_read_marker(fully_read_event_id);
741            } else if let Some(latest_receipt_event_id) = state
742                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
743            {
744                // Fall back to read receipt if no fully read marker exists.
745                debug!("no `m.fully_read` marker found, falling back to read receipt");
746                state.handle_fully_read_marker(latest_receipt_event_id);
747            }
748        }
749    }
750
751    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
752        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
753    }
754
755    pub(super) async fn handle_ephemeral_events(
756        &self,
757        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
758    ) {
759        // Don't even take the lock if there are no events to process.
760        if events.is_empty() {
761            return;
762        }
763        let mut state = self.state.write().await;
764        state.handle_ephemeral_events(events, &self.room_data_provider).await;
765    }
766
767    /// Creates the local echo for an event we're sending.
768    #[instrument(skip_all)]
769    pub(super) async fn handle_local_event(
770        &self,
771        txn_id: OwnedTransactionId,
772        content: AnyMessageLikeEventContent,
773        send_handle: Option<SendHandle>,
774    ) {
775        let sender = self.room_data_provider.own_user_id().to_owned();
776        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
777
778        let date_divider_mode = self.settings.date_divider_mode.clone();
779
780        let mut state = self.state.write().await;
781        state
782            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
783            .await;
784    }
785
786    /// Update the send state of a local event represented by a transaction ID.
787    ///
788    /// If the corresponding local timeline item is missing, a warning is
789    /// raised.
790    #[instrument(skip(self))]
791    pub(super) async fn update_event_send_state(
792        &self,
793        txn_id: &TransactionId,
794        send_state: EventSendState,
795    ) {
796        let mut state = self.state.write().await;
797        let mut txn = state.transaction();
798
799        let new_event_id: Option<&EventId> =
800            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
801
802        // The local echoes are always at the end of the timeline, we must first make
803        // sure the remote echo hasn't showed up yet.
804        if rfind_event_item(&txn.items, |it| {
805            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
806        })
807        .is_some()
808        {
809            // Remote echo already received. This is very unlikely.
810            trace!("Remote echo received before send-event response");
811
812            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
813
814            // If there's both the remote echo and a local echo, that means the
815            // remote echo was received before the response *and* contained no
816            // transaction ID (and thus duplicated the local echo).
817            if let Some((idx, _)) = local_echo {
818                warn!("Message echo got duplicated, removing the local one");
819                txn.items.remove(idx);
820
821                // Adjust the date dividers, if needs be.
822                let mut adjuster =
823                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
824                adjuster.run(&mut txn.items, &mut txn.meta);
825            }
826
827            txn.commit();
828            return;
829        }
830
831        // Look for the local event by the transaction ID or event ID.
832        let result = rfind_event_item(&txn.items, |it| {
833            it.transaction_id() == Some(txn_id)
834                || new_event_id.is_some()
835                    && it.event_id() == new_event_id
836                    && it.as_local().is_some()
837        });
838
839        let Some((idx, item)) = result else {
840            // Event wasn't found as a standalone item.
841            //
842            // If it was just sent, try to find if it matches a corresponding aggregation,
843            // and mark it as sent in that case.
844            if let Some(new_event_id) = new_event_id {
845                if txn.meta.aggregations.mark_aggregation_as_sent(
846                    txn_id.to_owned(),
847                    new_event_id.to_owned(),
848                    &mut txn.items,
849                    &txn.meta.room_version_rules,
850                ) {
851                    trace!("Aggregation marked as sent");
852                    txn.commit();
853                    return;
854                }
855
856                trace!("Sent aggregation was not found");
857            }
858
859            warn!("Timeline item not found, can't update send state");
860            return;
861        };
862
863        let Some(local_item) = item.as_local() else {
864            warn!("We looked for a local item, but it transitioned to remote.");
865            return;
866        };
867
868        // The event was already marked as sent, that's a broken state, let's
869        // emit an error but also override to the given sent state.
870        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
871            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
872        }
873
874        // If the event has just been marked as sent, update the aggregations mapping to
875        // take that into account.
876        if let Some(new_event_id) = new_event_id {
877            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
878        }
879
880        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
881        txn.items.replace(idx, new_item);
882
883        txn.commit();
884    }
885
886    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
887        let mut state = self.state.write().await;
888
889        if let Some((idx, _)) =
890            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
891        {
892            let mut txn = state.transaction();
893
894            txn.items.remove(idx);
895
896            // A read marker or a date divider may have been inserted before the local echo.
897            // Ensure both are up to date.
898            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
899            adjuster.run(&mut txn.items, &mut txn.meta);
900
901            txn.meta.update_read_marker(&mut txn.items);
902
903            txn.commit();
904
905            debug!("discarded local echo");
906            return true;
907        }
908
909        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
910        // dereferencing it once.
911        let mut txn = state.transaction();
912
913        // Look if this was a local aggregation.
914        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
915            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
916            &mut txn.items,
917        ) {
918            Ok(val) => val,
919            Err(err) => {
920                warn!("error when discarding local echo for an aggregation: {err}");
921                // The aggregation has been found, it's just that we couldn't discard it.
922                true
923            }
924        };
925
926        if found_aggregation {
927            txn.commit();
928        }
929
930        found_aggregation
931    }
932
933    pub(super) async fn replace_local_echo(
934        &self,
935        txn_id: &TransactionId,
936        content: AnyMessageLikeEventContent,
937    ) -> bool {
938        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
939            // Ideally, we'd support replacing local echoes for a reaction, etc., but
940            // handling RoomMessage should be sufficient in most cases. Worst
941            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
942            // editing the event then.
943            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
944            return false;
945        };
946
947        let mut state = self.state.write().await;
948        let mut txn = state.transaction();
949
950        let Some((idx, prev_item)) =
951            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
952        else {
953            debug!("Can't find local echo to replace");
954            return false;
955        };
956
957        // Reuse the previous local echo's state, but reset the send state to not sent
958        // (per API contract).
959        let ti_kind = {
960            let Some(prev_local_item) = prev_item.as_local() else {
961                warn!("We looked for a local item, but it transitioned as remote??");
962                return false;
963            };
964            // If the local echo had an upload progress, retain it.
965            let progress = as_variant!(&prev_local_item.send_state,
966                EventSendState::NotSentYet { progress } => progress.clone())
967            .flatten();
968            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
969        };
970
971        // Replace the local-related state (kind) and the content state.
972        let new_item = TimelineItem::new(
973            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
974                content.msgtype,
975                content.mentions,
976                prev_item.content().reactions().cloned().unwrap_or_default(),
977                prev_item.content().thread_root(),
978                prev_item.content().in_reply_to(),
979                prev_item.content().thread_summary(),
980            )),
981            prev_item.internal_id.to_owned(),
982        );
983
984        txn.items.replace(idx, new_item);
985
986        // This doesn't change the original sending time, so there's no need to adjust
987        // date dividers.
988
989        txn.commit();
990
991        debug!("Replaced local echo");
992        true
993    }
994
995    pub(super) async fn compute_redecryption_candidates(
996        &self,
997    ) -> (BTreeSet<String>, BTreeSet<String>) {
998        let state = self.state.read().await;
999        compute_redecryption_candidates(&state.items)
1000    }
1001
1002    pub(super) async fn set_sender_profiles_pending(&self) {
1003        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1004    }
1005
1006    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1007        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1008    }
1009
1010    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1011        self.state.write().await.items.for_each(|mut entry| {
1012            let Some(event_item) = entry.as_event() else { return };
1013            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1014                let new_item = entry.with_kind(TimelineItemKind::Event(
1015                    event_item.with_sender_profile(profile_state.clone()),
1016                ));
1017                ObservableItemsEntry::replace(&mut entry, new_item);
1018            }
1019        });
1020    }
1021
1022    pub(super) async fn update_missing_sender_profiles(&self) {
1023        trace!("Updating missing sender profiles");
1024
1025        let mut state = self.state.write().await;
1026        let mut entries = state.items.entries();
1027        while let Some(mut entry) = entries.next() {
1028            let Some(event_item) = entry.as_event() else { continue };
1029            let event_id = event_item.event_id().map(debug);
1030            let transaction_id = event_item.transaction_id().map(debug);
1031
1032            if event_item.sender_profile().is_ready() {
1033                trace!(event_id, transaction_id, "Profile already set");
1034                continue;
1035            }
1036
1037            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1038                Some(profile) => {
1039                    trace!(event_id, transaction_id, "Adding profile");
1040                    let updated_item =
1041                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1042                    let new_item = entry.with_kind(updated_item);
1043                    ObservableItemsEntry::replace(&mut entry, new_item);
1044                }
1045                None => {
1046                    if !event_item.sender_profile().is_unavailable() {
1047                        trace!(event_id, transaction_id, "Marking profile unavailable");
1048                        let updated_item =
1049                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1050                        let new_item = entry.with_kind(updated_item);
1051                        ObservableItemsEntry::replace(&mut entry, new_item);
1052                    } else {
1053                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1054                    }
1055                }
1056            }
1057        }
1058
1059        trace!("Done updating missing sender profiles");
1060    }
1061
1062    /// Update the profiles of the given senders, even if they are ready.
1063    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1064        trace!("Forcing update of sender profiles: {sender_ids:?}");
1065
1066        let mut state = self.state.write().await;
1067        let mut entries = state.items.entries();
1068        while let Some(mut entry) = entries.next() {
1069            let Some(event_item) = entry.as_event() else { continue };
1070            if !sender_ids.contains(event_item.sender()) {
1071                continue;
1072            }
1073
1074            let event_id = event_item.event_id().map(debug);
1075            let transaction_id = event_item.transaction_id().map(debug);
1076
1077            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1078                Some(profile) => {
1079                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1080                    {
1081                        debug!(event_id, transaction_id, "Profile already up-to-date");
1082                    } else {
1083                        trace!(event_id, transaction_id, "Updating profile");
1084                        let updated_item =
1085                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1086                        let new_item = entry.with_kind(updated_item);
1087                        ObservableItemsEntry::replace(&mut entry, new_item);
1088                    }
1089                }
1090                None => {
1091                    if !event_item.sender_profile().is_unavailable() {
1092                        trace!(event_id, transaction_id, "Marking profile unavailable");
1093                        let updated_item =
1094                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1095                        let new_item = entry.with_kind(updated_item);
1096                        ObservableItemsEntry::replace(&mut entry, new_item);
1097                    } else {
1098                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1099                    }
1100                }
1101            }
1102        }
1103
1104        trace!("Done forcing update of sender profiles");
1105    }
1106
1107    #[cfg(test)]
1108    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1109        let own_user_id = self.room_data_provider.own_user_id();
1110        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1111    }
1112
1113    /// Get the latest read receipt for the given user.
1114    ///
1115    /// Useful to get the latest read receipt, whether it's private or public.
1116    pub(super) async fn latest_user_read_receipt(
1117        &self,
1118        user_id: &UserId,
1119    ) -> Option<(OwnedEventId, Receipt)> {
1120        let receipt_thread = self.focus.receipt_thread();
1121
1122        self.state
1123            .read()
1124            .await
1125            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1126            .await
1127    }
1128
1129    /// Get the ID of the timeline event with the latest read receipt for the
1130    /// given user.
1131    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1132        &self,
1133        user_id: &UserId,
1134    ) -> Option<OwnedEventId> {
1135        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1136    }
1137
1138    /// Subscribe to changes in the read receipts of our own user.
1139    pub async fn subscribe_own_user_read_receipts_changed(
1140        &self,
1141    ) -> impl Stream<Item = ()> + use<P> {
1142        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1143    }
1144
1145    /// Handle a room send update that's a new local echo.
1146    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1147        match echo.content {
1148            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1149                let content = match serialized_event.deserialize() {
1150                    Ok(d) => d,
1151                    Err(err) => {
1152                        warn!("error deserializing local echo: {err}");
1153                        return;
1154                    }
1155                };
1156
1157                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1158                    .await;
1159
1160                if let Some(send_error) = send_error {
1161                    self.update_event_send_state(
1162                        &echo.transaction_id,
1163                        EventSendState::SendingFailed {
1164                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1165                                send_error,
1166                            ))),
1167                            is_recoverable: false,
1168                        },
1169                    )
1170                    .await;
1171                }
1172            }
1173
1174            LocalEchoContent::React { key, send_handle, applies_to } => {
1175                self.handle_local_reaction(key, send_handle, applies_to).await;
1176            }
1177
1178            LocalEchoContent::Redaction { redacts, send_error, .. } => {
1179                self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
1180
1181                if let Some(send_error) = send_error {
1182                    self.update_event_send_state(
1183                        &echo.transaction_id,
1184                        EventSendState::SendingFailed {
1185                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1186                                send_error,
1187                            ))),
1188                            is_recoverable: false,
1189                        },
1190                    )
1191                    .await;
1192                }
1193            }
1194        }
1195    }
1196
1197    /// Adds a reaction (local echo) to a local echo.
1198    #[instrument(skip(self, send_handle))]
1199    async fn handle_local_reaction(
1200        &self,
1201        reaction_key: String,
1202        send_handle: SendReactionHandle,
1203        applies_to: OwnedTransactionId,
1204    ) {
1205        let mut state = self.state.write().await;
1206        let mut tr = state.transaction();
1207
1208        let target = TimelineEventItemId::TransactionId(applies_to);
1209
1210        let reaction_txn_id = send_handle.transaction_id().to_owned();
1211        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1212        let aggregation = Aggregation::new(
1213            TimelineEventItemId::TransactionId(reaction_txn_id),
1214            AggregationKind::Reaction {
1215                key: reaction_key.clone(),
1216                sender: self.room_data_provider.own_user_id().to_owned(),
1217                timestamp: MilliSecondsSinceUnixEpoch::now(),
1218                reaction_status,
1219            },
1220        );
1221
1222        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1223        find_item_and_apply_aggregation(
1224            &tr.meta.aggregations,
1225            &mut tr.items,
1226            &target,
1227            aggregation,
1228            &tr.meta.room_version_rules,
1229        );
1230
1231        tr.commit();
1232    }
1233
1234    /// Applies a local echo of a redaction.
1235    pub(super) async fn handle_local_redaction(
1236        &self,
1237        txn_id: OwnedTransactionId,
1238        redacts: OwnedEventId,
1239    ) {
1240        let mut state = self.state.write().await;
1241        let mut tr = state.transaction();
1242
1243        let target = TimelineEventItemId::EventId(redacts);
1244
1245        let aggregation = Aggregation::new(
1246            TimelineEventItemId::TransactionId(txn_id),
1247            AggregationKind::Redaction { is_local: true },
1248        );
1249
1250        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1251        find_item_and_apply_aggregation(
1252            &tr.meta.aggregations,
1253            &mut tr.items,
1254            &target,
1255            aggregation,
1256            &tr.meta.room_version_rules,
1257        );
1258
1259        tr.commit();
1260    }
1261
1262    /// Handle a single room send queue update.
1263    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1264        match update {
1265            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1266                self.handle_local_echo(echo).await;
1267            }
1268
1269            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1270                if !self.discard_local_echo(&transaction_id).await {
1271                    warn!("couldn't find the local echo to discard");
1272                }
1273            }
1274
1275            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1276                let content = match new_content.deserialize() {
1277                    Ok(d) => d,
1278                    Err(err) => {
1279                        warn!("error deserializing local echo (upon edit): {err}");
1280                        return;
1281                    }
1282                };
1283
1284                if !self.replace_local_echo(&transaction_id, content).await {
1285                    warn!("couldn't find the local echo to replace");
1286                }
1287            }
1288
1289            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1290                self.update_event_send_state(
1291                    &transaction_id,
1292                    EventSendState::SendingFailed { error, is_recoverable },
1293                )
1294                .await;
1295            }
1296
1297            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1298                self.update_event_send_state(
1299                    &transaction_id,
1300                    EventSendState::NotSentYet { progress: None },
1301                )
1302                .await;
1303            }
1304
1305            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1306                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1307                    .await;
1308            }
1309
1310            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1311                self.update_event_send_state(
1312                    &related_to,
1313                    EventSendState::NotSentYet {
1314                        progress: Some(MediaUploadProgress { index, progress }),
1315                    },
1316                )
1317                .await;
1318            }
1319        }
1320    }
1321
1322    /// Insert a timeline start item at the beginning of the room, if it's
1323    /// missing.
1324    pub async fn insert_timeline_start_if_missing(&self) {
1325        let mut state = self.state.write().await;
1326        let mut txn = state.transaction();
1327        txn.items.push_timeline_start_if_missing(
1328            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1329        );
1330        txn.commit();
1331    }
1332
1333    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1334    /// timeline or not.
1335    ///
1336    /// Can be `None` if the event cannot be represented as a standalone item,
1337    /// because it's an aggregation.
1338    pub(super) async fn make_replied_to(
1339        &self,
1340        event: TimelineEvent,
1341    ) -> Result<Option<EmbeddedEvent>, Error> {
1342        let state = self.state.read().await;
1343        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1344    }
1345}
1346
1347impl TimelineController {
1348    pub(super) fn room(&self) -> &Room {
1349        &self.room_data_provider
1350    }
1351
1352    /// Initializes the configured timeline focus with appropriate data.
1353    ///
1354    /// Should be called only once after creation of the [`TimelineController`],
1355    /// with all its fields set.
1356    pub(super) async fn init_focus(&self) -> Result<InitFocusResult, Error> {
1357        match self.focus.deref() {
1358            TimelineFocusKind::Live { event_cache, .. } => {
1359                // Retrieve the cached events, and add them to the timeline.
1360                let events = event_cache.events().await?;
1361
1362                let has_events = !events.is_empty();
1363
1364                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1365
1366                match event_cache.pagination().status().get() {
1367                    PaginationStatus::Idle { hit_timeline_start } => {
1368                        if hit_timeline_start {
1369                            // Eagerly insert the timeline start item, since pagination claims
1370                            // we've already hit the timeline start.
1371                            self.insert_timeline_start_if_missing().await;
1372                        }
1373                    }
1374                    PaginationStatus::Paginating => {}
1375                }
1376
1377                Ok(InitFocusResult { has_events, focus_task: None })
1378            }
1379
1380            TimelineFocusKind::Event {
1381                focused_event_id: event_id,
1382                thread_mode,
1383                thread_root: focus_thread_root,
1384                event_cache,
1385                ..
1386            } => {
1387                let (events, receiver) = event_cache.subscribe().await?;
1388
1389                let has_events = !events.is_empty();
1390
1391                // Ask the cache for the thread root, if it managed to extract one or decided
1392                // that the target event was the thread root.
1393                if let Some(thread_root) = event_cache.thread_root().await? {
1394                    focus_thread_root.get_or_init(|| thread_root);
1395                }
1396
1397                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
1398                    .await;
1399
1400                let task = self
1401                    .room_data_provider
1402                    .client()
1403                    .task_monitor()
1404                    .spawn_infinite_task(
1405                        "timeline::event_focused_cache_updates",
1406                        event_focused_task(
1407                            event_id.clone(),
1408                            (*thread_mode).into(),
1409                            event_cache.clone(),
1410                            self.clone(),
1411                            receiver,
1412                        ),
1413                    )
1414                    .abort_on_drop();
1415
1416                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1417            }
1418
1419            TimelineFocusKind::Thread { event_cache, .. } => {
1420                let (has_events, receiver) = self.init_with_thread_root(event_cache).await?;
1421
1422                let room = &self.room_data_provider;
1423                let span = info_span!(
1424                    parent: Span::none(),
1425                    "thread_live_update_handler",
1426                    room_id = ?room.room_id(),
1427                );
1428                span.follows_from(Span::current());
1429
1430                let task = room
1431                    .client()
1432                    .task_monitor()
1433                    .spawn_infinite_task(
1434                        "timeline::thread_event_cache_updates",
1435                        thread_updates_task(receiver, event_cache.clone(), self.clone())
1436                            .instrument(span),
1437                    )
1438                    .abort_on_drop();
1439
1440                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1441            }
1442
1443            TimelineFocusKind::PinnedEvents { event_cache } => {
1444                let (initial_events, pinned_events_recv) = event_cache.subscribe().await?;
1445
1446                let has_events = !initial_events.is_empty();
1447
1448                self.replace_with_initial_remote_events(
1449                    initial_events,
1450                    RemoteEventOrigin::Pagination,
1451                )
1452                .await;
1453
1454                let task = self
1455                    .room_data_provider
1456                    .client()
1457                    .task_monitor()
1458                    .spawn_infinite_task(
1459                        "timeline::pinned_events_cache_updates",
1460                        pinned_events_task(event_cache.clone(), self.clone(), pinned_events_recv),
1461                    )
1462                    .abort_on_drop();
1463
1464                Ok(InitFocusResult { has_events, focus_task: Some(task) })
1465            }
1466        }
1467    }
1468
1469    /// (Re-)initialise a timeline using [`TimelineFocus::Thread`] with cached
1470    /// threaded events and secondary relations.
1471    ///
1472    /// Returns whether there were any events added to the timeline, and a
1473    /// receiver to return updates after the initial events have been
1474    /// inserted in the timeline.
1475    pub(super) async fn init_with_thread_root(
1476        &self,
1477        event_cache: &ThreadEventCache,
1478    ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1479        let (events, receiver) = event_cache.subscribe().await?;
1480        let has_events = !events.is_empty();
1481
1482        // For each event, we also need to find the related events, as they don't
1483        // include the thread relationship, they won't be included in
1484        // the initial list of events.
1485        let mut related_events = Vector::new();
1486        for event_id in events.iter().filter_map(|event| event.event_id()) {
1487            if let Some((_original, related)) =
1488                event_cache.find_event_with_relations(event_id, None).await?
1489            {
1490                related_events.extend(related);
1491            }
1492        }
1493
1494        self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1495
1496        // Now that we've inserted the thread events, add the aggregations too.
1497        if !related_events.is_empty() {
1498            self.handle_remote_aggregations(
1499                vec![VectorDiff::Append { values: related_events }],
1500                RemoteEventOrigin::Cache,
1501            )
1502            .await;
1503        }
1504
1505        Ok((has_events, receiver))
1506    }
1507
1508    /// Given an event identifier, will fetch the details for the event it's
1509    /// replying to, if applicable.
1510    #[instrument(skip(self))]
1511    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1512        let state_guard = self.state.write().await;
1513        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1514            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1515        let remote_item = item
1516            .as_remote()
1517            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1518            .clone();
1519
1520        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1521            debug!("Event is not a message");
1522            return Ok(());
1523        };
1524        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1525            debug!("Event is not a reply");
1526            return Ok(());
1527        };
1528        if let TimelineDetails::Pending = &in_reply_to.event {
1529            debug!("Replied-to event is already being fetched");
1530            return Ok(());
1531        }
1532        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1533            debug!("Replied-to event has already been fetched");
1534            return Ok(());
1535        }
1536
1537        let internal_id = item.internal_id.to_owned();
1538        let item = item.clone();
1539        let event = fetch_replied_to_event(
1540            state_guard,
1541            &self.state,
1542            index,
1543            &item,
1544            internal_id,
1545            &msglike,
1546            &in_reply_to.event_id,
1547            self.room(),
1548        )
1549        .await?;
1550
1551        // We need to be sure to have the latest position of the event as it might have
1552        // changed while waiting for the request.
1553        let mut state = self.state.write().await;
1554        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1555            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1556
1557        // Check the state of the event again, it might have been redacted while
1558        // the request was in-flight.
1559        let TimelineItemContent::MsgLike(MsgLikeContent {
1560            kind: MsgLikeKind::Message(message),
1561            reactions,
1562            thread_root,
1563            in_reply_to,
1564            thread_summary,
1565        }) = item.content().clone()
1566        else {
1567            info!("Event is no longer a message (redacted?)");
1568            return Ok(());
1569        };
1570        let Some(in_reply_to) = in_reply_to else {
1571            warn!("Event no longer has a reply (bug?)");
1572            return Ok(());
1573        };
1574
1575        // Now that we've received the content of the replied-to event, replace the
1576        // replied-to content in the item with it.
1577        trace!("Updating in-reply-to details");
1578        let internal_id = item.internal_id.to_owned();
1579        let mut item = item.clone();
1580        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1581            kind: MsgLikeKind::Message(message),
1582            reactions,
1583            thread_root,
1584            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1585            thread_summary,
1586        }));
1587        state.items.replace(index, TimelineItem::new(item, internal_id));
1588
1589        Ok(())
1590    }
1591
1592    /// Returns the thread that should be used for a read receipt based on the
1593    /// current focus of the timeline and the receipt type.
1594    ///
1595    /// A `SendReceiptType::FullyRead` will always use
1596    /// `ReceiptThread::Unthreaded`
1597    pub(super) fn infer_thread_for_read_receipt(
1598        &self,
1599        receipt_type: &SendReceiptType,
1600    ) -> ReceiptThread {
1601        if matches!(receipt_type, SendReceiptType::FullyRead) {
1602            ReceiptThread::Unthreaded
1603        } else {
1604            self.focus.receipt_thread()
1605        }
1606    }
1607
1608    /// Check whether the given receipt should be sent.
1609    ///
1610    /// Returns `false` if the given receipt is older than the current one.
1611    pub(super) async fn should_send_receipt(
1612        &self,
1613        receipt_type: &SendReceiptType,
1614        receipt_thread: &ReceiptThread,
1615        event_id: &EventId,
1616    ) -> bool {
1617        let own_user_id = self.room().own_user_id();
1618        let state = self.state.read().await;
1619        let room = self.room();
1620
1621        match receipt_type {
1622            SendReceiptType::Read => {
1623                if let Some((old_pub_read, _)) = state
1624                    .meta
1625                    .user_receipt(
1626                        own_user_id,
1627                        ReceiptType::Read,
1628                        receipt_thread.clone(),
1629                        room,
1630                        state.items.all_remote_events(),
1631                    )
1632                    .await
1633                {
1634                    trace!(%old_pub_read, "found a previous public receipt");
1635                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1636                        &old_pub_read,
1637                        event_id,
1638                        state.items.all_remote_events(),
1639                    ) {
1640                        trace!(
1641                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1642                        );
1643                        return relative_pos == RelativePosition::After;
1644                    }
1645                }
1646            }
1647
1648            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1649            // doesn't make sense to have a private read receipt behind a public one.
1650            SendReceiptType::ReadPrivate => {
1651                if let Some((old_priv_read, _)) =
1652                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1653                {
1654                    trace!(%old_priv_read, "found a previous private receipt");
1655                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1656                        &old_priv_read,
1657                        event_id,
1658                        state.items.all_remote_events(),
1659                    ) {
1660                        trace!(
1661                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1662                        );
1663                        return relative_pos == RelativePosition::After;
1664                    }
1665                }
1666            }
1667
1668            SendReceiptType::FullyRead => {
1669                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1670                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1671                        &prev_event_id,
1672                        event_id,
1673                        state.items.all_remote_events(),
1674                    )
1675                {
1676                    return relative_pos == RelativePosition::After;
1677                }
1678            }
1679
1680            _ => {}
1681        }
1682
1683        // Let the server handle unknown receipts.
1684        true
1685    }
1686
1687    /// Returns the latest event identifier, even if it's not visible, or if
1688    /// it's folded into another timeline item.
1689    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1690        let state = self.state.read().await;
1691        let filter_out_thread_events = match self.focus() {
1692            TimelineFocusKind::Thread { .. } => false,
1693            TimelineFocusKind::Live { hide_threaded_events, .. } => *hide_threaded_events,
1694            TimelineFocusKind::Event { .. } => {
1695                // For event-focused timelines, filtering is handled in the event cache layer.
1696                false
1697            }
1698            TimelineFocusKind::PinnedEvents { .. } => true,
1699        };
1700
1701        state
1702            .items
1703            .all_remote_events()
1704            .iter()
1705            .rev()
1706            .filter_map(|event_meta| {
1707                if !filter_out_thread_events {
1708                    // For an unthreaded timeline, the last event is always the latest event.
1709                    Some(event_meta.event_id.clone())
1710                } else if event_meta.thread_root_id.is_none() {
1711                    // For the main-thread timeline, only non-threaded events are valid candidates
1712                    // for the latest event.
1713                    //
1714                    // But! An event could be an aggregation that relate to an in-thread
1715                    // event. In this case, it's not a valid latest event.
1716                    if let Some(TimelineEventItemId::EventId(target_event_id)) =
1717                        state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1718                            event_meta.event_id.clone(),
1719                        ))
1720                        && let Some(target_meta) =
1721                            state.items.all_remote_events().get_by_event_id(target_event_id)
1722                        && target_meta.thread_root_id.is_some()
1723                    {
1724                        // This event is an aggregation of an in-thread event, so skip it.
1725                        None
1726                    } else {
1727                        // Not in a thread, and not the aggregation of an in-thread event, so it's
1728                        // a valid candidate for the latest event.
1729                        Some(event_meta.event_id.clone())
1730                    }
1731                } else {
1732                    // An in-thread event, when we're filtering out threaded events, is never a
1733                    // valid candidate for the latest event.
1734                    None
1735                }
1736            })
1737            .next()
1738    }
1739
1740    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1741    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1742        let (utds, decrypted) = self.compute_redecryption_candidates().await;
1743
1744        let request = DecryptionRetryRequest {
1745            room_id: self.room().room_id().to_owned(),
1746            utd_session_ids: utds,
1747            refresh_info_session_ids: decrypted,
1748        };
1749
1750        self.room().client().event_cache().request_decryption(request);
1751    }
1752
1753    /// Combine the global (event cache) pagination status with the local state
1754    /// of the timeline.
1755    ///
1756    /// This only changes the global pagination status of this room, in one
1757    /// case: if the timeline has a skip count greater than 0, it will
1758    /// ensure that the pagination status says that we haven't reached the
1759    /// timeline start yet.
1760    pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1761        match status {
1762            PaginationStatus::Idle { hit_timeline_start } => {
1763                if hit_timeline_start {
1764                    let state = self.state.read().await;
1765                    // If the skip count is greater than 0, it means that a subsequent pagination
1766                    // could return more items, so pretend we didn't get the information that the
1767                    // timeline start was hit.
1768                    if state.meta.subscriber_skip_count.get() > 0 {
1769                        return PaginationStatus::Idle { hit_timeline_start: false };
1770                    }
1771                }
1772            }
1773            PaginationStatus::Paginating => {}
1774        }
1775
1776        // You're perfect, just the way you are.
1777        status
1778    }
1779}
1780
1781impl<P: RoomDataProvider> TimelineController<P> {
1782    /// Returns the timeline focus of the [`TimelineController`].
1783    pub(super) fn focus(&self) -> &TimelineFocusKind {
1784        &self.focus
1785    }
1786}
1787
1788#[allow(clippy::too_many_arguments)]
1789async fn fetch_replied_to_event<P: RoomDataProvider>(
1790    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1791    state_lock: &RwLock<TimelineState<P>>,
1792    index: usize,
1793    item: &EventTimelineItem,
1794    internal_id: TimelineUniqueId,
1795    msglike: &MsgLikeContent,
1796    in_reply_to: &EventId,
1797    room: &Room,
1798) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1799    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1800        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1801        trace!("Found replied-to event locally");
1802        return Ok(details);
1803    }
1804
1805    // Replace the item with a new timeline item that has the fetching status of the
1806    // replied-to event to pending.
1807    trace!("Setting in-reply-to details to pending");
1808    let in_reply_to_details =
1809        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1810
1811    let event_item = item
1812        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1813
1814    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1815    state_guard.items.replace(index, new_timeline_item);
1816
1817    // Don't hold the state lock while the network request is made.
1818    drop(state_guard);
1819
1820    trace!("Fetching replied-to event");
1821    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1822        Ok(timeline_event) => {
1823            let state = state_lock.read().await;
1824
1825            let replied_to_item =
1826                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1827
1828            if let Some(item) = replied_to_item {
1829                TimelineDetails::Ready(Box::new(item))
1830            } else {
1831                // The replied-to item is an aggregation, not a standalone item.
1832                return Err(Error::UnsupportedEvent);
1833            }
1834        }
1835
1836        Err(e) => TimelineDetails::Error(Arc::new(e)),
1837    };
1838
1839    Ok(res)
1840}