matrix_sdk_ui/timeline/controller/mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25    config::RequestConfig,
26    deserialized_responses::TimelineEvent,
27    event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
28    paginators::{PaginationResult, PaginationToken, Paginator},
29    send_queue::{
30        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31    },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38    events::{
39        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40        AnySyncTimelineEvent, MessageLikeEventType,
41        poll::unstable_start::UnstablePollStartEventContent,
42        reaction::ReactionEventContent,
43        receipt::{Receipt, ReceiptThread, ReceiptType},
44        relation::{Annotation, RelationType},
45        room::message::{MessageType, Relation},
46    },
47    room_version_rules::RoomVersionRules,
48    serde::Raw,
49};
50use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54    metadata::{RelativePosition, TimelineMetadata},
55    observable_items::{
56        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57        ObservableItemsTransactionEntry,
58    },
59    state::TimelineState,
60    state_transaction::TimelineStateTransaction,
61};
62use super::{
63    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
66    TimelineReadReceiptTracking, VirtualTimelineItem,
67    algorithms::{rfind_event_by_id, rfind_event_item},
68    event_item::{ReactionStatus, RemoteEventOrigin},
69    item::TimelineUniqueId,
70    subscriber::TimelineSubscriber,
71    traits::RoomDataProvider,
72};
73use crate::{
74    timeline::{
75        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
76        algorithms::rfind_event_by_item_id,
77        controller::decryption_retry_task::compute_redecryption_candidates,
78        date_dividers::DateDividerAdjuster,
79        event_item::TimelineItemHandle,
80        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
81    },
82    unable_to_decrypt_hook::UtdHookManager,
83};
84
85pub(in crate::timeline) mod aggregations;
86mod decryption_retry_task;
87mod metadata;
88mod observable_items;
89mod read_receipts;
90mod state;
91mod state_transaction;
92
93pub(super) use aggregations::*;
94pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
95use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
96use matrix_sdk_common::serde_helpers::extract_thread_root;
97
/// Data associated to the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The paginator instance.
        ///
        /// Left empty at construction time, and filled when the focus is
        /// initialized (see `init_focus`), once it's known whether the focused
        /// event is part of a thread or not.
        paginator: OnceCell<AnyPaginator<P>>,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// The timeline is focused on the room's pinned events.
    PinnedEvents {
        /// The loader used to retrieve the set of pinned events.
        loader: PinnedEventsLoader,
    },
}
128
/// The paginator in use for an event-focused timeline.
///
/// The variant is chosen when the focus is initialized, depending on whether
/// the focused event belongs to a thread or not.
#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
    /// Paginating over the room's timeline, outside of any thread.
    Unthreaded {
        /// The actual event paginator.
        paginator: Paginator<P>,
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },
    /// Paginating within a single thread.
    Threaded(ThreadedEventsLoader<P>),
}
139
140impl<P: RoomDataProvider> AnyPaginator<P> {
141    /// Runs a backward pagination (requesting `num_events` to the server), from
142    /// the current state of the object.
143    ///
144    /// Will return immediately if we have already hit the start of the
145    /// timeline.
146    ///
147    /// May return an error if it's already paginating, or if the call to
148    /// the homeserver endpoints failed.
149    pub async fn paginate_backwards(
150        &self,
151        num_events: u16,
152    ) -> Result<PaginationResult, PaginatorError> {
153        match self {
154            Self::Unthreaded { paginator, .. } => {
155                paginator.paginate_backward(num_events.into()).await
156            }
157            Self::Threaded(threaded_paginator) => {
158                threaded_paginator.paginate_backwards(num_events.into()).await
159            }
160        }
161    }
162
163    /// Runs a forward pagination (requesting `num_events` to the server), from
164    /// the current state of the object.
165    ///
166    /// Will return immediately if we have already hit the end of the timeline.
167    ///
168    /// May return an error if it's already paginating, or if the call to
169    /// the homeserver endpoints failed.
170    pub async fn paginate_forwards(
171        &self,
172        num_events: u16,
173    ) -> Result<PaginationResult, PaginatorError> {
174        match self {
175            Self::Unthreaded { paginator, .. } => {
176                paginator.paginate_forward(num_events.into()).await
177            }
178            Self::Threaded(threaded_paginator) => {
179                threaded_paginator.paginate_forwards(num_events.into()).await
180            }
181        }
182    }
183
184    /// Whether to hide in-thread events from the timeline.
185    pub fn hide_threaded_events(&self) -> bool {
186        match self {
187            Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
188            Self::Threaded(_) => false,
189        }
190    }
191
192    /// Returns the root event id of the thread, if the paginator is
193    /// [`AnyPaginator::Threaded`].
194    pub fn thread_root(&self) -> Option<&EventId> {
195        match self {
196            Self::Unthreaded { .. } => None,
197            Self::Threaded(thread_events_loader) => {
198                Some(thread_events_loader.thread_root_event_id())
199            }
200        }
201    }
202}
203
204impl<P: RoomDataProvider> TimelineFocusKind<P> {
205    /// Returns the [`ReceiptThread`] that should be used for the current
206    /// timeline focus.
207    ///
208    /// Live and event timelines will use the unthreaded read receipt type in
209    /// general, unless they hide in-thread events, in which case they will
210    /// use the main thread.
211    pub(super) fn receipt_thread(&self) -> ReceiptThread {
212        if let Some(thread_root) = self.thread_root() {
213            ReceiptThread::Thread(thread_root.to_owned())
214        } else if self.hide_threaded_events() {
215            ReceiptThread::Main
216        } else {
217            ReceiptThread::Unthreaded
218        }
219    }
220
221    /// Whether to hide in-thread events from the timeline.
222    fn hide_threaded_events(&self) -> bool {
223        match self {
224            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
225            TimelineFocusKind::Event { paginator } => {
226                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
227            }
228            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
229        }
230    }
231
232    /// Whether the focus is on a thread (from a live thread or a thread
233    /// permalink).
234    fn is_thread(&self) -> bool {
235        self.thread_root().is_some()
236    }
237
238    /// If the focus is a thread, returns its root event ID.
239    fn thread_root(&self) -> Option<&EventId> {
240        match self {
241            TimelineFocusKind::Event { paginator, .. } => {
242                paginator.get().and_then(|paginator| paginator.thread_root())
243            }
244            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
245            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
246        }
247    }
248}
249
/// The controller of a timeline: holds its mutable state, its focus and its
/// settings, along with the provider used to fetch room data.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    ///
    /// Shared behind an [`Arc`], so clones of the controller observe the same
    /// state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    ///
    /// See [`TimelineFocusKind`] for the per-focus state this carries.
    focus: Arc<TimelineFocusKind<P>>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}
267
/// The settings that configure a timeline's behavior.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled and on which event
    /// types?
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    ///
    /// Defaults to [`default_event_filter`].
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}
284
285#[cfg(not(tarpaulin_include))]
286impl fmt::Debug for TimelineSettings {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        f.debug_struct("TimelineSettings")
289            .field("track_read_receipts", &self.track_read_receipts)
290            .field("add_failed_to_parse", &self.add_failed_to_parse)
291            .finish_non_exhaustive()
292    }
293}
294
295impl Default for TimelineSettings {
296    fn default() -> Self {
297        Self {
298            track_read_receipts: TimelineReadReceiptTracking::Disabled,
299            event_filter: Arc::new(default_event_filter),
300            add_failed_to_parse: true,
301            date_divider_mode: DateDividerMode::Daily,
302        }
303    }
304}
305
306/// The default event filter for
307/// [`crate::timeline::TimelineBuilder::event_filter`].
308///
309/// It filters out events that are not rendered by the timeline, including but
310/// not limited to: reactions, edits, redactions on existing messages.
311///
312/// If you have a custom filter, it may be best to chain yours with this one if
313/// you do not want to run into situations where a read receipt is not visible
314/// because it's living on an event that doesn't have a matching timeline item.
315pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
316    match event {
317        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
318            if ev.redacts(&rules.redaction).is_some() {
319                // This is a redaction of an existing message, we'll only update the previous
320                // message and not render a new entry.
321                false
322            } else {
323                // This is a redacted entry, that we'll show only if the redacted entity wasn't
324                // a reaction.
325                ev.event_type() != MessageLikeEventType::Reaction
326            }
327        }
328
329        AnySyncTimelineEvent::MessageLike(msg) => {
330            match msg.original_content() {
331                None => {
332                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
333                    // a reaction.
334                    msg.event_type() != MessageLikeEventType::Reaction
335                }
336
337                Some(original_content) => {
338                    match original_content {
339                        AnyMessageLikeEventContent::RoomMessage(content) => {
340                            if content
341                                .relates_to
342                                .as_ref()
343                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
344                            {
345                                // Edits aren't visible by default.
346                                return false;
347                            }
348
349                            match content.msgtype {
350                                MessageType::Audio(_)
351                                | MessageType::Emote(_)
352                                | MessageType::File(_)
353                                | MessageType::Image(_)
354                                | MessageType::Location(_)
355                                | MessageType::Notice(_)
356                                | MessageType::ServerNotice(_)
357                                | MessageType::Text(_)
358                                | MessageType::Video(_)
359                                | MessageType::VerificationRequest(_) => true,
360                                #[cfg(feature = "unstable-msc4274")]
361                                MessageType::Gallery(_) => true,
362                                _ => false,
363                            }
364                        }
365
366                        AnyMessageLikeEventContent::Sticker(_)
367                        | AnyMessageLikeEventContent::UnstablePollStart(
368                            UnstablePollStartEventContent::New(_),
369                        )
370                        | AnyMessageLikeEventContent::CallInvite(_)
371                        | AnyMessageLikeEventContent::RtcNotification(_)
372                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
373
374                        _ => false,
375                    }
376                }
377            }
378        }
379
380        AnySyncTimelineEvent::State(_) => {
381            // All the state events may get displayed by default.
382            true
383        }
384    }
385}
386
387impl<P: RoomDataProvider> TimelineController<P> {
388    pub(super) fn new(
389        room_data_provider: P,
390        focus: TimelineFocus,
391        internal_id_prefix: Option<String>,
392        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
393        is_room_encrypted: bool,
394        settings: TimelineSettings,
395    ) -> Self {
396        let focus = match focus {
397            TimelineFocus::Live { hide_threaded_events } => {
398                TimelineFocusKind::Live { hide_threaded_events }
399            }
400
401            TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
402
403            TimelineFocus::Thread { root_event_id, .. } => {
404                TimelineFocusKind::Thread { root_event_id }
405            }
406
407            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
408                TimelineFocusKind::PinnedEvents {
409                    loader: PinnedEventsLoader::new(
410                        Arc::new(room_data_provider.clone()),
411                        max_events_to_load as usize,
412                        max_concurrent_requests as usize,
413                    ),
414                }
415            }
416        };
417
418        let focus = Arc::new(focus);
419        let state = Arc::new(RwLock::new(TimelineState::new(
420            focus.clone(),
421            room_data_provider.own_user_id().to_owned(),
422            room_data_provider.room_version_rules(),
423            internal_id_prefix,
424            unable_to_decrypt_hook,
425            is_room_encrypted,
426        )));
427
428        Self { state, focus, room_data_provider, settings }
429    }
430
    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once after creation of the
    /// [`TimelineController`], with all its fields set.
    ///
    /// Returns whether there were any events added to the timeline.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await?;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    RoomPaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    RoomPaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                    // NOTE: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let event_paginator = Paginator::new(self.room_data_provider.clone());

                // Fallback loader: a /context request around the focused event.
                let load_events_with_context = || async {
                    // Start a /context request to load the focused event and surrounding events.
                    event_paginator
                        .start_from(event_id, (*num_context_events).into())
                        .await
                        .map(|r| r.events)
                        .map_err(PaginationError::Paginator)
                };

                let events = if *num_context_events == 0 {
                    // If no context is requested, try to load the event from the cache first and
                    // include common relations such as reactions and edits.
                    let request_config = Some(RequestConfig::default().retry_limit(3));
                    let relations_filter =
                        Some(vec![RelationType::Annotation, RelationType::Replacement]);

                    // Load the event from the cache or, failing that, the server.
                    match self
                        .room_data_provider
                        .load_event_with_relations(event_id, request_config, relations_filter)
                        .await
                    {
                        Ok((event, related_events)) => {
                            let mut events = vec![event];
                            events.extend(related_events);
                            events
                        }
                        Err(err) => {
                            error!("error when loading focussed event: {err}");
                            // Fall back to load the focused event using /context.
                            load_events_with_context().await?
                        }
                    }
                } else {
                    // Start a /context request to load the focused event and surrounding events.
                    load_events_with_context().await?
                };

                // Find the target event, and see if it's part of a thread.
                let extracted_thread_root = events
                    .iter()
                    .find(
                        |event| {
                            if let Some(id) = event.event_id() { id == *event_id } else { false }
                        },
                    )
                    .and_then(|event| extract_thread_root(event.raw()));

                // Determine the timeline's threading behavior: which thread (if any) to focus
                // on, and whether in-thread events should be hidden.
                let (thread_root_event_id, hide_threaded_events) = match thread_mode {
                    TimelineEventFocusThreadMode::ForceThread => {
                        // If the event is part of a thread, use its thread root. Otherwise,
                        // assume the event itself is the thread root.
                        (extracted_thread_root.or_else(|| Some(event_id.clone())), false)
                    }
                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events } => {
                        (extracted_thread_root, *hide_threaded_events)
                    }
                };

                // Instantiate the paginator. Ignore the result: `set` can only fail if the
                // cell was already initialized.
                let _ = paginator.set(match thread_root_event_id {
                    Some(root_id) => {
                        let mut tokens = event_paginator.tokens();

                        // Look if the thread root event is part of the /context response. This
                        // allows us to spare some backwards pagination with
                        // /relations.
                        let includes_root_event = events.iter().any(|event| {
                            if let Some(id) = event.event_id() { id == root_id } else { false }
                        });

                        if includes_root_event {
                            // If we have the root event, there's no need to do back-paginations
                            // with /relations, since we are at the start of the thread.
                            tokens.previous = PaginationToken::HitEnd;
                        }

                        AnyPaginator::Threaded(ThreadedEventsLoader::new(
                            self.room_data_provider.clone(),
                            root_id,
                            tokens,
                        ))
                    }

                    None => AnyPaginator::Unthreaded {
                        paginator: event_paginator,
                        hide_threaded_events,
                    },
                });

                let has_events = !events.is_empty();

                match paginator.get().expect("Paginator was not instantiated") {
                    AnyPaginator::Unthreaded { .. } => {
                        self.replace_with_initial_remote_events(
                            events,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }

                    AnyPaginator::Threaded(threaded_events_loader) => {
                        // We filter only events that are part of the thread (including the root),
                        // since /context will return adjacent events without filters.
                        let thread_root = threaded_events_loader.thread_root_event_id();
                        let events_in_thread = events.into_iter().filter(|event| {
                            extract_thread_root(event.raw())
                                .is_some_and(|event_thread_root| event_thread_root == thread_root)
                                || event.event_id().as_deref() == Some(thread_root)
                        });

                        self.replace_with_initial_remote_events(
                            events_in_thread,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }
                }

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                let (events, _) =
                    room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
                let has_events = !events.is_empty();

                // For each event, we also need to find the related events; since they don't
                // include the thread relationship, they won't be included in
                // the initial list of events.
                let mut related_events = Vector::new();
                for event_id in events.iter().filter_map(|event| event.event_id()) {
                    if let Some((_original, related)) =
                        room_event_cache.find_event_with_relations(&event_id, None).await?
                    {
                        related_events.extend(related);
                    }
                }

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                // Now that we've inserted the thread events, add the aggregations too.
                if !related_events.is_empty() {
                    self.handle_remote_aggregations(
                        vec![VectorDiff::Append { values: related_events }],
                        RemoteEventOrigin::Cache,
                    )
                    .await;
                }

                Ok(has_events)
            }

            TimelineFocus::PinnedEvents { .. } => {
                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
                    // NOTE: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let Some(loaded_events) =
                    loader.load_events().await.map_err(Error::PinnedEventsError)?
                else {
                    // There weren't any events.
                    return Ok(false);
                };

                let has_events = !loaded_events.is_empty();

                self.replace_with_initial_remote_events(
                    loaded_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }
651
652    /// Listens to encryption state changes for the room in
653    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
654    /// existing timeline items. This will then cause a refresh of those
655    /// timeline items.
656    pub async fn handle_encryption_state_changes(&self) {
657        let mut room_info = self.room_data_provider.room_info();
658
659        // Small function helper to help mark as encrypted.
660        let mark_encrypted = || async {
661            let mut state = self.state.write().await;
662            state.meta.is_room_encrypted = true;
663            state.mark_all_events_as_encrypted();
664        };
665
666        if room_info.get().encryption_state().is_encrypted() {
667            // If the room was already encrypted, it won't toggle to unencrypted, so we can
668            // shut down this task early.
669            mark_encrypted().await;
670            return;
671        }
672
673        while let Some(info) = room_info.next().await {
674            if info.encryption_state().is_encrypted() {
675                mark_encrypted().await;
676                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
677                // here is done.
678                break;
679            }
680        }
681    }
682
683    pub(crate) async fn reload_pinned_events(
684        &self,
685    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
686        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
687            loader.load_events().await
688        } else {
689            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
690        }
691    }
692
693    /// Run a lazy backwards pagination (in live mode).
694    ///
695    /// It adjusts the `count` value of the `Skip` higher-order stream so that
696    /// more items are pushed front in the timeline.
697    ///
698    /// If no more items are available (i.e. if the `count` is zero), this
699    /// method returns `Some(needs)` where `needs` is the number of events that
700    /// must be unlazily backwards paginated.
701    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
702        let state = self.state.read().await;
703
704        let (count, needs) = state
705            .meta
706            .subscriber_skip_count
707            .compute_next_when_paginating_backwards(num_events.into());
708
709        // This always happens on a live timeline.
710        let is_live_timeline = true;
711        state.meta.subscriber_skip_count.update(count, is_live_timeline);
712
713        needs
714    }
715
716    /// Run a backwards pagination (in focused mode) and append the results to
717    /// the timeline.
718    ///
719    /// Returns whether we hit the start of the timeline.
720    pub(super) async fn focused_paginate_backwards(
721        &self,
722        num_events: u16,
723    ) -> Result<bool, PaginationError> {
724        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
725            TimelineFocusKind::Live { .. }
726            | TimelineFocusKind::PinnedEvents { .. }
727            | TimelineFocusKind::Thread { .. } => {
728                return Err(PaginationError::NotSupported);
729            }
730            TimelineFocusKind::Event { paginator, .. } => paginator
731                .get()
732                .expect("Paginator was not instantiated")
733                .paginate_backwards(num_events)
734                .await
735                .map_err(PaginationError::Paginator)?,
736        };
737
738        // Events are in reverse topological order.
739        // We can push front each event individually.
740        self.handle_remote_events_with_diffs(
741            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
742            RemoteEventOrigin::Pagination,
743        )
744        .await;
745
746        Ok(hit_end_of_timeline)
747    }
748
749    /// Run a forwards pagination (in focused mode) and append the results to
750    /// the timeline.
751    ///
752    /// Returns whether we hit the end of the timeline.
753    pub(super) async fn focused_paginate_forwards(
754        &self,
755        num_events: u16,
756    ) -> Result<bool, PaginationError> {
757        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
758            TimelineFocusKind::Live { .. }
759            | TimelineFocusKind::PinnedEvents { .. }
760            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
761
762            TimelineFocusKind::Event { paginator, .. } => paginator
763                .get()
764                .expect("Paginator was not instantiated")
765                .paginate_forwards(num_events)
766                .await
767                .map_err(PaginationError::Paginator)?,
768        };
769
770        // Events are in topological order.
771        // We can append all events with no transformation.
772        self.handle_remote_events_with_diffs(
773            vec![VectorDiff::Append { values: events.into() }],
774            RemoteEventOrigin::Pagination,
775        )
776        .await;
777
778        Ok(hit_end_of_timeline)
779    }
780
    /// Is this timeline receiving events from sync (aka has a live focus)?
    ///
    /// Returns `true` only for [`TimelineFocusKind::Live`].
    pub(super) fn is_live(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Live { .. })
    }
785
    /// Is this timeline focused on a thread?
    ///
    /// Delegates to the focus kind's `is_thread()`.
    pub(super) fn is_threaded(&self) -> bool {
        self.focus.is_thread()
    }
790
    /// The root of the current thread, for a live thread timeline or a
    /// permalink to a thread message.
    ///
    /// Returns `None` when the focus has no thread root.
    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
        self.focus.thread_root().map(ToOwned::to_owned)
    }
796
    /// Get a copy of the current items in the list.
    ///
    /// Cheap because `im::Vector` is cheap to clone.
    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
        // Only holds the state read lock for the duration of the clone.
        self.state.read().await.items.clone_items()
    }
803
    /// Subscribe to the raw items and their updates stream (tests only).
    ///
    /// Unlike [`Self::subscribe`], this doesn't involve the subscriber skip
    /// count.
    #[cfg(test)]
    pub(super) async fn subscribe_raw(
        &self,
    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
        self.state.read().await.items.subscribe().into_values_and_stream()
    }
810
    /// Subscribe to the current timeline items and their updates.
    ///
    /// The returned [`TimelineSubscriber`] is built from the items and the
    /// subscriber skip count.
    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
        let state = self.state.read().await;

        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
    }
816
    /// Subscribe to the timeline items, filter-mapping each item through `f`.
    pub(super) async fn subscribe_filter_map<U, F>(
        &self,
        f: F,
    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
    where
        U: Clone,
        F: Fn(Arc<TimelineItem>) -> Option<U>,
    {
        self.state.read().await.items.subscribe().filter_map(f)
    }
827
    /// Toggle a reaction locally.
    ///
    /// Returns true if the reaction was added, false if it was removed.
    ///
    /// # Errors
    ///
    /// Returns [`Error::FailedToToggleReaction`] if the target item can't be
    /// found, or a send-queue/room error if the underlying request fails.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Current status of our own user's reaction with this key on the item,
        // if any.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    // The target is itself a local echo: attach the reaction to
                    // the pending send.
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            // Reaction was a local echo on a local echo: abort the pending send.
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            // Reaction was a local echo on a remote event: abort the pending send.
            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            // Reaction is a remote event on a remote event: redact it.
            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the item's mapping.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    // The redaction failed: roll back the optimistic removal, if
                    // there was one. Note we must re-acquire the lock and re-find
                    // the item, since it may have changed in the meantime.
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
969
970    /// Handle updates on events as [`VectorDiff`]s.
971    pub(super) async fn handle_remote_events_with_diffs(
972        &self,
973        diffs: Vec<VectorDiff<TimelineEvent>>,
974        origin: RemoteEventOrigin,
975    ) {
976        if diffs.is_empty() {
977            return;
978        }
979
980        let mut state = self.state.write().await;
981        state
982            .handle_remote_events_with_diffs(
983                diffs,
984                origin,
985                &self.room_data_provider,
986                &self.settings,
987            )
988            .await
989    }
990
991    /// Only handle aggregations received as [`VectorDiff`]s.
992    pub(super) async fn handle_remote_aggregations(
993        &self,
994        diffs: Vec<VectorDiff<TimelineEvent>>,
995        origin: RemoteEventOrigin,
996    ) {
997        if diffs.is_empty() {
998            return;
999        }
1000
1001        let mut state = self.state.write().await;
1002        state
1003            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
1004            .await
1005    }
1006
    /// Clear the current timeline state, including its items.
    pub(super) async fn clear(&self) {
        self.state.write().await.clear();
    }
1010
    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        // Load both the public and the private read receipts of our own user.
        let track_read_markers = &self.settings.track_read_receipts;
        if track_read_markers.is_enabled() {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one aren't
        // empty.
        // Previously we just had to check the new one wasn't empty because
        // we did a clear operation before so the current one would always be empty, but
        // now we may want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers.is_enabled() {
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }
1067
    /// Apply an update of the fully-read (`m.fully_read`) marker to the
    /// timeline state.
    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
    }
1071
1072    pub(super) async fn handle_ephemeral_events(
1073        &self,
1074        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1075    ) {
1076        // Don't even take the lock if there are no events to process.
1077        if events.is_empty() {
1078            return;
1079        }
1080        let mut state = self.state.write().await;
1081        state.handle_ephemeral_events(events, &self.room_data_provider).await;
1082    }
1083
1084    /// Creates the local echo for an event we're sending.
1085    #[instrument(skip_all)]
1086    pub(super) async fn handle_local_event(
1087        &self,
1088        txn_id: OwnedTransactionId,
1089        content: AnyMessageLikeEventContent,
1090        send_handle: Option<SendHandle>,
1091    ) {
1092        let sender = self.room_data_provider.own_user_id().to_owned();
1093        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1094
1095        let date_divider_mode = self.settings.date_divider_mode.clone();
1096
1097        let mut state = self.state.write().await;
1098        state
1099            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1100            .await;
1101    }
1102
    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // Extract the new event ID, if the update is a successful send.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, we must first make
        // sure the remote echo hasn't showed up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent, that's a broken state, let's
        // emit an error but also override to the given sent state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        // Replace the item with one carrying the updated send state.
        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
1202
    /// Discard the local echo identified by `txn_id`.
    ///
    /// Returns whether the echo was found — either as a standalone timeline
    /// item, or as a local aggregation (e.g. a reaction).
    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // The echo wasn't a standalone item; it may have been a local
        // aggregation instead, which we try to remove below.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        // Only pay the cost of a commit if something may have changed.
        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }
1249
    /// Replace the content of the local echo identified by `txn_id` with new
    /// room-message `content`.
    ///
    /// The echo's send state is reset to not-sent-yet (keeping any upload
    /// progress). Returns whether a local echo has effectively been replaced.
    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned as remote??");
                return false;
            };
            // If the local echo had an upload progress, retain it.
            let progress = as_variant!(&prev_local_item.send_state,
                EventSendState::NotSentYet { progress } => progress.clone())
            .flatten();
            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
        };

        // Replace the local-related state (kind) and the content state.
        // Reactions, thread and reply metadata carry over from the previous item.
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content.msgtype,
                content.mentions,
                prev_item.content().reactions().cloned().unwrap_or_default(),
                prev_item.content().thread_root(),
                prev_item.content().in_reply_to(),
                prev_item.content().thread_summary(),
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        // This doesn't change the original sending time, so there's no need to adjust
        // date dividers.

        txn.commit();

        debug!("Replaced local echo");
        true
    }
1311
    /// Compute the redecryption candidates from the current timeline items.
    ///
    /// See the free function [`compute_redecryption_candidates`] for the
    /// meaning of the two returned sets.
    pub(super) async fn compute_redecryption_candidates(
        &self,
    ) -> (BTreeSet<String>, BTreeSet<String>) {
        let state = self.state.read().await;
        compute_redecryption_candidates(&state.items)
    }
1318
    /// Mark every sender profile that isn't `Ready` as pending.
    pub(super) async fn set_sender_profiles_pending(&self) {
        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
    }
1322
    /// Mark every sender profile that isn't `Ready` as errored with `error`.
    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
    }
1326
1327    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1328        self.state.write().await.items.for_each(|mut entry| {
1329            let Some(event_item) = entry.as_event() else { return };
1330            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1331                let new_item = entry.with_kind(TimelineItemKind::Event(
1332                    event_item.with_sender_profile(profile_state.clone()),
1333                ));
1334                ObservableItemsEntry::replace(&mut entry, new_item);
1335            }
1336        });
1337    }
1338
    /// Fetch and fill in sender profiles that aren't `Ready` yet.
    ///
    /// Profiles that can't be found are marked as `Unavailable`; already-ready
    /// profiles are left untouched.
    pub(super) async fn update_missing_sender_profiles(&self) {
        trace!("Updating missing sender profiles");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            // Only event items carry a sender profile.
            let Some(event_item) = entry.as_event() else { continue };
            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            if event_item.sender_profile().is_ready() {
                trace!(event_id, transaction_id, "Profile already set");
                continue;
            }

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    trace!(event_id, transaction_id, "Adding profile");
                    let updated_item =
                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
                    let new_item = entry.with_kind(updated_item);
                    ObservableItemsEntry::replace(&mut entry, new_item);
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done updating missing sender profiles");
    }
1378
    /// Update the profiles of the given senders, even if they are ready.
    ///
    /// Profiles that can't be found are marked as `Unavailable`.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            // Only touch items sent by one of the requested senders.
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    // Skip the replacement if the profile didn't actually change.
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }
1423
    /// Apply a read-receipt event to the timeline state (tests only).
    #[cfg(test)]
    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
        let own_user_id = self.room_data_provider.own_user_id();
        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
    }
1429
    /// Get the latest read receipt for the given user.
    ///
    /// Useful to get the latest read receipt, whether it's private or public.
    pub(super) async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        // Scope the lookup to the receipt thread matching the current focus.
        let receipt_thread = self.focus.receipt_thread();

        self.state
            .read()
            .await
            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
            .await
    }
1445
    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    ///
    /// Unlike [`Self::latest_user_read_receipt`], this doesn't consult the
    /// room data provider.
    pub(super) async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
    }
1454
    /// Subscribe to changes in the read receipts of our own user.
    ///
    /// Each item of the returned stream is a unit notification.
    pub async fn subscribe_own_user_read_receipts_changed(
        &self,
    ) -> impl Stream<Item = ()> + use<P> {
        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
    }
1461
1462    /// Handle a room send update that's a new local echo.
1463    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1464        match echo.content {
1465            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1466                let content = match serialized_event.deserialize() {
1467                    Ok(d) => d,
1468                    Err(err) => {
1469                        warn!("error deserializing local echo: {err}");
1470                        return;
1471                    }
1472                };
1473
1474                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1475                    .await;
1476
1477                if let Some(send_error) = send_error {
1478                    self.update_event_send_state(
1479                        &echo.transaction_id,
1480                        EventSendState::SendingFailed {
1481                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1482                                send_error,
1483                            ))),
1484                            is_recoverable: false,
1485                        },
1486                    )
1487                    .await;
1488                }
1489            }
1490
1491            LocalEchoContent::React { key, send_handle, applies_to } => {
1492                self.handle_local_reaction(key, send_handle, applies_to).await;
1493            }
1494        }
1495    }
1496
1497    /// Adds a reaction (local echo) to a local echo.
1498    #[instrument(skip(self, send_handle))]
1499    async fn handle_local_reaction(
1500        &self,
1501        reaction_key: String,
1502        send_handle: SendReactionHandle,
1503        applies_to: OwnedTransactionId,
1504    ) {
1505        let mut state = self.state.write().await;
1506        let mut tr = state.transaction();
1507
1508        let target = TimelineEventItemId::TransactionId(applies_to);
1509
1510        let reaction_txn_id = send_handle.transaction_id().to_owned();
1511        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1512        let aggregation = Aggregation::new(
1513            TimelineEventItemId::TransactionId(reaction_txn_id),
1514            AggregationKind::Reaction {
1515                key: reaction_key.clone(),
1516                sender: self.room_data_provider.own_user_id().to_owned(),
1517                timestamp: MilliSecondsSinceUnixEpoch::now(),
1518                reaction_status,
1519            },
1520        );
1521
1522        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1523        find_item_and_apply_aggregation(
1524            &tr.meta.aggregations,
1525            &mut tr.items,
1526            &target,
1527            aggregation,
1528            &tr.meta.room_version_rules,
1529        );
1530
1531        tr.commit();
1532    }
1533
1534    /// Handle a single room send queue update.
1535    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1536        match update {
1537            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1538                self.handle_local_echo(echo).await;
1539            }
1540
1541            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1542                if !self.discard_local_echo(&transaction_id).await {
1543                    warn!("couldn't find the local echo to discard");
1544                }
1545            }
1546
1547            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1548                let content = match new_content.deserialize() {
1549                    Ok(d) => d,
1550                    Err(err) => {
1551                        warn!("error deserializing local echo (upon edit): {err}");
1552                        return;
1553                    }
1554                };
1555
1556                if !self.replace_local_echo(&transaction_id, content).await {
1557                    warn!("couldn't find the local echo to replace");
1558                }
1559            }
1560
1561            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1562                self.update_event_send_state(
1563                    &transaction_id,
1564                    EventSendState::SendingFailed { error, is_recoverable },
1565                )
1566                .await;
1567            }
1568
1569            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1570                self.update_event_send_state(
1571                    &transaction_id,
1572                    EventSendState::NotSentYet { progress: None },
1573                )
1574                .await;
1575            }
1576
1577            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1578                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1579                    .await;
1580            }
1581
1582            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1583                self.update_event_send_state(
1584                    &related_to,
1585                    EventSendState::NotSentYet {
1586                        progress: Some(MediaUploadProgress { index, progress }),
1587                    },
1588                )
1589                .await;
1590            }
1591        }
1592    }
1593
1594    /// Insert a timeline start item at the beginning of the room, if it's
1595    /// missing.
1596    pub async fn insert_timeline_start_if_missing(&self) {
1597        let mut state = self.state.write().await;
1598        let mut txn = state.transaction();
1599        txn.items.push_timeline_start_if_missing(
1600            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1601        );
1602        txn.commit();
1603    }
1604
1605    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1606    /// timeline or not.
1607    ///
1608    /// Can be `None` if the event cannot be represented as a standalone item,
1609    /// because it's an aggregation.
1610    pub(super) async fn make_replied_to(
1611        &self,
1612        event: TimelineEvent,
1613    ) -> Result<Option<EmbeddedEvent>, Error> {
1614        let state = self.state.read().await;
1615        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1616    }
1617}
1618
impl TimelineController {
    /// Returns the room this timeline belongs to, via its data provider.
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }

    /// Given an event identifier, will fetch the details for the event it's
    /// replying to, if applicable.
    ///
    /// While the fetch is in flight, the target item's in-reply-to details are
    /// set to [`TimelineDetails::Pending`]; once resolved, they're replaced
    /// with the fetched content or with an error.
    ///
    /// Returns an error if the event (or, after the fetch, its updated
    /// position) can't be found in the timeline, or if the replied-to event
    /// can't be represented as an embedded item.
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        // Only remote items can have fetchable reply details; keep a clone so
        // we can re-locate the item by event id after the request completes.
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        // Nothing to do if the details are already being fetched, or resolved.
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        // This consumes `state_guard`: the write lock is released inside while
        // the network request runs, so the timeline may change meanwhile.
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }

    /// Returns the thread that should be used for a read receipt based on the
    /// current focus of the timeline and the receipt type.
    ///
    /// A `SendReceiptType::FullyRead` will always use
    /// `ReceiptThread::Unthreaded`; any other receipt type inherits the thread
    /// from the timeline's focus.
    pub(super) fn infer_thread_for_read_receipt(
        &self,
        receipt_type: &SendReceiptType,
    ) -> ReceiptThread {
        if matches!(receipt_type, SendReceiptType::FullyRead) {
            ReceiptThread::Unthreaded
        } else {
            self.focus.receipt_thread()
        }
    }

    /// Check whether the given receipt should be sent.
    ///
    /// Returns `false` if the given receipt is older than the current one.
    pub(super) async fn should_send_receipt(
        &self,
        receipt_type: &SendReceiptType,
        receipt_thread: &ReceiptThread,
        event_id: &EventId,
    ) -> bool {
        let own_user_id = self.room().own_user_id();
        let state = self.state.read().await;
        let room = self.room();

        match receipt_type {
            SendReceiptType::Read => {
                if let Some((old_pub_read, _)) = state
                    .meta
                    .user_receipt(
                        own_user_id,
                        ReceiptType::Read,
                        receipt_thread.clone(),
                        room,
                        state.items.all_remote_events(),
                    )
                    .await
                {
                    trace!(%old_pub_read, "found a previous public receipt");
                    // Only send if the new receipt points at an event strictly
                    // after the previous one; anything else would move the
                    // receipt backwards (or not at all).
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_pub_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            // Implicit read receipts are saved as public read receipts, so get the latest. It also
            // doesn't make sense to have a private read receipt behind a public one.
            SendReceiptType::ReadPrivate => {
                if let Some((old_priv_read, _)) =
                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
                {
                    trace!(%old_priv_read, "found a previous private receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_priv_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            SendReceiptType::FullyRead => {
                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &prev_event_id,
                        event_id,
                        state.items.all_remote_events(),
                    )
                {
                    return relative_pos == RelativePosition::After;
                }
            }

            _ => {}
        }

        // Let the server handle unknown receipts.
        true
    }

    /// Returns the latest event identifier, even if it's not visible, or if
    /// it's folded into another timeline item.
    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
        let state = self.state.read().await;
        // Whether threaded events should be ignored depends on the focus: a
        // thread-focused timeline shows them, a live timeline follows its
        // `hide_threaded_events` setting, an event-focused one follows its
        // paginator's setting.
        let filter_out_thread_events = match self.focus() {
            TimelineFocusKind::Thread { .. } => false,
            TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
            TimelineFocusKind::Event { paginator } => {
                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
            }
            _ => true,
        };

        // In some timelines, threaded events are added to the `AllRemoteEvents`
        // collection since they need to be taken into account to calculate read
        // receipts, but we don't want to actually take them into account for returning
        // the latest event id since they're not visibly in the timeline
        state
            .items
            .all_remote_events()
            .iter()
            .rev()
            .filter_map(|item| {
                if !filter_out_thread_events || item.thread_root_id.is_none() {
                    Some(item.event_id.clone())
                } else {
                    None
                }
            })
            .next()
    }

    /// Asks the event cache to retry decrypting this room's events, both the
    /// current UTDs and already-decrypted events whose info may need a refresh.
    ///
    /// NOTE(review): the `session_ids` parameter is unused in this body — the
    /// candidates come solely from `compute_redecryption_candidates()`.
    /// Confirm whether filtering by session id should be applied here.
    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
        let (utds, decrypted) = self.compute_redecryption_candidates().await;

        let request = DecryptionRetryRequest {
            room_id: self.room().room_id().to_owned(),
            utd_session_ids: utds,
            refresh_info_session_ids: decrypted,
        };

        self.room().client().event_cache().request_decryption(request);
    }

    /// Combine the global (event cache) pagination status with the local state
    /// of the timeline.
    ///
    /// This only changes the global pagination status of this room, in one
    /// case: if the timeline has a skip count greater than 0, it will
    /// ensure that the pagination status says that we haven't reached the
    /// timeline start yet.
    pub(super) async fn map_pagination_status(
        &self,
        status: RoomPaginationStatus,
    ) -> RoomPaginationStatus {
        match status {
            RoomPaginationStatus::Idle { hit_timeline_start } => {
                if hit_timeline_start {
                    let state = self.state.read().await;
                    // If the skip count is greater than 0, it means that a subsequent pagination
                    // could return more items, so pretend we didn't get the information that the
                    // timeline start was hit.
                    if state.meta.subscriber_skip_count.get() > 0 {
                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
                    }
                }
            }
            RoomPaginationStatus::Paginating => {}
        }

        // You're perfect, just the way you are.
        status
    }
}
1878
impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    ///
    /// The focus determines, among other things, which receipt thread is used
    /// and whether threaded events are hidden.
    pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
        &self.focus
    }
}
1885
/// Fetches the event a reply item points at, and returns its details.
///
/// First looks for the replied-to event among the current timeline items; if
/// it's not there, marks the reply item's details as pending, releases the
/// state lock, and fetches the event from the store or over the network.
///
/// Takes `state_guard` by value on purpose: the write lock must be dropped
/// before the network request so the timeline isn't blocked meanwhile;
/// `state_lock` is used afterwards to re-acquire read access.
///
/// Returns an error if the fetched event can only be represented as an
/// aggregation (`Error::UnsupportedEvent`); a fetch failure is reported as
/// `TimelineDetails::Error`, not as an `Err`.
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    // Fast path: the replied-to event is already in the timeline.
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status of the
    // replied-to event to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        // A failed fetch is surfaced as an error detail on the item, so the
        // caller can still update the reply with something meaningful.
        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}