Skip to main content

matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeSet,
17    fmt,
18    sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26#[cfg(test)]
27use matrix_sdk::Result;
28use matrix_sdk::{
29    deserialized_responses::TimelineEvent,
30    event_cache::{DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache},
31    send_queue::{
32        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33    },
34};
35#[cfg(test)]
36use ruma::events::receipt::ReceiptEventContent;
37use ruma::{
38    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
39    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
40    events::{
41        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
42        AnySyncTimelineEvent, MessageLikeEventType,
43        poll::unstable_start::UnstablePollStartEventContent,
44        reaction::ReactionEventContent,
45        receipt::{Receipt, ReceiptThread, ReceiptType},
46        relation::Annotation,
47        room::message::{MessageType, Relation},
48    },
49    room_version_rules::RoomVersionRules,
50    serde::Raw,
51};
52use tokio::sync::{RwLock, RwLockWriteGuard};
53use tracing::{debug, error, field::debug, info, instrument, trace, warn};
54
55pub(super) use self::{
56    metadata::{RelativePosition, TimelineMetadata},
57    observable_items::{
58        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
59        ObservableItemsTransactionEntry,
60    },
61    state::TimelineState,
62    state_transaction::TimelineStateTransaction,
63};
64use super::{
65    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
66    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
67    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
68    TimelineReadReceiptTracking, VirtualTimelineItem,
69    algorithms::{rfind_event_by_id, rfind_event_item},
70    event_item::{ReactionStatus, RemoteEventOrigin},
71    item::TimelineUniqueId,
72    subscriber::TimelineSubscriber,
73    traits::RoomDataProvider,
74};
75use crate::{
76    timeline::{
77        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
78        algorithms::rfind_event_by_item_id,
79        controller::decryption_retry_task::compute_redecryption_candidates,
80        date_dividers::DateDividerAdjuster, event_item::TimelineItemHandle,
81    },
82    unable_to_decrypt_hook::UtdHookManager,
83};
84
85pub(in crate::timeline) mod aggregations;
86mod decryption_retry_task;
87mod metadata;
88mod observable_items;
89mod read_receipts;
90mod state;
91mod state_transaction;
92
93pub(super) use aggregations::*;
94pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
95
/// Data associated to the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The focused event ID.
        focused_event_id: OwnedEventId,

        /// If the focused event is part of, or the root of, a thread, what's
        /// the thread root?
        ///
        /// This is determined once when initializing the event-focused cache,
        /// and then it won't change for the duration of this timeline. The
        /// `OnceLock` enforces this write-once behavior.
        thread_root: OnceLock<OwnedEventId>,

        /// The thread mode to use for this event-focused timeline, which is
        /// part of the key for the memoized event-focused cache.
        thread_mode: TimelineEventFocusThreadMode,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// The timeline only shows the room's pinned events.
    PinnedEvents,
}
135
136impl TimelineFocusKind {
137    /// Returns the [`ReceiptThread`] that should be used for the current
138    /// timeline focus.
139    ///
140    /// Live and event timelines will use the unthreaded read receipt type in
141    /// general, unless they hide in-thread events, in which case they will
142    /// use the main thread.
143    pub(super) fn receipt_thread(&self) -> ReceiptThread {
144        if let Some(thread_root) = self.thread_root() {
145            ReceiptThread::Thread(thread_root.to_owned())
146        } else if self.hide_threaded_events() {
147            ReceiptThread::Main
148        } else {
149            ReceiptThread::Unthreaded
150        }
151    }
152
153    /// Whether to hide in-thread events from the timeline.
154    fn hide_threaded_events(&self) -> bool {
155        match self {
156            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
157            TimelineFocusKind::Event { thread_mode, .. } => {
158                matches!(
159                    thread_mode,
160                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
161                )
162            }
163            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
164        }
165    }
166
167    /// Whether the focus is on a thread (from a live thread or a thread
168    /// permalink).
169    fn is_thread(&self) -> bool {
170        self.thread_root().is_some()
171    }
172
173    /// If the focus is a thread, returns its root event ID.
174    fn thread_root(&self) -> Option<&EventId> {
175        match self {
176            TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
177            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
178            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
179        }
180    }
181}
182
/// The main business logic driving a timeline: holds the mutable timeline
/// state, the focus, and the data provider used to fetch room data.
///
/// Clones share the same underlying state and focus, since both are kept
/// behind `Arc`s.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    focus: Arc<TimelineFocusKind>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}
200
/// Settings that alter the behavior of a timeline.
///
/// See [`TimelineSettings::default`] for the default values.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled and on which event
    /// types?
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}
217
218#[cfg(not(tarpaulin_include))]
219impl fmt::Debug for TimelineSettings {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("TimelineSettings")
222            .field("track_read_receipts", &self.track_read_receipts)
223            .field("add_failed_to_parse", &self.add_failed_to_parse)
224            .finish_non_exhaustive()
225    }
226}
227
228impl Default for TimelineSettings {
229    fn default() -> Self {
230        Self {
231            track_read_receipts: TimelineReadReceiptTracking::Disabled,
232            event_filter: Arc::new(default_event_filter),
233            add_failed_to_parse: true,
234            date_divider_mode: DateDividerMode::Daily,
235        }
236    }
237}
238
239/// The default event filter for
240/// [`crate::timeline::TimelineBuilder::event_filter`].
241///
242/// It filters out events that are not rendered by the timeline, including but
243/// not limited to: reactions, edits, redactions on existing messages.
244///
245/// If you have a custom filter, it may be best to chain yours with this one if
246/// you do not want to run into situations where a read receipt is not visible
247/// because it's living on an event that doesn't have a matching timeline item.
248pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
249    match event {
250        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
251            if ev.redacts(&rules.redaction).is_some() {
252                // This is a redaction of an existing message, we'll only update the previous
253                // message and not render a new entry.
254                false
255            } else {
256                // This is a redacted entry, that we'll show only if the redacted entity wasn't
257                // a reaction.
258                ev.event_type() != MessageLikeEventType::Reaction
259            }
260        }
261
262        AnySyncTimelineEvent::MessageLike(msg) => {
263            match msg.original_content() {
264                None => {
265                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
266                    // a reaction.
267                    msg.event_type() != MessageLikeEventType::Reaction
268                }
269
270                Some(original_content) => {
271                    match original_content {
272                        AnyMessageLikeEventContent::RoomMessage(content) => {
273                            if content
274                                .relates_to
275                                .as_ref()
276                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
277                            {
278                                // Edits aren't visible by default.
279                                return false;
280                            }
281
282                            match content.msgtype {
283                                MessageType::Audio(_)
284                                | MessageType::Emote(_)
285                                | MessageType::File(_)
286                                | MessageType::Image(_)
287                                | MessageType::Location(_)
288                                | MessageType::Notice(_)
289                                | MessageType::ServerNotice(_)
290                                | MessageType::Text(_)
291                                | MessageType::Video(_)
292                                | MessageType::VerificationRequest(_) => true,
293                                #[cfg(feature = "unstable-msc4274")]
294                                MessageType::Gallery(_) => true,
295                                _ => false,
296                            }
297                        }
298
299                        AnyMessageLikeEventContent::Sticker(_)
300                        | AnyMessageLikeEventContent::UnstablePollStart(
301                            UnstablePollStartEventContent::New(_),
302                        )
303                        | AnyMessageLikeEventContent::CallInvite(_)
304                        | AnyMessageLikeEventContent::RtcNotification(_)
305                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
306
307                        // Beacon location-update events are aggregated onto
308                        // their parent `beacon_info` state event's timeline
309                        // item. They are never rendered as standalone items.
310                        AnyMessageLikeEventContent::Beacon(_) => false,
311
312                        _ => false,
313                    }
314                }
315            }
316        }
317
318        AnySyncTimelineEvent::State(_) => {
319            // All the state events may get displayed by default.
320            true
321        }
322    }
323}
324
325impl<P: RoomDataProvider> TimelineController<P> {
326    pub(super) fn new(
327        room_data_provider: P,
328        focus: TimelineFocus,
329        internal_id_prefix: Option<String>,
330        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
331        is_room_encrypted: bool,
332        settings: TimelineSettings,
333    ) -> Self {
334        let focus = match focus {
335            TimelineFocus::Live { hide_threaded_events } => {
336                TimelineFocusKind::Live { hide_threaded_events }
337            }
338
339            TimelineFocus::Event { target, thread_mode, .. } => {
340                TimelineFocusKind::Event {
341                    focused_event_id: target,
342                    // This will be initialized in `Self::init_focus`.
343                    thread_root: OnceLock::new(),
344                    thread_mode,
345                }
346            }
347
348            TimelineFocus::Thread { root_event_id, .. } => {
349                TimelineFocusKind::Thread { root_event_id }
350            }
351
352            TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
353        };
354
355        let focus = Arc::new(focus);
356        let state = Arc::new(RwLock::new(TimelineState::new(
357            focus.clone(),
358            room_data_provider.own_user_id().to_owned(),
359            room_data_provider.room_version_rules(),
360            internal_id_prefix,
361            unable_to_decrypt_hook,
362            is_room_encrypted,
363        )));
364
365        Self { state, focus, room_data_provider, settings }
366    }
367
    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once after creation of the
    /// [`TimelineController`], with all its fields set.
    ///
    /// Returns whether there were any events added to the timeline.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await?;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    PaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    PaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
                // Use the event-focused cache from the event cache layer.
                // Map the timeline-level thread mode onto the event cache's own enum;
                // the `hide_threaded_events` flag is irrelevant at the cache layer.
                let event_cache_thread_mode = match thread_mode {
                    TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
                    TimelineEventFocusThreadMode::Automatic { .. } => {
                        EventFocusThreadMode::Automatic
                    }
                };

                let cache = room_event_cache
                    .get_or_create_event_focused_cache(
                        event_id.clone(),
                        *num_context_events,
                        event_cache_thread_mode,
                    )
                    .await
                    .map_err(PaginationError::EventCache)?;

                let (events, _receiver) = cache.subscribe().await;

                let has_events = !events.is_empty();

                // Ask the cache for the thread root, if it managed to extract one or decided
                // that the target event was the thread root.
                match &*self.focus {
                    TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
                        if let Some(thread_root) = cache.thread_root().await {
                            // `get_or_init`: the thread root is write-once for the
                            // lifetime of this timeline.
                            focus_thread_root.get_or_init(|| thread_root);
                        }
                    }
                    TimelineFocusKind::Live { .. }
                    | TimelineFocusKind::Thread { .. }
                    | TimelineFocusKind::PinnedEvents => {
                        // `new()` builds the focus kind from the same `TimelineFocus`
                        // value, so any other variant here is a programming error.
                        panic!("unexpected focus for an event-focused timeline")
                    }
                }

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
                    .await;

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                self.init_with_thread_root(root_event_id, room_event_cache).await
            }

            TimelineFocus::PinnedEvents => {
                let (initial_events, _update_receiver) =
                    room_event_cache.subscribe_to_pinned_events().await?;

                let has_events = !initial_events.is_empty();

                self.replace_with_initial_remote_events(
                    initial_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }
465
466    /// (Re-)initialise a timeline using [`TimelineFocus::Thread`] with cached
467    /// threaded events and secondary relations.
468    ///
469    /// Returns whether there were any events added to the timeline.
470    pub(super) async fn init_with_thread_root(
471        &self,
472        root_event_id: &OwnedEventId,
473        room_event_cache: &RoomEventCache,
474    ) -> Result<bool, Error> {
475        let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
476        let has_events = !events.is_empty();
477
478        // For each event, we also need to find the related events, as they don't
479        // include the thread relationship, they won't be included in
480        // the initial list of events.
481        let mut related_events = Vector::new();
482        for event_id in events.iter().filter_map(|event| event.event_id()) {
483            if let Some((_original, related)) =
484                room_event_cache.find_event_with_relations(&event_id, None).await?
485            {
486                related_events.extend(related);
487            }
488        }
489
490        self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
491
492        // Now that we've inserted the thread events, add the aggregations too.
493        if !related_events.is_empty() {
494            self.handle_remote_aggregations(
495                vec![VectorDiff::Append { values: related_events }],
496                RemoteEventOrigin::Cache,
497            )
498            .await;
499        }
500
501        Ok(has_events)
502    }
503
504    /// Listens to encryption state changes for the room in
505    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
506    /// existing timeline items. This will then cause a refresh of those
507    /// timeline items.
508    pub async fn handle_encryption_state_changes(&self) {
509        let mut room_info = self.room_data_provider.room_info();
510
511        // Small function helper to help mark as encrypted.
512        let mark_encrypted = || async {
513            let mut state = self.state.write().await;
514            state.meta.is_room_encrypted = true;
515            state.mark_all_events_as_encrypted();
516        };
517
518        if room_info.get().encryption_state().is_encrypted() {
519            // If the room was already encrypted, it won't toggle to unencrypted, so we can
520            // shut down this task early.
521            mark_encrypted().await;
522            return;
523        }
524
525        while let Some(info) = room_info.next().await {
526            if info.encryption_state().is_encrypted() {
527                mark_encrypted().await;
528                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
529                // here is done.
530                break;
531            }
532        }
533    }
534
535    /// Run a lazy backwards pagination (in live mode).
536    ///
537    /// It adjusts the `count` value of the `Skip` higher-order stream so that
538    /// more items are pushed front in the timeline.
539    ///
540    /// If no more items are available (i.e. if the `count` is zero), this
541    /// method returns `Some(needs)` where `needs` is the number of events that
542    /// must be unlazily backwards paginated.
543    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
544        let state = self.state.read().await;
545
546        let (count, needs) = state
547            .meta
548            .subscriber_skip_count
549            .compute_next_when_paginating_backwards(num_events.into());
550
551        // This always happens on a live timeline.
552        let is_live_timeline = true;
553        state.meta.subscriber_skip_count.update(count, is_live_timeline);
554
555        needs
556    }
557
558    /// Is this timeline receiving events from sync (aka has a live focus)?
559    pub(super) fn is_live(&self) -> bool {
560        matches!(&*self.focus, TimelineFocusKind::Live { .. })
561    }
562
563    /// Is this timeline focused on a thread?
564    pub(super) fn is_threaded(&self) -> bool {
565        self.focus.is_thread()
566    }
567
568    /// The root of the current thread, for a live thread timeline or a
569    /// permalink to a thread message.
570    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
571        self.focus.thread_root().map(ToOwned::to_owned)
572    }
573
574    /// Get a copy of the current items in the list.
575    ///
576    /// Cheap because `im::Vector` is cheap to clone.
577    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
578        self.state.read().await.items.clone_items()
579    }
580
581    #[cfg(test)]
582    pub(super) async fn subscribe_raw(
583        &self,
584    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
585        self.state.read().await.items.subscribe().into_values_and_stream()
586    }
587
588    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
589        let state = self.state.read().await;
590
591        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
592    }
593
594    pub(super) async fn subscribe_filter_map<U, F>(
595        &self,
596        f: F,
597    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
598    where
599        U: Clone,
600        F: Fn(Arc<TimelineItem>) -> Option<U>,
601    {
602        self.state.read().await.items.subscribe().filter_map(f)
603    }
604
    /// Toggle a reaction locally.
    ///
    /// If the current user has no reaction with `key` on the target item, one
    /// is added (via the send handle for local echoes, or by sending a
    /// reaction event for remote items). If one exists, it is removed (by
    /// aborting the in-flight send, or by redacting the remote reaction).
    ///
    /// Returns true if the reaction was added, false if it was removed.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Look up whether the current user already reacted with this key, and
        // in which send state that reaction is.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the item before the
                // redaction request is sent.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        // Re-acquire the lock: it was released around the request,
                        // so the item may have moved (or disappeared) meanwhile.
                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
746
747    /// Handle updates on events as [`VectorDiff`]s.
748    pub(super) async fn handle_remote_events_with_diffs(
749        &self,
750        diffs: Vec<VectorDiff<TimelineEvent>>,
751        origin: RemoteEventOrigin,
752    ) {
753        if diffs.is_empty() {
754            return;
755        }
756
757        let mut state = self.state.write().await;
758        state
759            .handle_remote_events_with_diffs(
760                diffs,
761                origin,
762                &self.room_data_provider,
763                &self.settings,
764            )
765            .await
766    }
767
768    /// Only handle aggregations received as [`VectorDiff`]s.
769    pub(super) async fn handle_remote_aggregations(
770        &self,
771        diffs: Vec<VectorDiff<TimelineEvent>>,
772        origin: RemoteEventOrigin,
773    ) {
774        if diffs.is_empty() {
775            return;
776        }
777
778        let mut state = self.state.write().await;
779        state
780            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
781            .await
782    }
783
784    pub(super) async fn clear(&self) {
785        self.state.write().await.clear();
786    }
787
    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        let track_read_markers = &self.settings.track_read_receipts;
        if track_read_markers.is_enabled() {
            // Seed our own public and private read receipts first, so the
            // receipt-based fallback at the bottom of this method has data to
            // work with.
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one aren't
        // empty.
        // Previously we just had to check the new one wasn't empty because
        // we did a clear operation before so the current one would always be empty, but
        // now we may want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers.is_enabled() {
            // Prefer the explicit `m.fully_read` marker when one exists.
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }
844
845    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
846        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
847    }
848
849    pub(super) async fn handle_ephemeral_events(
850        &self,
851        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
852    ) {
853        // Don't even take the lock if there are no events to process.
854        if events.is_empty() {
855            return;
856        }
857        let mut state = self.state.write().await;
858        state.handle_ephemeral_events(events, &self.room_data_provider).await;
859    }
860
861    /// Creates the local echo for an event we're sending.
862    #[instrument(skip_all)]
863    pub(super) async fn handle_local_event(
864        &self,
865        txn_id: OwnedTransactionId,
866        content: AnyMessageLikeEventContent,
867        send_handle: Option<SendHandle>,
868    ) {
869        let sender = self.room_data_provider.own_user_id().to_owned();
870        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
871
872        let date_divider_mode = self.settings.date_divider_mode.clone();
873
874        let mut state = self.state.write().await;
875        state
876            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
877            .await;
878    }
879
    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // The remote event id, only known when the new state is `Sent`.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, we must first make
        // sure the remote echo hasn't showed up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            // NOTE(review): early return without `commit()` — presumably the
            // dropped transaction discards any staged changes; confirm against
            // the transaction type's semantics.
            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent, that's a broken state, let's
        // emit an error but also override to the given sent state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        // Replace the item in place with the updated send state.
        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
979
    /// Remove the local echo identified by `txn_id` from the timeline.
    ///
    /// Handles both standalone local items and local aggregations (e.g. a
    /// reaction sent from this device). Returns `true` if something matching
    /// the transaction ID was found.
    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // No standalone local item matched; the transaction ID may identify a
        // local aggregation instead.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }
1026
    /// Replace the content of the local echo identified by `txn_id` with the
    /// given `content`, resetting its send state to not-sent-yet.
    ///
    /// Only `RoomMessage` contents are supported. Returns `true` if the local
    /// echo was found and replaced.
    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            // NOTE(review): returning without `commit()` — presumably drops the
            // transaction without applying changes (none were made here).
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned as remote??");
                return false;
            };
            // If the local echo had an upload progress, retain it.
            let progress = as_variant!(&prev_local_item.send_state,
                EventSendState::NotSentYet { progress } => progress.clone())
            .flatten();
            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
        };

        // Replace the local-related state (kind) and the content state, keeping
        // the reactions, reply/thread relations and internal id of the old item.
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content.msgtype,
                content.mentions,
                prev_item.content().reactions().cloned().unwrap_or_default(),
                prev_item.content().thread_root(),
                prev_item.content().in_reply_to(),
                prev_item.content().thread_summary(),
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        // This doesn't change the original sending time, so there's no need to adjust
        // date dividers.

        txn.commit();

        debug!("Replaced local echo");
        true
    }
1088
1089    pub(super) async fn compute_redecryption_candidates(
1090        &self,
1091    ) -> (BTreeSet<String>, BTreeSet<String>) {
1092        let state = self.state.read().await;
1093        compute_redecryption_candidates(&state.items)
1094    }
1095
1096    pub(super) async fn set_sender_profiles_pending(&self) {
1097        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1098    }
1099
1100    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1101        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1102    }
1103
1104    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1105        self.state.write().await.items.for_each(|mut entry| {
1106            let Some(event_item) = entry.as_event() else { return };
1107            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1108                let new_item = entry.with_kind(TimelineItemKind::Event(
1109                    event_item.with_sender_profile(profile_state.clone()),
1110                ));
1111                ObservableItemsEntry::replace(&mut entry, new_item);
1112            }
1113        });
1114    }
1115
1116    pub(super) async fn update_missing_sender_profiles(&self) {
1117        trace!("Updating missing sender profiles");
1118
1119        let mut state = self.state.write().await;
1120        let mut entries = state.items.entries();
1121        while let Some(mut entry) = entries.next() {
1122            let Some(event_item) = entry.as_event() else { continue };
1123            let event_id = event_item.event_id().map(debug);
1124            let transaction_id = event_item.transaction_id().map(debug);
1125
1126            if event_item.sender_profile().is_ready() {
1127                trace!(event_id, transaction_id, "Profile already set");
1128                continue;
1129            }
1130
1131            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1132                Some(profile) => {
1133                    trace!(event_id, transaction_id, "Adding profile");
1134                    let updated_item =
1135                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1136                    let new_item = entry.with_kind(updated_item);
1137                    ObservableItemsEntry::replace(&mut entry, new_item);
1138                }
1139                None => {
1140                    if !event_item.sender_profile().is_unavailable() {
1141                        trace!(event_id, transaction_id, "Marking profile unavailable");
1142                        let updated_item =
1143                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1144                        let new_item = entry.with_kind(updated_item);
1145                        ObservableItemsEntry::replace(&mut entry, new_item);
1146                    } else {
1147                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1148                    }
1149                }
1150            }
1151        }
1152
1153        trace!("Done updating missing sender profiles");
1154    }
1155
1156    /// Update the profiles of the given senders, even if they are ready.
1157    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1158        trace!("Forcing update of sender profiles: {sender_ids:?}");
1159
1160        let mut state = self.state.write().await;
1161        let mut entries = state.items.entries();
1162        while let Some(mut entry) = entries.next() {
1163            let Some(event_item) = entry.as_event() else { continue };
1164            if !sender_ids.contains(event_item.sender()) {
1165                continue;
1166            }
1167
1168            let event_id = event_item.event_id().map(debug);
1169            let transaction_id = event_item.transaction_id().map(debug);
1170
1171            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1172                Some(profile) => {
1173                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1174                    {
1175                        debug!(event_id, transaction_id, "Profile already up-to-date");
1176                    } else {
1177                        trace!(event_id, transaction_id, "Updating profile");
1178                        let updated_item =
1179                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1180                        let new_item = entry.with_kind(updated_item);
1181                        ObservableItemsEntry::replace(&mut entry, new_item);
1182                    }
1183                }
1184                None => {
1185                    if !event_item.sender_profile().is_unavailable() {
1186                        trace!(event_id, transaction_id, "Marking profile unavailable");
1187                        let updated_item =
1188                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1189                        let new_item = entry.with_kind(updated_item);
1190                        ObservableItemsEntry::replace(&mut entry, new_item);
1191                    } else {
1192                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1193                    }
1194                }
1195            }
1196        }
1197
1198        trace!("Done forcing update of sender profiles");
1199    }
1200
1201    #[cfg(test)]
1202    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1203        let own_user_id = self.room_data_provider.own_user_id();
1204        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1205    }
1206
1207    /// Get the latest read receipt for the given user.
1208    ///
1209    /// Useful to get the latest read receipt, whether it's private or public.
1210    pub(super) async fn latest_user_read_receipt(
1211        &self,
1212        user_id: &UserId,
1213    ) -> Option<(OwnedEventId, Receipt)> {
1214        let receipt_thread = self.focus.receipt_thread();
1215
1216        self.state
1217            .read()
1218            .await
1219            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1220            .await
1221    }
1222
1223    /// Get the ID of the timeline event with the latest read receipt for the
1224    /// given user.
1225    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1226        &self,
1227        user_id: &UserId,
1228    ) -> Option<OwnedEventId> {
1229        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1230    }
1231
1232    /// Subscribe to changes in the read receipts of our own user.
1233    pub async fn subscribe_own_user_read_receipts_changed(
1234        &self,
1235    ) -> impl Stream<Item = ()> + use<P> {
1236        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1237    }
1238
1239    /// Handle a room send update that's a new local echo.
1240    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1241        match echo.content {
1242            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1243                let content = match serialized_event.deserialize() {
1244                    Ok(d) => d,
1245                    Err(err) => {
1246                        warn!("error deserializing local echo: {err}");
1247                        return;
1248                    }
1249                };
1250
1251                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1252                    .await;
1253
1254                if let Some(send_error) = send_error {
1255                    self.update_event_send_state(
1256                        &echo.transaction_id,
1257                        EventSendState::SendingFailed {
1258                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1259                                send_error,
1260                            ))),
1261                            is_recoverable: false,
1262                        },
1263                    )
1264                    .await;
1265                }
1266            }
1267
1268            LocalEchoContent::React { key, send_handle, applies_to } => {
1269                self.handle_local_reaction(key, send_handle, applies_to).await;
1270            }
1271        }
1272    }
1273
1274    /// Adds a reaction (local echo) to a local echo.
1275    #[instrument(skip(self, send_handle))]
1276    async fn handle_local_reaction(
1277        &self,
1278        reaction_key: String,
1279        send_handle: SendReactionHandle,
1280        applies_to: OwnedTransactionId,
1281    ) {
1282        let mut state = self.state.write().await;
1283        let mut tr = state.transaction();
1284
1285        let target = TimelineEventItemId::TransactionId(applies_to);
1286
1287        let reaction_txn_id = send_handle.transaction_id().to_owned();
1288        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1289        let aggregation = Aggregation::new(
1290            TimelineEventItemId::TransactionId(reaction_txn_id),
1291            AggregationKind::Reaction {
1292                key: reaction_key.clone(),
1293                sender: self.room_data_provider.own_user_id().to_owned(),
1294                timestamp: MilliSecondsSinceUnixEpoch::now(),
1295                reaction_status,
1296            },
1297        );
1298
1299        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1300        find_item_and_apply_aggregation(
1301            &tr.meta.aggregations,
1302            &mut tr.items,
1303            &target,
1304            aggregation,
1305            &tr.meta.room_version_rules,
1306        );
1307
1308        tr.commit();
1309    }
1310
1311    /// Handle a single room send queue update.
1312    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1313        match update {
1314            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1315                self.handle_local_echo(echo).await;
1316            }
1317
1318            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1319                if !self.discard_local_echo(&transaction_id).await {
1320                    warn!("couldn't find the local echo to discard");
1321                }
1322            }
1323
1324            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1325                let content = match new_content.deserialize() {
1326                    Ok(d) => d,
1327                    Err(err) => {
1328                        warn!("error deserializing local echo (upon edit): {err}");
1329                        return;
1330                    }
1331                };
1332
1333                if !self.replace_local_echo(&transaction_id, content).await {
1334                    warn!("couldn't find the local echo to replace");
1335                }
1336            }
1337
1338            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1339                self.update_event_send_state(
1340                    &transaction_id,
1341                    EventSendState::SendingFailed { error, is_recoverable },
1342                )
1343                .await;
1344            }
1345
1346            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1347                self.update_event_send_state(
1348                    &transaction_id,
1349                    EventSendState::NotSentYet { progress: None },
1350                )
1351                .await;
1352            }
1353
1354            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1355                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1356                    .await;
1357            }
1358
1359            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1360                self.update_event_send_state(
1361                    &related_to,
1362                    EventSendState::NotSentYet {
1363                        progress: Some(MediaUploadProgress { index, progress }),
1364                    },
1365                )
1366                .await;
1367            }
1368        }
1369    }
1370
1371    /// Insert a timeline start item at the beginning of the room, if it's
1372    /// missing.
1373    pub async fn insert_timeline_start_if_missing(&self) {
1374        let mut state = self.state.write().await;
1375        let mut txn = state.transaction();
1376        txn.items.push_timeline_start_if_missing(
1377            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1378        );
1379        txn.commit();
1380    }
1381
1382    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1383    /// timeline or not.
1384    ///
1385    /// Can be `None` if the event cannot be represented as a standalone item,
1386    /// because it's an aggregation.
1387    pub(super) async fn make_replied_to(
1388        &self,
1389        event: TimelineEvent,
1390    ) -> Result<Option<EmbeddedEvent>, Error> {
1391        let state = self.state.read().await;
1392        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1393    }
1394}
1395
1396impl TimelineController {
    /// Returns a reference to the underlying room (the room data provider).
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }
1400
    /// Given an event identifier, will fetch the details for the event it's
    /// replying to, if applicable.
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        // Only remote items can be replies whose details are fetchable.
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        // Nothing to do if the replied-to details are already being fetched or
        // have already been resolved.
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        // NOTE(review): the write guard is moved into `fetch_replied_to_event`,
        // which presumably releases it for the duration of the request — hence
        // the re-lookup below; confirm against that helper's implementation.
        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }
1484
1485    /// Returns the thread that should be used for a read receipt based on the
1486    /// current focus of the timeline and the receipt type.
1487    ///
1488    /// A `SendReceiptType::FullyRead` will always use
1489    /// `ReceiptThread::Unthreaded`
1490    pub(super) fn infer_thread_for_read_receipt(
1491        &self,
1492        receipt_type: &SendReceiptType,
1493    ) -> ReceiptThread {
1494        if matches!(receipt_type, SendReceiptType::FullyRead) {
1495            ReceiptThread::Unthreaded
1496        } else {
1497            self.focus.receipt_thread()
1498        }
1499    }
1500
1501    /// Check whether the given receipt should be sent.
1502    ///
1503    /// Returns `false` if the given receipt is older than the current one.
1504    pub(super) async fn should_send_receipt(
1505        &self,
1506        receipt_type: &SendReceiptType,
1507        receipt_thread: &ReceiptThread,
1508        event_id: &EventId,
1509    ) -> bool {
1510        let own_user_id = self.room().own_user_id();
1511        let state = self.state.read().await;
1512        let room = self.room();
1513
1514        match receipt_type {
1515            SendReceiptType::Read => {
1516                if let Some((old_pub_read, _)) = state
1517                    .meta
1518                    .user_receipt(
1519                        own_user_id,
1520                        ReceiptType::Read,
1521                        receipt_thread.clone(),
1522                        room,
1523                        state.items.all_remote_events(),
1524                    )
1525                    .await
1526                {
1527                    trace!(%old_pub_read, "found a previous public receipt");
1528                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1529                        &old_pub_read,
1530                        event_id,
1531                        state.items.all_remote_events(),
1532                    ) {
1533                        trace!(
1534                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1535                        );
1536                        return relative_pos == RelativePosition::After;
1537                    }
1538                }
1539            }
1540
1541            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1542            // doesn't make sense to have a private read receipt behind a public one.
1543            SendReceiptType::ReadPrivate => {
1544                if let Some((old_priv_read, _)) =
1545                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1546                {
1547                    trace!(%old_priv_read, "found a previous private receipt");
1548                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1549                        &old_priv_read,
1550                        event_id,
1551                        state.items.all_remote_events(),
1552                    ) {
1553                        trace!(
1554                            "event referred to new receipt is {relative_pos:?} the previous receipt"
1555                        );
1556                        return relative_pos == RelativePosition::After;
1557                    }
1558                }
1559            }
1560
1561            SendReceiptType::FullyRead => {
1562                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1563                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1564                        &prev_event_id,
1565                        event_id,
1566                        state.items.all_remote_events(),
1567                    )
1568                {
1569                    return relative_pos == RelativePosition::After;
1570                }
1571            }
1572
1573            _ => {}
1574        }
1575
1576        // Let the server handle unknown receipts.
1577        true
1578    }
1579
1580    /// Returns the latest event identifier, even if it's not visible, or if
1581    /// it's folded into another timeline item.
1582    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1583        let state = self.state.read().await;
1584        let filter_out_thread_events = match self.focus() {
1585            TimelineFocusKind::Thread { .. } => false,
1586            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1587            TimelineFocusKind::Event { .. } => {
1588                // For event-focused timelines, filtering is handled in the event cache layer.
1589                false
1590            }
1591            TimelineFocusKind::PinnedEvents => true,
1592        };
1593
1594        state
1595            .items
1596            .all_remote_events()
1597            .iter()
1598            .rev()
1599            .filter_map(|event_meta| {
1600                if !filter_out_thread_events {
1601                    // For an unthreaded timeline, the last event is always the latest event.
1602                    Some(event_meta.event_id.clone())
1603                } else if event_meta.thread_root_id.is_none() {
1604                    // For the main-thread timeline, only non-threaded events are valid candidates
1605                    // for the latest event.
1606                    //
1607                    // But! An event could be an aggregation that relate to an in-thread
1608                    // event. In this case, it's not a valid latest event.
1609                    if let Some(TimelineEventItemId::EventId(target_event_id)) =
1610                        state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1611                            event_meta.event_id.clone(),
1612                        ))
1613                        && let Some(target_meta) =
1614                            state.items.all_remote_events().get_by_event_id(target_event_id)
1615                        && target_meta.thread_root_id.is_some()
1616                    {
1617                        // This event is an aggregation of an in-thread event, so skip it.
1618                        None
1619                    } else {
1620                        // Not in a thread, and not the aggregation of an in-thread event, so it's
1621                        // a valid candidate for the latest event.
1622                        Some(event_meta.event_id.clone())
1623                    }
1624                } else {
1625                    // An in-thread event, when we're filtering out threaded events, is never a
1626                    // valid candidate for the latest event.
1627                    None
1628                }
1629            })
1630            .next()
1631    }
1632
1633    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1634    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1635        let (utds, decrypted) = self.compute_redecryption_candidates().await;
1636
1637        let request = DecryptionRetryRequest {
1638            room_id: self.room().room_id().to_owned(),
1639            utd_session_ids: utds,
1640            refresh_info_session_ids: decrypted,
1641        };
1642
1643        self.room().client().event_cache().request_decryption(request);
1644    }
1645
1646    /// Combine the global (event cache) pagination status with the local state
1647    /// of the timeline.
1648    ///
1649    /// This only changes the global pagination status of this room, in one
1650    /// case: if the timeline has a skip count greater than 0, it will
1651    /// ensure that the pagination status says that we haven't reached the
1652    /// timeline start yet.
1653    pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1654        match status {
1655            PaginationStatus::Idle { hit_timeline_start } => {
1656                if hit_timeline_start {
1657                    let state = self.state.read().await;
1658                    // If the skip count is greater than 0, it means that a subsequent pagination
1659                    // could return more items, so pretend we didn't get the information that the
1660                    // timeline start was hit.
1661                    if state.meta.subscriber_skip_count.get() > 0 {
1662                        return PaginationStatus::Idle { hit_timeline_start: false };
1663                    }
1664                }
1665            }
1666            PaginationStatus::Paginating => {}
1667        }
1668
1669        // You're perfect, just the way you are.
1670        status
1671    }
1672}
1673
impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    ///
    /// See [`TimelineFocusKind`] for the possible focus modes (live, thread,
    /// single event, pinned events).
    pub(super) fn focus(&self) -> &TimelineFocusKind {
        &self.focus
    }
}
1680
/// Resolve the details of the event that `item` replies to, so that they can
/// be embedded into the timeline item.
///
/// Fast path: if the replied-to event is already an item of this timeline, its
/// details are built locally. Otherwise, the item at `index` is first replaced
/// with a copy whose reply details are [`TimelineDetails::Pending`], the state
/// lock is released, and the event is loaded via `Room::load_or_fetch_event`.
///
/// A failed load is reported as [`TimelineDetails::Error`] in the `Ok` value
/// rather than as an `Err`; an `Err` is returned only when the fetched
/// replied-to event is an aggregation (not a standalone item), or when
/// building the embedded event fails.
///
/// Note: `state_guard` is taken by value on purpose, so that it can be dropped
/// before the potentially slow load/fetch; `state_lock` is then used to
/// re-acquire (read-only) access afterwards.
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    // Fast path: the replied-to event is already an item of this timeline.
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status of the
    // replied-to event to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            // Re-acquire the state (read-only this time) to build the embedded
            // event against up-to-date timeline metadata.
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}