Skip to main content

matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeSet,
17    fmt,
18    sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26#[cfg(test)]
27use matrix_sdk::Result;
28use matrix_sdk::{
29    deserialized_responses::TimelineEvent,
30    event_cache::{
31        DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache,
32        TimelineVectorDiffs,
33    },
34    send_queue::{
35        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
36    },
37    task_monitor::BackgroundTaskHandle,
38};
39#[cfg(test)]
40use ruma::events::receipt::ReceiptEventContent;
41use ruma::{
42    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
43    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
44    events::{
45        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
46        AnySyncTimelineEvent, MessageLikeEventType,
47        poll::unstable_start::UnstablePollStartEventContent,
48        reaction::ReactionEventContent,
49        receipt::{Receipt, ReceiptThread, ReceiptType},
50        relation::Annotation,
51        room::message::{MessageType, Relation},
52    },
53    room_version_rules::RoomVersionRules,
54    serde::Raw,
55};
56use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
57use tracing::{
58    Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
59};
60
61pub(super) use self::{
62    metadata::{RelativePosition, TimelineMetadata},
63    observable_items::{
64        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
65        ObservableItemsTransactionEntry,
66    },
67    state::TimelineState,
68    state_transaction::TimelineStateTransaction,
69};
70use super::{
71    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
72    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
73    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
74    TimelineReadReceiptTracking, VirtualTimelineItem,
75    algorithms::{rfind_event_by_id, rfind_event_item},
76    event_item::{ReactionStatus, RemoteEventOrigin},
77    item::TimelineUniqueId,
78    subscriber::TimelineSubscriber,
79    traits::RoomDataProvider,
80};
81use crate::{
82    timeline::{
83        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
84        algorithms::rfind_event_by_item_id,
85        controller::decryption_retry_task::compute_redecryption_candidates,
86        date_dividers::DateDividerAdjuster,
87        event_item::TimelineItemHandle,
88        tasks::{event_focused_task, pinned_events_task, thread_updates_task},
89    },
90    unable_to_decrypt_hook::UtdHookManager,
91};
92
93pub(in crate::timeline) mod aggregations;
94mod decryption_retry_task;
95mod metadata;
96mod observable_items;
97mod read_receipts;
98mod state;
99mod state_transaction;
100
101pub(super) use aggregations::*;
102pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
103
/// Data associated to the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The focused event ID.
        focused_event_id: OwnedEventId,

        /// If the focused event is part or the root of a thread, what's the
        /// thread root?
        ///
        /// This is determined once when initializing the event-focused cache,
        /// and then it won't change for the duration of this timeline.
        thread_root: OnceLock<OwnedEventId>,

        /// The thread mode to use for this event-focused timeline, which is
        /// part of the key for the memoized event-focused cache.
        thread_mode: TimelineEventFocusThreadMode,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// A timeline showing only the room's pinned events.
    PinnedEvents,
}
143
144impl TimelineFocusKind {
145    /// Returns the [`ReceiptThread`] that should be used for the current
146    /// timeline focus.
147    ///
148    /// Live and event timelines will use the unthreaded read receipt type in
149    /// general, unless they hide in-thread events, in which case they will
150    /// use the main thread.
151    pub(super) fn receipt_thread(&self) -> ReceiptThread {
152        if let Some(thread_root) = self.thread_root() {
153            ReceiptThread::Thread(thread_root.to_owned())
154        } else if self.hide_threaded_events() {
155            ReceiptThread::Main
156        } else {
157            ReceiptThread::Unthreaded
158        }
159    }
160
161    /// Whether to hide in-thread events from the timeline.
162    fn hide_threaded_events(&self) -> bool {
163        match self {
164            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
165            TimelineFocusKind::Event { thread_mode, .. } => {
166                matches!(
167                    thread_mode,
168                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
169                )
170            }
171            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
172        }
173    }
174
175    /// Whether the focus is on a thread (from a live thread or a thread
176    /// permalink).
177    fn is_thread(&self) -> bool {
178        self.thread_root().is_some()
179    }
180
181    /// If the focus is a thread, returns its root event ID.
182    fn thread_root(&self) -> Option<&EventId> {
183        match self {
184            TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
185            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
186            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
187        }
188    }
189}
190
/// The heart of a timeline: holds the timeline's mutable state, the focus
/// data, the settings, and a [`RoomDataProvider`] giving access to room data.
///
/// Clones share the same underlying state and focus (both are behind `Arc`s).
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    focus: Arc<TimelineFocusKind>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}
208
/// Settings applied to a timeline, as configured by its builder.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled and on which event
    /// types?
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}
225
226#[cfg(not(tarpaulin_include))]
227impl fmt::Debug for TimelineSettings {
228    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229        f.debug_struct("TimelineSettings")
230            .field("track_read_receipts", &self.track_read_receipts)
231            .field("add_failed_to_parse", &self.add_failed_to_parse)
232            .finish_non_exhaustive()
233    }
234}
235
236impl Default for TimelineSettings {
237    fn default() -> Self {
238        Self {
239            track_read_receipts: TimelineReadReceiptTracking::Disabled,
240            event_filter: Arc::new(default_event_filter),
241            add_failed_to_parse: true,
242            date_divider_mode: DateDividerMode::Daily,
243        }
244    }
245}
246
/// The default event filter for
/// [`crate::timeline::TimelineBuilder::event_filter`].
///
/// It filters out events that are not rendered by the timeline, including but
/// not limited to: reactions, edits, redactions on existing messages.
///
/// Returns `true` if the event should get its own timeline item, `false` if
/// it should be filtered out.
///
/// If you have a custom filter, it may be best to chain yours with this one if
/// you do not want to run into situations where a read receipt is not visible
/// because it's living on an event that doesn't have a matching timeline item.
pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
    match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            if ev.redacts(&rules.redaction).is_some() {
                // This is a redaction of an existing message, we'll only update the previous
                // message and not render a new entry.
                false
            } else {
                // This is a redacted entry, that we'll show only if the redacted entity wasn't
                // a reaction.
                ev.event_type() != MessageLikeEventType::Reaction
            }
        }

        AnySyncTimelineEvent::MessageLike(msg) => {
            match msg.original_content() {
                None => {
                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
                    // a reaction.
                    msg.event_type() != MessageLikeEventType::Reaction
                }

                Some(original_content) => {
                    match original_content {
                        AnyMessageLikeEventContent::RoomMessage(content) => {
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                // Edits aren't visible by default.
                                return false;
                            }

                            // Only message types the timeline knows how to render get
                            // their own item; unknown types fall through to `false`.
                            match content.msgtype {
                                MessageType::Audio(_)
                                | MessageType::Emote(_)
                                | MessageType::File(_)
                                | MessageType::Image(_)
                                | MessageType::Location(_)
                                | MessageType::Notice(_)
                                | MessageType::ServerNotice(_)
                                | MessageType::Text(_)
                                | MessageType::Video(_)
                                | MessageType::VerificationRequest(_) => true,
                                // Galleries are only rendered behind the unstable MSC4274
                                // feature flag.
                                #[cfg(feature = "unstable-msc4274")]
                                MessageType::Gallery(_) => true,
                                _ => false,
                            }
                        }

                        AnyMessageLikeEventContent::Sticker(_)
                        | AnyMessageLikeEventContent::UnstablePollStart(
                            UnstablePollStartEventContent::New(_),
                        )
                        | AnyMessageLikeEventContent::CallInvite(_)
                        | AnyMessageLikeEventContent::RtcNotification(_)
                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,

                        // Beacon location-update events are aggregated onto
                        // their parent `beacon_info` state event's timeline
                        // item. They are never rendered as standalone items.
                        AnyMessageLikeEventContent::Beacon(_) => false,

                        _ => false,
                    }
                }
            }
        }

        AnySyncTimelineEvent::State(_) => {
            // All the state events may get displayed by default.
            true
        }
    }
}
332
/// Result of calling [`TimelineController::init_focus`].
pub(super) struct InitFocusResult {
    /// Did the initialization result in having some events in the timeline?
    pub has_events: bool,
    /// If the timeline is a non-live timeline, an extra task that subscribes to
    /// changes to the focus source.
    ///
    /// `None` for live timelines, which receive updates from sync instead.
    pub focus_task: Option<BackgroundTaskHandle>,
}
341
342impl<P: RoomDataProvider> TimelineController<P> {
343    pub(super) fn new(
344        room_data_provider: P,
345        focus: TimelineFocus,
346        internal_id_prefix: Option<String>,
347        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
348        is_room_encrypted: bool,
349        settings: TimelineSettings,
350    ) -> Self {
351        let focus = match focus {
352            TimelineFocus::Live { hide_threaded_events } => {
353                TimelineFocusKind::Live { hide_threaded_events }
354            }
355
356            TimelineFocus::Event { target, thread_mode, .. } => {
357                TimelineFocusKind::Event {
358                    focused_event_id: target,
359                    // This will be initialized in `Self::init_focus`.
360                    thread_root: OnceLock::new(),
361                    thread_mode,
362                }
363            }
364
365            TimelineFocus::Thread { root_event_id, .. } => {
366                TimelineFocusKind::Thread { root_event_id }
367            }
368
369            TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
370        };
371
372        let focus = Arc::new(focus);
373        let state = Arc::new(RwLock::new(TimelineState::new(
374            focus.clone(),
375            room_data_provider.own_user_id().to_owned(),
376            room_data_provider.room_version_rules(),
377            internal_id_prefix,
378            unable_to_decrypt_hook,
379            is_room_encrypted,
380        )));
381
382        Self { state, focus, room_data_provider, settings }
383    }
384
385    /// Listens to encryption state changes for the room in
386    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
387    /// existing timeline items. This will then cause a refresh of those
388    /// timeline items.
389    pub async fn handle_encryption_state_changes(&self) {
390        let mut room_info = self.room_data_provider.room_info();
391
392        // Small function helper to help mark as encrypted.
393        let mark_encrypted = || async {
394            let mut state = self.state.write().await;
395            state.meta.is_room_encrypted = true;
396            state.mark_all_events_as_encrypted();
397        };
398
399        if room_info.get().encryption_state().is_encrypted() {
400            // If the room was already encrypted, it won't toggle to unencrypted, so we can
401            // shut down this task early.
402            mark_encrypted().await;
403            return;
404        }
405
406        while let Some(info) = room_info.next().await {
407            if info.encryption_state().is_encrypted() {
408                mark_encrypted().await;
409                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
410                // here is done.
411                break;
412            }
413        }
414    }
415
416    /// Run a lazy backwards pagination (in live mode).
417    ///
418    /// It adjusts the `count` value of the `Skip` higher-order stream so that
419    /// more items are pushed front in the timeline.
420    ///
421    /// If no more items are available (i.e. if the `count` is zero), this
422    /// method returns `Some(needs)` where `needs` is the number of events that
423    /// must be unlazily backwards paginated.
424    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
425        let state = self.state.read().await;
426
427        let (count, needs) = state
428            .meta
429            .subscriber_skip_count
430            .compute_next_when_paginating_backwards(num_events.into());
431
432        // This always happens on a live timeline.
433        let is_live_timeline = true;
434        state.meta.subscriber_skip_count.update(count, is_live_timeline);
435
436        needs
437    }
438
439    /// Is this timeline receiving events from sync (aka has a live focus)?
440    pub(super) fn is_live(&self) -> bool {
441        matches!(&*self.focus, TimelineFocusKind::Live { .. })
442    }
443
444    /// Is this timeline focused on a thread?
445    pub(super) fn is_threaded(&self) -> bool {
446        self.focus.is_thread()
447    }
448
449    /// The root of the current thread, for a live thread timeline or a
450    /// permalink to a thread message.
451    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
452        self.focus.thread_root().map(ToOwned::to_owned)
453    }
454
455    /// Get a copy of the current items in the list.
456    ///
457    /// Cheap because `im::Vector` is cheap to clone.
458    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
459        self.state.read().await.items.clone_items()
460    }
461
462    #[cfg(test)]
463    pub(super) async fn subscribe_raw(
464        &self,
465    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
466        self.state.read().await.items.subscribe().into_values_and_stream()
467    }
468
469    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
470        let state = self.state.read().await;
471
472        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
473    }
474
475    pub(super) async fn subscribe_filter_map<U, F>(
476        &self,
477        f: F,
478    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
479    where
480        U: Clone,
481        F: Fn(Arc<TimelineItem>) -> Option<U>,
482    {
483        self.state.read().await.items.subscribe().filter_map(f)
484    }
485
    /// Toggle a reaction locally.
    ///
    /// If the own user has not reacted with `key` on the item yet, the
    /// reaction is added (through the send queue for local echoes, or as a new
    /// reaction event for remote items); otherwise the previous reaction is
    /// removed, with the removal strategy depending on where the previous
    /// reaction was in its send lifecycle.
    ///
    /// Returns true if the reaction was added, false if it was removed.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Did the own user already react with this key, and if so, in which
        // send state was that reaction?
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        // There was a previous reaction: remove it.
        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the item's reaction map.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        // Re-acquire the lock: the item may have moved (or been
                        // removed) while the redaction request was in flight, so
                        // look it up again by event ID.
                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
627
628    /// Handle updates on events as [`VectorDiff`]s.
629    pub(super) async fn handle_remote_events_with_diffs(
630        &self,
631        diffs: Vec<VectorDiff<TimelineEvent>>,
632        origin: RemoteEventOrigin,
633    ) {
634        if diffs.is_empty() {
635            return;
636        }
637
638        let mut state = self.state.write().await;
639        state
640            .handle_remote_events_with_diffs(
641                diffs,
642                origin,
643                &self.room_data_provider,
644                &self.settings,
645            )
646            .await
647    }
648
649    /// Only handle aggregations received as [`VectorDiff`]s.
650    pub(super) async fn handle_remote_aggregations(
651        &self,
652        diffs: Vec<VectorDiff<TimelineEvent>>,
653        origin: RemoteEventOrigin,
654    ) {
655        if diffs.is_empty() {
656            return;
657        }
658
659        let mut state = self.state.write().await;
660        state
661            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
662            .await
663    }
664
665    pub(super) async fn clear(&self) {
666        self.state.write().await.clear();
667    }
668
    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        // Seed the state with the own user's public and private read receipts,
        // so receipt tracking has a starting point.
        let track_read_markers = &self.settings.track_read_receipts;
        if track_read_markers.is_enabled() {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one aren't
        // empty.
        // Previously we just had to check the new one wasn't empty because
        // we did a clear operation before so the current one would always be empty, but
        // now we may want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers.is_enabled() {
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }
725
726    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
727        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
728    }
729
730    pub(super) async fn handle_ephemeral_events(
731        &self,
732        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
733    ) {
734        // Don't even take the lock if there are no events to process.
735        if events.is_empty() {
736            return;
737        }
738        let mut state = self.state.write().await;
739        state.handle_ephemeral_events(events, &self.room_data_provider).await;
740    }
741
742    /// Creates the local echo for an event we're sending.
743    #[instrument(skip_all)]
744    pub(super) async fn handle_local_event(
745        &self,
746        txn_id: OwnedTransactionId,
747        content: AnyMessageLikeEventContent,
748        send_handle: Option<SendHandle>,
749    ) {
750        let sender = self.room_data_provider.own_user_id().to_owned();
751        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
752
753        let date_divider_mode = self.settings.date_divider_mode.clone();
754
755        let mut state = self.state.write().await;
756        state
757            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
758            .await;
759    }
760
    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // Only the `Sent` state carries an event id; for all other states this
        // stays `None`.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, we must first make
        // sure the remote echo hasn't showed up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent, that's a broken state, let's
        // emit an error but also override to the given sent state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
860
    /// Remove the local echo identified by `txn_id` from the timeline.
    ///
    /// Returns `true` if a matching local echo was found, either as a
    /// standalone timeline item or as a local aggregation (e.g. a reaction).
    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // Not a standalone item: it may have been a local echo for an
        // aggregation, which lives in the aggregations map instead.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        // Only commit when something was actually found/removed; otherwise the
        // transaction is simply dropped.
        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }
907
    /// Replace the content of the local echo identified by `txn_id` with the
    /// given `content`, resetting its send state to not-yet-sent.
    ///
    /// Only `m.room.message` contents are currently supported. Returns `true`
    /// if the local echo was found and replaced.
    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soon™ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned as remote??");
                return false;
            };
            // If the local echo had an upload progress, retain it.
            let progress = as_variant!(&prev_local_item.send_state,
                EventSendState::NotSentYet { progress } => progress.clone())
            .flatten();
            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
        };

        // Replace the local-related state (kind) and the content state, carrying
        // over reactions, thread and reply metadata from the previous item.
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content.msgtype,
                content.mentions,
                prev_item.content().reactions().cloned().unwrap_or_default(),
                prev_item.content().thread_root(),
                prev_item.content().in_reply_to(),
                prev_item.content().thread_summary(),
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        // This doesn't change the original sending time, so there's no need to adjust
        // date dividers.

        txn.commit();

        debug!("Replaced local echo");
        true
    }
969
970    pub(super) async fn compute_redecryption_candidates(
971        &self,
972    ) -> (BTreeSet<String>, BTreeSet<String>) {
973        let state = self.state.read().await;
974        compute_redecryption_candidates(&state.items)
975    }
976
977    pub(super) async fn set_sender_profiles_pending(&self) {
978        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
979    }
980
981    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
982        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
983    }
984
985    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
986        self.state.write().await.items.for_each(|mut entry| {
987            let Some(event_item) = entry.as_event() else { return };
988            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
989                let new_item = entry.with_kind(TimelineItemKind::Event(
990                    event_item.with_sender_profile(profile_state.clone()),
991                ));
992                ObservableItemsEntry::replace(&mut entry, new_item);
993            }
994        });
995    }
996
997    pub(super) async fn update_missing_sender_profiles(&self) {
998        trace!("Updating missing sender profiles");
999
1000        let mut state = self.state.write().await;
1001        let mut entries = state.items.entries();
1002        while let Some(mut entry) = entries.next() {
1003            let Some(event_item) = entry.as_event() else { continue };
1004            let event_id = event_item.event_id().map(debug);
1005            let transaction_id = event_item.transaction_id().map(debug);
1006
1007            if event_item.sender_profile().is_ready() {
1008                trace!(event_id, transaction_id, "Profile already set");
1009                continue;
1010            }
1011
1012            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1013                Some(profile) => {
1014                    trace!(event_id, transaction_id, "Adding profile");
1015                    let updated_item =
1016                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1017                    let new_item = entry.with_kind(updated_item);
1018                    ObservableItemsEntry::replace(&mut entry, new_item);
1019                }
1020                None => {
1021                    if !event_item.sender_profile().is_unavailable() {
1022                        trace!(event_id, transaction_id, "Marking profile unavailable");
1023                        let updated_item =
1024                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1025                        let new_item = entry.with_kind(updated_item);
1026                        ObservableItemsEntry::replace(&mut entry, new_item);
1027                    } else {
1028                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1029                    }
1030                }
1031            }
1032        }
1033
1034        trace!("Done updating missing sender profiles");
1035    }
1036
    /// Update the profiles of the given senders, even if they are ready.
    ///
    /// Unlike `update_missing_sender_profiles`, this also refreshes profiles
    /// already in the [`TimelineDetails::Ready`] state, but only for senders
    /// listed in `sender_ids`.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            // Only touch items whose sender was explicitly requested.
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    // Skip the replacement when the ready profile is unchanged,
                    // to avoid emitting a spurious timeline update.
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    // No profile known: mark it unavailable unless it already is.
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }
1081
1082    #[cfg(test)]
1083    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1084        let own_user_id = self.room_data_provider.own_user_id();
1085        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1086    }
1087
1088    /// Get the latest read receipt for the given user.
1089    ///
1090    /// Useful to get the latest read receipt, whether it's private or public.
1091    pub(super) async fn latest_user_read_receipt(
1092        &self,
1093        user_id: &UserId,
1094    ) -> Option<(OwnedEventId, Receipt)> {
1095        let receipt_thread = self.focus.receipt_thread();
1096
1097        self.state
1098            .read()
1099            .await
1100            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1101            .await
1102    }
1103
1104    /// Get the ID of the timeline event with the latest read receipt for the
1105    /// given user.
1106    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1107        &self,
1108        user_id: &UserId,
1109    ) -> Option<OwnedEventId> {
1110        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1111    }
1112
1113    /// Subscribe to changes in the read receipts of our own user.
1114    pub async fn subscribe_own_user_read_receipts_changed(
1115        &self,
1116    ) -> impl Stream<Item = ()> + use<P> {
1117        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1118    }
1119
    /// Handle a room send update that's a new local echo.
    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
        match echo.content {
            // A new event local echo: deserialize it and insert it in the timeline.
            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
                let content = match serialized_event.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo: {err}");
                        return;
                    }
                };

                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
                    .await;

                // If the echo arrived with a send error already attached, reflect
                // the unrecoverable failure on the freshly-created item.
                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }

            // A local reaction echo targeting another local echo.
            LocalEchoContent::React { key, send_handle, applies_to } => {
                self.handle_local_reaction(key, send_handle, applies_to).await;
            }

            // A local redaction echo.
            LocalEchoContent::Redaction { redacts, send_error, .. } => {
                self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;

                // Same as for the event case: propagate a pre-existing send error.
                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }
        }
    }
1171
1172    /// Adds a reaction (local echo) to a local echo.
1173    #[instrument(skip(self, send_handle))]
1174    async fn handle_local_reaction(
1175        &self,
1176        reaction_key: String,
1177        send_handle: SendReactionHandle,
1178        applies_to: OwnedTransactionId,
1179    ) {
1180        let mut state = self.state.write().await;
1181        let mut tr = state.transaction();
1182
1183        let target = TimelineEventItemId::TransactionId(applies_to);
1184
1185        let reaction_txn_id = send_handle.transaction_id().to_owned();
1186        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1187        let aggregation = Aggregation::new(
1188            TimelineEventItemId::TransactionId(reaction_txn_id),
1189            AggregationKind::Reaction {
1190                key: reaction_key.clone(),
1191                sender: self.room_data_provider.own_user_id().to_owned(),
1192                timestamp: MilliSecondsSinceUnixEpoch::now(),
1193                reaction_status,
1194            },
1195        );
1196
1197        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1198        find_item_and_apply_aggregation(
1199            &tr.meta.aggregations,
1200            &mut tr.items,
1201            &target,
1202            aggregation,
1203            &tr.meta.room_version_rules,
1204        );
1205
1206        tr.commit();
1207    }
1208
1209    /// Applies a local echo of a redaction.
1210    pub(super) async fn handle_local_redaction(
1211        &self,
1212        txn_id: OwnedTransactionId,
1213        redacts: OwnedEventId,
1214    ) {
1215        let mut state = self.state.write().await;
1216        let mut tr = state.transaction();
1217
1218        let target = TimelineEventItemId::EventId(redacts);
1219
1220        let aggregation = Aggregation::new(
1221            TimelineEventItemId::TransactionId(txn_id),
1222            AggregationKind::Redaction { is_local: true },
1223        );
1224
1225        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1226        find_item_and_apply_aggregation(
1227            &tr.meta.aggregations,
1228            &mut tr.items,
1229            &target,
1230            aggregation,
1231            &tr.meta.room_version_rules,
1232        );
1233
1234        tr.commit();
1235    }
1236
    /// Handle a single room send queue update.
    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
        match update {
            // A new local event was enqueued: create its local echo.
            RoomSendQueueUpdate::NewLocalEvent(echo) => {
                self.handle_local_echo(echo).await;
            }

            // A queued event was cancelled: remove its local echo.
            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                if !self.discard_local_echo(&transaction_id).await {
                    warn!("couldn't find the local echo to discard");
                }
            }

            // A queued event was edited before sending: replace the echo's content.
            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                let content = match new_content.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo (upon edit): {err}");
                        return;
                    }
                };

                if !self.replace_local_echo(&transaction_id, content).await {
                    warn!("couldn't find the local echo to replace");
                }
            }

            // Sending failed: surface the error on the local echo.
            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::SendingFailed { error, is_recoverable },
                )
                .await;
            }

            // The event is being retried: reset the echo to the not-sent-yet state.
            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::NotSentYet { progress: None },
                )
                .await;
            }

            // The event was sent: record the final event id on the echo.
            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
                    .await;
            }

            // A media upload progressed: surface the progress on the related echo.
            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
                self.update_event_send_state(
                    &related_to,
                    EventSendState::NotSentYet {
                        progress: Some(MediaUploadProgress { index, progress }),
                    },
                )
                .await;
            }
        }
    }
1296
1297    /// Insert a timeline start item at the beginning of the room, if it's
1298    /// missing.
1299    pub async fn insert_timeline_start_if_missing(&self) {
1300        let mut state = self.state.write().await;
1301        let mut txn = state.transaction();
1302        txn.items.push_timeline_start_if_missing(
1303            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1304        );
1305        txn.commit();
1306    }
1307
1308    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1309    /// timeline or not.
1310    ///
1311    /// Can be `None` if the event cannot be represented as a standalone item,
1312    /// because it's an aggregation.
1313    pub(super) async fn make_replied_to(
1314        &self,
1315        event: TimelineEvent,
1316    ) -> Result<Option<EmbeddedEvent>, Error> {
1317        let state = self.state.read().await;
1318        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1319    }
1320}
1321
1322impl TimelineController {
    /// Get the [`Room`] backing this controller; in this (non-generic) impl
    /// the room data provider is the room itself.
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }
1326
    /// Initializes the configured timeline focus with appropriate data.
    ///
    /// Should be called only once after creation of the [`TimelineController`],
    /// with all its fields set.
    ///
    /// Returns whether initial events were found, and (for non-live foci) the
    /// background task that keeps the timeline updated, which is aborted when
    /// its handle is dropped.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<InitFocusResult, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await?;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    PaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    PaginationStatus::Paginating => {}
                }

                // Live timelines don't spawn a dedicated focus task here.
                Ok(InitFocusResult { has_events, focus_task: None })
            }

            TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
                // Use the event-focused cache from the event cache layer.
                let event_cache_thread_mode = match thread_mode {
                    TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
                    TimelineEventFocusThreadMode::Automatic { .. } => {
                        EventFocusThreadMode::Automatic
                    }
                };

                let cache = room_event_cache
                    .get_or_create_event_focused_cache(
                        event_id.clone(),
                        *num_context_events,
                        event_cache_thread_mode,
                    )
                    .await
                    .map_err(PaginationError::EventCache)?;

                let (events, receiver) = cache.subscribe().await;

                let has_events = !events.is_empty();

                // Ask the cache for the thread root, if it managed to extract one or decided
                // that the target event was the thread root.
                match &*self.focus {
                    TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
                        if let Some(thread_root) = cache.thread_root().await {
                            // `get_or_init` keeps an earlier value if one was set.
                            focus_thread_root.get_or_init(|| thread_root);
                        }
                    }
                    TimelineFocusKind::Live { .. }
                    | TimelineFocusKind::Thread { .. }
                    | TimelineFocusKind::PinnedEvents => {
                        panic!("unexpected focus for an event-focused timeline")
                    }
                }

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
                    .await;

                // Keep the timeline in sync with the event-focused cache; the task
                // is aborted when the returned handle is dropped.
                let task = self
                    .room_data_provider
                    .client()
                    .task_monitor()
                    .spawn_infinite_task(
                        "timeline::event_focused_cache_updates",
                        event_focused_task(
                            event_id.clone(),
                            (*thread_mode).into(),
                            room_event_cache.clone(),
                            self.clone(),
                            receiver,
                        ),
                    )
                    .abort_on_drop();

                Ok(InitFocusResult { has_events, focus_task: Some(task) })
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                let (has_events, receiver) =
                    self.init_with_thread_root(root_event_id, room_event_cache).await?;

                let room = &self.room_data_provider;
                // Detach the task's span from the current one, but keep a link for tracing.
                let span = info_span!(
                    parent: Span::none(),
                    "thread_live_update_handler",
                    room_id = ?room.room_id(),
                );
                span.follows_from(Span::current());

                let task = room
                    .client()
                    .task_monitor()
                    .spawn_infinite_task(
                        "timeline::thread_event_cache_updates",
                        thread_updates_task(
                            receiver,
                            room_event_cache.clone(),
                            self.clone(),
                            root_event_id.clone(),
                        )
                        .instrument(span),
                    )
                    .abort_on_drop();

                Ok(InitFocusResult { has_events, focus_task: Some(task) })
            }

            TimelineFocus::PinnedEvents => {
                let (initial_events, pinned_events_recv) =
                    room_event_cache.subscribe_to_pinned_events().await?;

                let has_events = !initial_events.is_empty();

                self.replace_with_initial_remote_events(
                    initial_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                let task = self
                    .room_data_provider
                    .client()
                    .task_monitor()
                    .spawn_infinite_task(
                        "timeline::pinned_event_cache_updates",
                        pinned_events_task(
                            room_event_cache.clone(),
                            self.clone(),
                            pinned_events_recv,
                        ),
                    )
                    .abort_on_drop();

                Ok(InitFocusResult { has_events, focus_task: Some(task) })
            }
        }
    }
1478
1479    /// (Re-)initialise a timeline using [`TimelineFocus::Thread`] with cached
1480    /// threaded events and secondary relations.
1481    ///
1482    /// Returns whether there were any events added to the timeline, and a
1483    /// receiver to return updates after the initial events have been
1484    /// inserted in the timeline.
1485    pub(super) async fn init_with_thread_root(
1486        &self,
1487        root_event_id: &OwnedEventId,
1488        room_event_cache: &RoomEventCache,
1489    ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1490        let (events, receiver) =
1491            room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
1492        let has_events = !events.is_empty();
1493
1494        // For each event, we also need to find the related events, as they don't
1495        // include the thread relationship, they won't be included in
1496        // the initial list of events.
1497        let mut related_events = Vector::new();
1498        for event_id in events.iter().filter_map(|event| event.event_id()) {
1499            if let Some((_original, related)) =
1500                room_event_cache.find_event_with_relations(&event_id, None).await?
1501            {
1502                related_events.extend(related);
1503            }
1504        }
1505
1506        self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1507
1508        // Now that we've inserted the thread events, add the aggregations too.
1509        if !related_events.is_empty() {
1510            self.handle_remote_aggregations(
1511                vec![VectorDiff::Append { values: related_events }],
1512                RemoteEventOrigin::Cache,
1513            )
1514            .await;
1515        }
1516
1517        Ok((has_events, receiver))
1518    }
1519
    /// Given an event identifier, will fetch the details for the event it's
    /// replying to, if applicable.
    ///
    /// If the target item isn't a reply, or if the replied-to details are
    /// already fetched (or being fetched), this is a no-op. On success, the
    /// reply item is replaced in place with its `in_reply_to` details filled
    /// in.
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        // Locate the reply item; it must exist in the timeline and have remote
        // (i.e. already-sent) metadata.
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        // Only message-like items can carry in-reply-to details.
        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        // Nothing to do if a fetch is already in flight, or already done.
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        // The helper consumes the write guard: it marks the details as pending,
        // then releases the lock before any network request happens.
        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }
1603
1604    /// Returns the thread that should be used for a read receipt based on the
1605    /// current focus of the timeline and the receipt type.
1606    ///
1607    /// A `SendReceiptType::FullyRead` will always use
1608    /// `ReceiptThread::Unthreaded`
1609    pub(super) fn infer_thread_for_read_receipt(
1610        &self,
1611        receipt_type: &SendReceiptType,
1612    ) -> ReceiptThread {
1613        if matches!(receipt_type, SendReceiptType::FullyRead) {
1614            ReceiptThread::Unthreaded
1615        } else {
1616            self.focus.receipt_thread()
1617        }
1618    }
1619
1620    /// Check whether the given receipt should be sent.
1621    ///
1622    /// Returns `false` if the given receipt is older than the current one.
1623    pub(super) async fn should_send_receipt(
1624        &self,
1625        receipt_type: &SendReceiptType,
1626        receipt_thread: &ReceiptThread,
1627        event_id: &EventId,
1628    ) -> bool {
1629        let own_user_id = self.room().own_user_id();
1630        let state = self.state.read().await;
1631        let room = self.room();
1632
1633        match receipt_type {
1634            SendReceiptType::Read => {
1635                if let Some((old_pub_read, _)) = state
1636                    .meta
1637                    .user_receipt(
1638                        own_user_id,
1639                        ReceiptType::Read,
1640                        receipt_thread.clone(),
1641                        room,
1642                        state.items.all_remote_events(),
1643                    )
1644                    .await
1645                {
1646                    trace!(%old_pub_read, "found a previous public receipt");
1647                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1648                        &old_pub_read,
1649                        event_id,
1650                        state.items.all_remote_events(),
1651                    ) {
1652                        trace!(
1653                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1654                        );
1655                        return relative_pos == RelativePosition::After;
1656                    }
1657                }
1658            }
1659
1660            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1661            // doesn't make sense to have a private read receipt behind a public one.
1662            SendReceiptType::ReadPrivate => {
1663                if let Some((old_priv_read, _)) =
1664                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1665                {
1666                    trace!(%old_priv_read, "found a previous private receipt");
1667                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1668                        &old_priv_read,
1669                        event_id,
1670                        state.items.all_remote_events(),
1671                    ) {
1672                        trace!(
1673                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1674                        );
1675                        return relative_pos == RelativePosition::After;
1676                    }
1677                }
1678            }
1679
1680            SendReceiptType::FullyRead => {
1681                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1682                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1683                        &prev_event_id,
1684                        event_id,
1685                        state.items.all_remote_events(),
1686                    )
1687                {
1688                    return relative_pos == RelativePosition::After;
1689                }
1690            }
1691
1692            _ => {}
1693        }
1694
1695        // Let the server handle unknown receipts.
1696        true
1697    }
1698
1699    /// Returns the latest event identifier, even if it's not visible, or if
1700    /// it's folded into another timeline item.
1701    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1702        let state = self.state.read().await;
1703        let filter_out_thread_events = match self.focus() {
1704            TimelineFocusKind::Thread { .. } => false,
1705            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1706            TimelineFocusKind::Event { .. } => {
1707                // For event-focused timelines, filtering is handled in the event cache layer.
1708                false
1709            }
1710            TimelineFocusKind::PinnedEvents => true,
1711        };
1712
1713        state
1714            .items
1715            .all_remote_events()
1716            .iter()
1717            .rev()
1718            .filter_map(|event_meta| {
1719                if !filter_out_thread_events {
1720                    // For an unthreaded timeline, the last event is always the latest event.
1721                    Some(event_meta.event_id.clone())
1722                } else if event_meta.thread_root_id.is_none() {
1723                    // For the main-thread timeline, only non-threaded events are valid candidates
1724                    // for the latest event.
1725                    //
1726                    // But! An event could be an aggregation that relate to an in-thread
1727                    // event. In this case, it's not a valid latest event.
1728                    if let Some(TimelineEventItemId::EventId(target_event_id)) =
1729                        state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1730                            event_meta.event_id.clone(),
1731                        ))
1732                        && let Some(target_meta) =
1733                            state.items.all_remote_events().get_by_event_id(target_event_id)
1734                        && target_meta.thread_root_id.is_some()
1735                    {
1736                        // This event is an aggregation of an in-thread event, so skip it.
1737                        None
1738                    } else {
1739                        // Not in a thread, and not the aggregation of an in-thread event, so it's
1740                        // a valid candidate for the latest event.
1741                        Some(event_meta.event_id.clone())
1742                    }
1743                } else {
1744                    // An in-thread event, when we're filtering out threaded events, is never a
1745                    // valid candidate for the latest event.
1746                    None
1747                }
1748            })
1749            .next()
1750    }
1751
    /// Request a retry of decryption for this room's events.
    ///
    /// NOTE(review): `session_ids` is not used in the body below — the retry
    /// candidates are recomputed wholesale, and the parameter only ends up
    /// recorded in the tracing span via `#[instrument]`. Confirm whether
    /// narrowing the retry to these sessions was intended.
    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
        // Split the timeline's candidates into still-undecryptable events and
        // already-decrypted events whose decryption info may need refreshing.
        let (utds, decrypted) = self.compute_redecryption_candidates().await;

        let request = DecryptionRetryRequest {
            room_id: self.room().room_id().to_owned(),
            utd_session_ids: utds,
            refresh_info_session_ids: decrypted,
        };

        // Hand the request off to the event cache, which performs the retry.
        self.room().client().event_cache().request_decryption(request);
    }
1764
1765    /// Combine the global (event cache) pagination status with the local state
1766    /// of the timeline.
1767    ///
1768    /// This only changes the global pagination status of this room, in one
1769    /// case: if the timeline has a skip count greater than 0, it will
1770    /// ensure that the pagination status says that we haven't reached the
1771    /// timeline start yet.
1772    pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1773        match status {
1774            PaginationStatus::Idle { hit_timeline_start } => {
1775                if hit_timeline_start {
1776                    let state = self.state.read().await;
1777                    // If the skip count is greater than 0, it means that a subsequent pagination
1778                    // could return more items, so pretend we didn't get the information that the
1779                    // timeline start was hit.
1780                    if state.meta.subscriber_skip_count.get() > 0 {
1781                        return PaginationStatus::Idle { hit_timeline_start: false };
1782                    }
1783                }
1784            }
1785            PaginationStatus::Paginating => {}
1786        }
1787
1788        // You're perfect, just the way you are.
1789        status
1790    }
1791}
1792
impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    ///
    /// This is a cheap borrow of the controller's `focus` field; callers that
    /// need to hold onto it must clone it themselves.
    pub(super) fn focus(&self) -> &TimelineFocusKind {
        &self.focus
    }
}
1799
/// Fetch the event that the item at `index` is replying to, returning its
/// details for display.
///
/// Consumes the state write guard: the reply item is first replaced with a
/// `Pending` in-reply-to marker, then the guard is dropped before the
/// (potentially slow) fetch; `state_lock` is used to re-acquire a read guard
/// afterwards. The caller is responsible for writing the returned details
/// back into the timeline.
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    // Fast path: the replied-to event is already in the timeline, no fetch needed.
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status of the
    // replied-to event to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            // Only a read guard here: building the embedded event doesn't
            // mutate the timeline.
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                // NOTE(review): this early return leaves the reply item in the
                // `Pending` state set above — confirm that's acceptable for
                // the caller's error path.
                return Err(Error::UnsupportedEvent);
            }
        }

        // Surface fetch failures as error details so the UI can show them.
        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}