// matrix_sdk_ui/timeline/controller/mod.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25    config::RequestConfig,
26    deserialized_responses::TimelineEvent,
27    event_cache::{DecryptionRetryRequest, PaginationStatus, RoomEventCache},
28    paginators::{PaginationResult, PaginationToken, Paginator},
29    send_queue::{
30        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31    },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38    events::{
39        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40        AnySyncTimelineEvent, MessageLikeEventType,
41        poll::unstable_start::UnstablePollStartEventContent,
42        reaction::ReactionEventContent,
43        receipt::{Receipt, ReceiptThread, ReceiptType},
44        relation::{Annotation, RelationType},
45        room::message::{MessageType, Relation},
46    },
47    room_version_rules::RoomVersionRules,
48    serde::Raw,
49};
50use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54    metadata::{RelativePosition, TimelineMetadata},
55    observable_items::{
56        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57        ObservableItemsTransactionEntry,
58    },
59    state::TimelineState,
60    state_transaction::TimelineStateTransaction,
61};
62use super::{
63    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
66    TimelineReadReceiptTracking, VirtualTimelineItem,
67    algorithms::{rfind_event_by_id, rfind_event_item},
68    event_item::{ReactionStatus, RemoteEventOrigin},
69    item::TimelineUniqueId,
70    subscriber::TimelineSubscriber,
71    traits::RoomDataProvider,
72};
73use crate::{
74    timeline::{
75        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
76        algorithms::rfind_event_by_item_id,
77        controller::decryption_retry_task::compute_redecryption_candidates,
78        date_dividers::DateDividerAdjuster, event_item::TimelineItemHandle,
79    },
80    unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
94use matrix_sdk_common::serde_helpers::extract_thread_root;
95
/// Data associated to the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`], and it is an augmented
/// version of it, including extra state that makes it useful over the lifetime
/// of a timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in one
    /// direction or another.
    Event {
        /// The paginator instance.
        ///
        /// Filled lazily in `init_focus`, once we know whether the focused
        /// event belongs to a thread or not.
        paginator: OnceCell<AnyPaginator<P>>,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

    /// The timeline only shows the room's pinned events.
    PinnedEvents,
}
124
/// The paginator used by an event-focused timeline; which variant is used
/// depends on whether the focused event is part of a thread.
#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
    /// Paginator for an event that is not part of a thread.
    Unthreaded {
        /// The actual event paginator.
        paginator: Paginator<P>,
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },
    /// Paginator for an event that is part of a thread.
    Threaded(ThreadedEventsLoader<P>),
}
135
136impl<P: RoomDataProvider> AnyPaginator<P> {
137    /// Runs a backward pagination (requesting `num_events` to the server), from
138    /// the current state of the object.
139    ///
140    /// Will return immediately if we have already hit the start of the
141    /// timeline.
142    ///
143    /// May return an error if it's already paginating, or if the call to
144    /// the homeserver endpoints failed.
145    pub async fn paginate_backwards(
146        &self,
147        num_events: u16,
148    ) -> Result<PaginationResult, PaginatorError> {
149        match self {
150            Self::Unthreaded { paginator, .. } => {
151                paginator.paginate_backward(num_events.into()).await
152            }
153            Self::Threaded(threaded_paginator) => {
154                threaded_paginator.paginate_backwards(num_events.into()).await
155            }
156        }
157    }
158
159    /// Runs a forward pagination (requesting `num_events` to the server), from
160    /// the current state of the object.
161    ///
162    /// Will return immediately if we have already hit the end of the timeline.
163    ///
164    /// May return an error if it's already paginating, or if the call to
165    /// the homeserver endpoints failed.
166    pub async fn paginate_forwards(
167        &self,
168        num_events: u16,
169    ) -> Result<PaginationResult, PaginatorError> {
170        match self {
171            Self::Unthreaded { paginator, .. } => {
172                paginator.paginate_forward(num_events.into()).await
173            }
174            Self::Threaded(threaded_paginator) => {
175                threaded_paginator.paginate_forwards(num_events.into()).await
176            }
177        }
178    }
179
180    /// Whether to hide in-thread events from the timeline.
181    pub fn hide_threaded_events(&self) -> bool {
182        match self {
183            Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
184            Self::Threaded(_) => false,
185        }
186    }
187
188    /// Returns the root event id of the thread, if the paginator is
189    /// [`AnyPaginator::Threaded`].
190    pub fn thread_root(&self) -> Option<&EventId> {
191        match self {
192            Self::Unthreaded { .. } => None,
193            Self::Threaded(thread_events_loader) => {
194                Some(thread_events_loader.thread_root_event_id())
195            }
196        }
197    }
198}
199
200impl<P: RoomDataProvider> TimelineFocusKind<P> {
201    /// Returns the [`ReceiptThread`] that should be used for the current
202    /// timeline focus.
203    ///
204    /// Live and event timelines will use the unthreaded read receipt type in
205    /// general, unless they hide in-thread events, in which case they will
206    /// use the main thread.
207    pub(super) fn receipt_thread(&self) -> ReceiptThread {
208        if let Some(thread_root) = self.thread_root() {
209            ReceiptThread::Thread(thread_root.to_owned())
210        } else if self.hide_threaded_events() {
211            ReceiptThread::Main
212        } else {
213            ReceiptThread::Unthreaded
214        }
215    }
216
217    /// Whether to hide in-thread events from the timeline.
218    fn hide_threaded_events(&self) -> bool {
219        match self {
220            TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
221            TimelineFocusKind::Event { paginator } => {
222                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
223            }
224            TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
225        }
226    }
227
228    /// Whether the focus is on a thread (from a live thread or a thread
229    /// permalink).
230    fn is_thread(&self) -> bool {
231        self.thread_root().is_some()
232    }
233
234    /// If the focus is a thread, returns its root event ID.
235    fn thread_root(&self) -> Option<&EventId> {
236        match self {
237            TimelineFocusKind::Event { paginator, .. } => {
238                paginator.get().and_then(|paginator| paginator.thread_root())
239            }
240            TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
241            TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
242        }
243    }
244}
245
/// The core state and logic driving a timeline.
///
/// Cheap to clone: all the heavyweight state lives behind `Arc`s.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    ///
    /// See [`TimelineFocusKind`] for the possible focus modes.
    focus: Arc<TimelineFocusKind<P>>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,
}
263
/// Settings that alter the behavior of a [`TimelineController`].
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled and on which event
    /// types?
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}
280
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for TimelineSettings {
    // Manual implementation: `event_filter` is a callback with no useful
    // `Debug` representation, so it (and `date_divider_mode`) is skipped,
    // hence `finish_non_exhaustive`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TimelineSettings")
            .field("track_read_receipts", &self.track_read_receipts)
            .field("add_failed_to_parse", &self.add_failed_to_parse)
            .finish_non_exhaustive()
    }
}
290
291impl Default for TimelineSettings {
292    fn default() -> Self {
293        Self {
294            track_read_receipts: TimelineReadReceiptTracking::Disabled,
295            event_filter: Arc::new(default_event_filter),
296            add_failed_to_parse: true,
297            date_divider_mode: DateDividerMode::Daily,
298        }
299    }
300}
301
/// The default event filter for
/// [`crate::timeline::TimelineBuilder::event_filter`].
///
/// It filters out events that are not rendered by the timeline, including but
/// not limited to: reactions, edits, redactions on existing messages.
///
/// If you have a custom filter, it may be best to chain yours with this one if
/// you do not want to run into situations where a read receipt is not visible
/// because it's living on an event that doesn't have a matching timeline item.
pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
    match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            if ev.redacts(&rules.redaction).is_some() {
                // This is a redaction of an existing message, we'll only update the previous
                // message and not render a new entry.
                false
            } else {
                // This is a redacted entry, that we'll show only if the redacted entity wasn't
                // a reaction.
                ev.event_type() != MessageLikeEventType::Reaction
            }
        }

        AnySyncTimelineEvent::MessageLike(msg) => {
            match msg.original_content() {
                None => {
                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
                    // a reaction.
                    msg.event_type() != MessageLikeEventType::Reaction
                }

                Some(original_content) => {
                    match original_content {
                        AnyMessageLikeEventContent::RoomMessage(content) => {
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                // Edits aren't visible by default.
                                return false;
                            }

                            // Only these message types are rendered as their
                            // own timeline item; anything else is dropped.
                            match content.msgtype {
                                MessageType::Audio(_)
                                | MessageType::Emote(_)
                                | MessageType::File(_)
                                | MessageType::Image(_)
                                | MessageType::Location(_)
                                | MessageType::Notice(_)
                                | MessageType::ServerNotice(_)
                                | MessageType::Text(_)
                                | MessageType::Video(_)
                                | MessageType::VerificationRequest(_) => true,
                                #[cfg(feature = "unstable-msc4274")]
                                MessageType::Gallery(_) => true,
                                _ => false,
                            }
                        }

                        // Other message-like events that get a standalone
                        // timeline item.
                        AnyMessageLikeEventContent::Sticker(_)
                        | AnyMessageLikeEventContent::UnstablePollStart(
                            UnstablePollStartEventContent::New(_),
                        )
                        | AnyMessageLikeEventContent::CallInvite(_)
                        | AnyMessageLikeEventContent::RtcNotification(_)
                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,

                        _ => false,
                    }
                }
            }
        }

        AnySyncTimelineEvent::State(_) => {
            // All the state events may get displayed by default.
            true
        }
    }
}
382
383impl<P: RoomDataProvider> TimelineController<P> {
384    pub(super) fn new(
385        room_data_provider: P,
386        focus: TimelineFocus,
387        internal_id_prefix: Option<String>,
388        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
389        is_room_encrypted: bool,
390        settings: TimelineSettings,
391    ) -> Self {
392        let focus = match focus {
393            TimelineFocus::Live { hide_threaded_events } => {
394                TimelineFocusKind::Live { hide_threaded_events }
395            }
396
397            TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
398
399            TimelineFocus::Thread { root_event_id, .. } => {
400                TimelineFocusKind::Thread { root_event_id }
401            }
402
403            TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
404        };
405
406        let focus = Arc::new(focus);
407        let state = Arc::new(RwLock::new(TimelineState::new(
408            focus.clone(),
409            room_data_provider.own_user_id().to_owned(),
410            room_data_provider.room_version_rules(),
411            internal_id_prefix,
412            unable_to_decrypt_hook,
413            is_room_encrypted,
414        )));
415
416        Self { state, focus, room_data_provider, settings }
417    }
418
    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once after creation of the [`TimelineInner`], with
    /// all its fields set.
    ///
    /// Returns whether there were any events added to the timeline.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await?;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

                match room_event_cache.pagination().status().get() {
                    PaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    PaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                    // NOTE: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let event_paginator = Paginator::new(self.room_data_provider.clone());

                let load_events_with_context = || async {
                    // Start a /context request to load the focused event and surrounding events.
                    event_paginator
                        .start_from(event_id, (*num_context_events).into())
                        .await
                        .map(|r| r.events)
                        .map_err(PaginationError::Paginator)
                };

                let events = if *num_context_events == 0 {
                    // If no context is requested, try to load the event from the cache first and
                    // include common relations such as reactions and edits.
                    let request_config = Some(RequestConfig::default().retry_limit(3));
                    let relations_filter =
                        Some(vec![RelationType::Annotation, RelationType::Replacement]);

                    // Load the event from the cache or, failing that, the server.
                    match self
                        .room_data_provider
                        .load_event_with_relations(event_id, request_config, relations_filter)
                        .await
                    {
                        Ok((event, related_events)) => {
                            // The focused event goes first, then its relations.
                            let mut events = vec![event];
                            events.extend(related_events);
                            events
                        }
                        Err(err) => {
                            error!("error when loading focussed event: {err}");
                            // Fall back to load the focused event using /context.
                            load_events_with_context().await?
                        }
                    }
                } else {
                    // Start a /context request to load the focussed event and surrounding events.
                    load_events_with_context().await?
                };

                // Find the target event, and see if it's part of a thread.
                let extracted_thread_root = events
                    .iter()
                    .find(
                        |event| {
                            if let Some(id) = event.event_id() { id == *event_id } else { false }
                        },
                    )
                    .and_then(|event| extract_thread_root(event.raw()));

                // Determine the timeline's threading behavior.
                let (thread_root_event_id, hide_threaded_events) = match thread_mode {
                    TimelineEventFocusThreadMode::ForceThread => {
                        // If the event is part of a thread, use its thread root. Otherwise,
                        // assume the event itself is the thread root.
                        (extracted_thread_root.or_else(|| Some(event_id.clone())), false)
                    }
                    TimelineEventFocusThreadMode::Automatic { hide_threaded_events } => {
                        (extracted_thread_root, *hide_threaded_events)
                    }
                };

                // Instantiate the right paginator: threaded if we found (or forced) a
                // thread root, unthreaded otherwise. The `set` result can be ignored,
                // as this method is only supposed to run once (see doc above).
                let _ = paginator.set(match thread_root_event_id {
                    Some(root_id) => {
                        let mut tokens = event_paginator.tokens();

                        // Look if the thread root event is part of the /context response. This
                        // allows us to spare some backwards pagination with
                        // /relations.
                        let includes_root_event = events.iter().any(|event| {
                            if let Some(id) = event.event_id() { id == root_id } else { false }
                        });

                        if includes_root_event {
                            // If we have the root event, there's no need to do back-paginations
                            // with /relations, since we are at the start of the thread.
                            tokens.previous = PaginationToken::HitEnd;
                        }

                        AnyPaginator::Threaded(ThreadedEventsLoader::new(
                            self.room_data_provider.clone(),
                            root_id,
                            tokens,
                        ))
                    }

                    None => AnyPaginator::Unthreaded {
                        paginator: event_paginator,
                        hide_threaded_events,
                    },
                });

                let has_events = !events.is_empty();

                match paginator.get().expect("Paginator was not instantiated") {
                    AnyPaginator::Unthreaded { .. } => {
                        self.replace_with_initial_remote_events(
                            events,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }

                    AnyPaginator::Threaded(threaded_events_loader) => {
                        // We filter only events that are part of the thread (including the root),
                        // since /context will return adjacent events without filters.
                        let thread_root = threaded_events_loader.thread_root_event_id();
                        let events_in_thread = events.into_iter().filter(|event| {
                            extract_thread_root(event.raw())
                                .is_some_and(|event_thread_root| event_thread_root == thread_root)
                                || event.event_id().as_deref() == Some(thread_root)
                        });

                        self.replace_with_initial_remote_events(
                            events_in_thread,
                            RemoteEventOrigin::Pagination,
                        )
                        .await;
                    }
                }

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                self.init_with_thread_root(root_event_id, room_event_cache).await
            }

            TimelineFocus::PinnedEvents => {
                let (initial_events, _update_receiver) =
                    room_event_cache.subscribe_to_pinned_events().await?;

                let has_events = !initial_events.is_empty();

                self.replace_with_initial_remote_events(
                    initial_events,
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }
603
604    /// (Re-)initialise a timeline using [`TimelineFocus::Thread`] with cached
605    /// threaded events and secondary relations.
606    ///
607    /// Returns whether there were any events added to the timeline.
608    pub(super) async fn init_with_thread_root(
609        &self,
610        root_event_id: &OwnedEventId,
611        room_event_cache: &RoomEventCache,
612    ) -> Result<bool, Error> {
613        let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
614        let has_events = !events.is_empty();
615
616        // For each event, we also need to find the related events, as they don't
617        // include the thread relationship, they won't be included in
618        // the initial list of events.
619        let mut related_events = Vector::new();
620        for event_id in events.iter().filter_map(|event| event.event_id()) {
621            if let Some((_original, related)) =
622                room_event_cache.find_event_with_relations(&event_id, None).await?
623            {
624                related_events.extend(related);
625            }
626        }
627
628        self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
629
630        // Now that we've inserted the thread events, add the aggregations too.
631        if !related_events.is_empty() {
632            self.handle_remote_aggregations(
633                vec![VectorDiff::Append { values: related_events }],
634                RemoteEventOrigin::Cache,
635            )
636            .await;
637        }
638
639        Ok(has_events)
640    }
641
642    /// Listens to encryption state changes for the room in
643    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
644    /// existing timeline items. This will then cause a refresh of those
645    /// timeline items.
646    pub async fn handle_encryption_state_changes(&self) {
647        let mut room_info = self.room_data_provider.room_info();
648
649        // Small function helper to help mark as encrypted.
650        let mark_encrypted = || async {
651            let mut state = self.state.write().await;
652            state.meta.is_room_encrypted = true;
653            state.mark_all_events_as_encrypted();
654        };
655
656        if room_info.get().encryption_state().is_encrypted() {
657            // If the room was already encrypted, it won't toggle to unencrypted, so we can
658            // shut down this task early.
659            mark_encrypted().await;
660            return;
661        }
662
663        while let Some(info) = room_info.next().await {
664            if info.encryption_state().is_encrypted() {
665                mark_encrypted().await;
666                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
667                // here is done.
668                break;
669            }
670        }
671    }
672
673    /// Run a lazy backwards pagination (in live mode).
674    ///
675    /// It adjusts the `count` value of the `Skip` higher-order stream so that
676    /// more items are pushed front in the timeline.
677    ///
678    /// If no more items are available (i.e. if the `count` is zero), this
679    /// method returns `Some(needs)` where `needs` is the number of events that
680    /// must be unlazily backwards paginated.
681    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
682        let state = self.state.read().await;
683
684        let (count, needs) = state
685            .meta
686            .subscriber_skip_count
687            .compute_next_when_paginating_backwards(num_events.into());
688
689        // This always happens on a live timeline.
690        let is_live_timeline = true;
691        state.meta.subscriber_skip_count.update(count, is_live_timeline);
692
693        needs
694    }
695
696    /// Run a backwards pagination (in focused mode) and append the results to
697    /// the timeline.
698    ///
699    /// Returns whether we hit the start of the timeline.
700    pub(super) async fn focused_paginate_backwards(
701        &self,
702        num_events: u16,
703    ) -> Result<bool, PaginationError> {
704        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
705            TimelineFocusKind::Live { .. }
706            | TimelineFocusKind::PinnedEvents
707            | TimelineFocusKind::Thread { .. } => {
708                return Err(PaginationError::NotSupported);
709            }
710            TimelineFocusKind::Event { paginator, .. } => paginator
711                .get()
712                .expect("Paginator was not instantiated")
713                .paginate_backwards(num_events)
714                .await
715                .map_err(PaginationError::Paginator)?,
716        };
717
718        // Events are in reverse topological order.
719        // We can push front each event individually.
720        self.handle_remote_events_with_diffs(
721            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
722            RemoteEventOrigin::Pagination,
723        )
724        .await;
725
726        Ok(hit_end_of_timeline)
727    }
728
729    /// Run a forwards pagination (in focused mode) and append the results to
730    /// the timeline.
731    ///
732    /// Returns whether we hit the end of the timeline.
733    pub(super) async fn focused_paginate_forwards(
734        &self,
735        num_events: u16,
736    ) -> Result<bool, PaginationError> {
737        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
738            TimelineFocusKind::Live { .. }
739            | TimelineFocusKind::PinnedEvents
740            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
741
742            TimelineFocusKind::Event { paginator, .. } => paginator
743                .get()
744                .expect("Paginator was not instantiated")
745                .paginate_forwards(num_events)
746                .await
747                .map_err(PaginationError::Paginator)?,
748        };
749
750        // Events are in topological order.
751        // We can append all events with no transformation.
752        self.handle_remote_events_with_diffs(
753            vec![VectorDiff::Append { values: events.into() }],
754            RemoteEventOrigin::Pagination,
755        )
756        .await;
757
758        Ok(hit_end_of_timeline)
759    }
760
    /// Is this timeline receiving events from sync (aka has a live focus)?
    ///
    /// Returns `true` iff the focus is [`TimelineFocusKind::Live`].
    pub(super) fn is_live(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Live { .. })
    }
765
    /// Is this timeline focused on a thread?
    ///
    /// Delegates to the focus' own thread check.
    pub(super) fn is_threaded(&self) -> bool {
        self.focus.is_thread()
    }
770
771    /// The root of the current thread, for a live thread timeline or a
772    /// permalink to a thread message.
773    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
774        self.focus.thread_root().map(ToOwned::to_owned)
775    }
776
777    /// Get a copy of the current items in the list.
778    ///
779    /// Cheap because `im::Vector` is cheap to clone.
780    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
781        self.state.read().await.items.clone_items()
782    }
783
784    #[cfg(test)]
785    pub(super) async fn subscribe_raw(
786        &self,
787    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
788        self.state.read().await.items.subscribe().into_values_and_stream()
789    }
790
791    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
792        let state = self.state.read().await;
793
794        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
795    }
796
797    pub(super) async fn subscribe_filter_map<U, F>(
798        &self,
799        f: F,
800    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
801    where
802        U: Clone,
803        F: Fn(Arc<TimelineItem>) -> Option<U>,
804    {
805        self.state.read().await.items.subscribe().filter_map(f)
806    }
807
    /// Toggle a reaction locally.
    ///
    /// If our own user has no reaction with `key` on the target item, a new
    /// reaction is sent (via the send queue for local echoes, or via the room
    /// data provider for remote events). Otherwise, the previous reaction is
    /// removed (by aborting its send, or redacting the reaction event).
    ///
    /// Returns true if the reaction was added, false if it was removed.
    ///
    /// Returns `Error::FailedToToggleReaction` if the target item can't be
    /// found, and propagates send-queue/send/redact errors otherwise.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        // Hold the write lock for the whole local part of the toggle, so the
        // item can't change under our feet.
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Our own user's current reaction status for this key, if any.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    // The target is itself a local echo: attach the reaction to
                    // it through its send handle.
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        // There was a previous reaction: the toggle removes it. How depends on
        // whether the reaction (and its target) are still local or remote.
        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                // Local reaction on a local echo: abort its send.
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to echo under our
                        // feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the local item.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    // The redaction failed: roll back the optimistic removal,
                    // re-acquiring the lock since we dropped it above.
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
949
950    /// Handle updates on events as [`VectorDiff`]s.
951    pub(super) async fn handle_remote_events_with_diffs(
952        &self,
953        diffs: Vec<VectorDiff<TimelineEvent>>,
954        origin: RemoteEventOrigin,
955    ) {
956        if diffs.is_empty() {
957            return;
958        }
959
960        let mut state = self.state.write().await;
961        state
962            .handle_remote_events_with_diffs(
963                diffs,
964                origin,
965                &self.room_data_provider,
966                &self.settings,
967            )
968            .await
969    }
970
971    /// Only handle aggregations received as [`VectorDiff`]s.
972    pub(super) async fn handle_remote_aggregations(
973        &self,
974        diffs: Vec<VectorDiff<TimelineEvent>>,
975        origin: RemoteEventOrigin,
976    ) {
977        if diffs.is_empty() {
978            return;
979        }
980
981        let mut state = self.state.write().await;
982        state
983            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
984            .await
985    }
986
    /// Clear all items from the timeline state.
    pub(super) async fn clear(&self) {
        self.state.write().await.clear();
    }
990
    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    ///
    /// # Arguments
    ///
    /// * `events` - The events to replace the timeline content with; anything
    ///   convertible into [`TimelineEvent`]s.
    /// * `origin` - Where the remote events came from (sync, pagination, …).
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        // Load the initial public/private receipts of our own user first, so
        // that receipt handling during the replace sees them.
        let track_read_markers = &self.settings.track_read_receipts;
        if track_read_markers.is_enabled() {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one aren't
        // empty.
        // Previously we just had to check the new one wasn't empty because
        // we did a clear operation before so the current one would always be empty, but
        // now we may want to replace a populated timeline with an empty one.
        let mut events = events.into_iter().peekable();
        if !state.items.is_empty() || events.peek().is_some() {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers.is_enabled() {
            if let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
            {
                state.handle_fully_read_marker(fully_read_event_id);
            } else if let Some(latest_receipt_event_id) = state
                .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
            {
                // Fall back to read receipt if no fully read marker exists.
                debug!("no `m.fully_read` marker found, falling back to read receipt");
                state.handle_fully_read_marker(latest_receipt_event_id);
            }
        }
    }
1047
    /// Forward an `m.fully_read` marker update to the timeline state.
    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
    }
1051
1052    pub(super) async fn handle_ephemeral_events(
1053        &self,
1054        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1055    ) {
1056        // Don't even take the lock if there are no events to process.
1057        if events.is_empty() {
1058            return;
1059        }
1060        let mut state = self.state.write().await;
1061        state.handle_ephemeral_events(events, &self.room_data_provider).await;
1062    }
1063
1064    /// Creates the local echo for an event we're sending.
1065    #[instrument(skip_all)]
1066    pub(super) async fn handle_local_event(
1067        &self,
1068        txn_id: OwnedTransactionId,
1069        content: AnyMessageLikeEventContent,
1070        send_handle: Option<SendHandle>,
1071    ) {
1072        let sender = self.room_data_provider.own_user_id().to_owned();
1073        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1074
1075        let date_divider_mode = self.settings.date_divider_mode.clone();
1076
1077        let mut state = self.state.write().await;
1078        state
1079            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1080            .await;
1081    }
1082
    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    ///
    /// Handles three shapes of "the local echo isn't where we expect it":
    /// the remote echo has already arrived (the duplicate local echo is
    /// removed), the echo was an aggregation (marked as sent in the
    /// aggregations map), or it's genuinely gone (warn and bail).
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // The event ID assigned by the server, when the new state is `Sent`.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, we must first make
        // sure the remote echo hasn't showed up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent, that's a broken state, let's
        // emit an error but also override to the given sent state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        // Swap in a copy of the item carrying the new send state.
        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
1182
    /// Discard the local echo identified by `txn_id`, whether it's a
    /// standalone timeline item or a local aggregation.
    ///
    /// Returns whether a matching local echo was found (even if, for an
    /// aggregation, it couldn't actually be discarded).
    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // The echo wasn't a standalone item; it may have been a local
        // aggregation (e.g. a reaction), so check the aggregations map next.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        // Only commit when something was actually removed/changed.
        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }
1229
    /// Replace the content of the local echo identified by `txn_id` with new
    /// room-message content (e.g. after an edit of a not-yet-sent event).
    ///
    /// Only `m.room.message`-like content is supported. Returns whether the
    /// local echo was found and replaced.
    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soonâ„¢ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned as remote??");
                return false;
            };
            // If the local echo had an upload progress, retain it.
            let progress = as_variant!(&prev_local_item.send_state,
                EventSendState::NotSentYet { progress } => progress.clone())
            .flatten();
            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
        };

        // Replace the local-related state (kind) and the content state.
        // Reactions, thread and reply metadata are carried over from the
        // previous item; only the message content itself changes.
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content.msgtype,
                content.mentions,
                prev_item.content().reactions().cloned().unwrap_or_default(),
                prev_item.content().thread_root(),
                prev_item.content().in_reply_to(),
                prev_item.content().thread_summary(),
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        // This doesn't change the original sending time, so there's no need to adjust
        // date dividers.

        txn.commit();

        debug!("Replaced local echo");
        true
    }
1291
1292    pub(super) async fn compute_redecryption_candidates(
1293        &self,
1294    ) -> (BTreeSet<String>, BTreeSet<String>) {
1295        let state = self.state.read().await;
1296        compute_redecryption_candidates(&state.items)
1297    }
1298
    /// Mark every not-yet-ready sender profile as pending.
    pub(super) async fn set_sender_profiles_pending(&self) {
        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
    }
1302
    /// Mark every not-yet-ready sender profile as errored with `error`.
    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
    }
1306
1307    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1308        self.state.write().await.items.for_each(|mut entry| {
1309            let Some(event_item) = entry.as_event() else { return };
1310            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1311                let new_item = entry.with_kind(TimelineItemKind::Event(
1312                    event_item.with_sender_profile(profile_state.clone()),
1313                ));
1314                ObservableItemsEntry::replace(&mut entry, new_item);
1315            }
1316        });
1317    }
1318
    /// Fill in sender profiles for event items whose profile isn't `Ready`,
    /// querying the room data provider per item.
    ///
    /// Items whose profile can't be found are marked `Unavailable` (once).
    pub(super) async fn update_missing_sender_profiles(&self) {
        trace!("Updating missing sender profiles");

        let mut state = self.state.write().await;
        // Manual while-let iteration, since each entry may be replaced in place.
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            // Identifiers only used for tracing below.
            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            if event_item.sender_profile().is_ready() {
                trace!(event_id, transaction_id, "Profile already set");
                continue;
            }

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    trace!(event_id, transaction_id, "Adding profile");
                    let updated_item =
                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
                    let new_item = entry.with_kind(updated_item);
                    ObservableItemsEntry::replace(&mut entry, new_item);
                }
                None => {
                    // Only transition to `Unavailable` once, to avoid churning
                    // the item for no visible change.
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done updating missing sender profiles");
    }
1358
    /// Update the profiles of the given senders, even if they are ready.
    ///
    /// Unlike [`Self::update_missing_sender_profiles`], this refreshes
    /// already-`Ready` profiles too, but only for items sent by `sender_ids`.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        // Manual while-let iteration, since each entry may be replaced in place.
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            // Identifiers only used for tracing below.
            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    // Skip the replacement when the ready profile is unchanged.
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    // Only transition to `Unavailable` once, to avoid churning
                    // the item for no visible change.
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }
1403
1404    #[cfg(test)]
1405    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1406        let own_user_id = self.room_data_provider.own_user_id();
1407        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1408    }
1409
1410    /// Get the latest read receipt for the given user.
1411    ///
1412    /// Useful to get the latest read receipt, whether it's private or public.
1413    pub(super) async fn latest_user_read_receipt(
1414        &self,
1415        user_id: &UserId,
1416    ) -> Option<(OwnedEventId, Receipt)> {
1417        let receipt_thread = self.focus.receipt_thread();
1418
1419        self.state
1420            .read()
1421            .await
1422            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1423            .await
1424    }
1425
    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    ///
    /// Read-only lookup against the current timeline state.
    pub(super) async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
    }
1434
    /// Subscribe to changes in the read receipts of our own user.
    ///
    /// The returned stream yields `()` on each change notification; the
    /// `use<P>` capture bound keeps the opaque type generic-aware.
    pub async fn subscribe_own_user_read_receipts_changed(
        &self,
    ) -> impl Stream<Item = ()> + use<P> {
        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
    }
1441
    /// Handle a room send update that's a new local echo.
    ///
    /// Event echoes become local timeline items (and, if the send queue
    /// already reported an error for them, their send state is immediately set
    /// to failed). Reaction echoes are routed to the aggregation handling.
    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
        match echo.content {
            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
                let content = match serialized_event.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo: {err}");
                        return;
                    }
                };

                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
                    .await;

                // The echo may already carry a send error, e.g. when the queue
                // was wedged before we subscribed; reflect that immediately.
                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }

            LocalEchoContent::React { key, send_handle, applies_to } => {
                self.handle_local_reaction(key, send_handle, applies_to).await;
            }
        }
    }
1476
    /// Adds a reaction (local echo) to a local echo.
    ///
    /// `applies_to` is the transaction ID of the target local event; the
    /// reaction is recorded as an aggregation and applied to the target item
    /// if it's present in the timeline.
    #[instrument(skip(self, send_handle))]
    async fn handle_local_reaction(
        &self,
        reaction_key: String,
        send_handle: SendReactionHandle,
        applies_to: OwnedTransactionId,
    ) {
        let mut state = self.state.write().await;
        let mut tr = state.transaction();

        // Both the target and the reaction itself are identified by
        // transaction IDs, since neither has been sent yet.
        let target = TimelineEventItemId::TransactionId(applies_to);

        let reaction_txn_id = send_handle.transaction_id().to_owned();
        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
        let aggregation = Aggregation::new(
            TimelineEventItemId::TransactionId(reaction_txn_id),
            AggregationKind::Reaction {
                key: reaction_key.clone(),
                sender: self.room_data_provider.own_user_id().to_owned(),
                timestamp: MilliSecondsSinceUnixEpoch::now(),
                reaction_status,
            },
        );

        // Record the aggregation, then apply it to the target item (if found).
        tr.meta.aggregations.add(target.clone(), aggregation.clone());
        find_item_and_apply_aggregation(
            &tr.meta.aggregations,
            &mut tr.items,
            &target,
            aggregation,
            &tr.meta.room_version_rules,
        );

        tr.commit();
    }
1513
1514    /// Handle a single room send queue update.
1515    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1516        match update {
1517            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1518                self.handle_local_echo(echo).await;
1519            }
1520
1521            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1522                if !self.discard_local_echo(&transaction_id).await {
1523                    warn!("couldn't find the local echo to discard");
1524                }
1525            }
1526
1527            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1528                let content = match new_content.deserialize() {
1529                    Ok(d) => d,
1530                    Err(err) => {
1531                        warn!("error deserializing local echo (upon edit): {err}");
1532                        return;
1533                    }
1534                };
1535
1536                if !self.replace_local_echo(&transaction_id, content).await {
1537                    warn!("couldn't find the local echo to replace");
1538                }
1539            }
1540
1541            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1542                self.update_event_send_state(
1543                    &transaction_id,
1544                    EventSendState::SendingFailed { error, is_recoverable },
1545                )
1546                .await;
1547            }
1548
1549            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1550                self.update_event_send_state(
1551                    &transaction_id,
1552                    EventSendState::NotSentYet { progress: None },
1553                )
1554                .await;
1555            }
1556
1557            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1558                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1559                    .await;
1560            }
1561
1562            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1563                self.update_event_send_state(
1564                    &related_to,
1565                    EventSendState::NotSentYet {
1566                        progress: Some(MediaUploadProgress { index, progress }),
1567                    },
1568                )
1569                .await;
1570            }
1571        }
1572    }
1573
1574    /// Insert a timeline start item at the beginning of the room, if it's
1575    /// missing.
1576    pub async fn insert_timeline_start_if_missing(&self) {
1577        let mut state = self.state.write().await;
1578        let mut txn = state.transaction();
1579        txn.items.push_timeline_start_if_missing(
1580            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1581        );
1582        txn.commit();
1583    }
1584
1585    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
1586    /// timeline or not.
1587    ///
1588    /// Can be `None` if the event cannot be represented as a standalone item,
1589    /// because it's an aggregation.
1590    pub(super) async fn make_replied_to(
1591        &self,
1592        event: TimelineEvent,
1593    ) -> Result<Option<EmbeddedEvent>, Error> {
1594        let state = self.state.read().await;
1595        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1596    }
1597}
1598
1599impl TimelineController {
    /// Returns the room this timeline is attached to.
    ///
    /// For this non-generic controller, the room itself acts as the room data
    /// provider.
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }
1603
    /// Given an event identifier, will fetch the details for the event it's
    /// replying to, if applicable.
    ///
    /// Sets the item's in-reply-to details to `Pending` while the fetch is in
    /// flight, then replaces them with the fetched content (or an error).
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        // Only remote items are considered; keep a clone so we can relocate
        // the item by event id after the request completes.
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        // Avoid starting a second concurrent fetch, or re-fetching details we
        // already have.
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        // `fetch_replied_to_event` consumes the write guard and drops it
        // before performing any network request, so the lock is not held
        // while waiting for the response.
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again, it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }
1687
1688    /// Returns the thread that should be used for a read receipt based on the
1689    /// current focus of the timeline and the receipt type.
1690    ///
1691    /// A `SendReceiptType::FullyRead` will always use
1692    /// `ReceiptThread::Unthreaded`
1693    pub(super) fn infer_thread_for_read_receipt(
1694        &self,
1695        receipt_type: &SendReceiptType,
1696    ) -> ReceiptThread {
1697        if matches!(receipt_type, SendReceiptType::FullyRead) {
1698            ReceiptThread::Unthreaded
1699        } else {
1700            self.focus.receipt_thread()
1701        }
1702    }
1703
    /// Check whether the given receipt should be sent.
    ///
    /// Returns `false` if the given receipt is older than the current one.
    pub(super) async fn should_send_receipt(
        &self,
        receipt_type: &SendReceiptType,
        receipt_thread: &ReceiptThread,
        event_id: &EventId,
    ) -> bool {
        let own_user_id = self.room().own_user_id();
        let state = self.state.read().await;
        let room = self.room();

        match receipt_type {
            SendReceiptType::Read => {
                // Compare against our previous public read receipt, if any.
                if let Some((old_pub_read, _)) = state
                    .meta
                    .user_receipt(
                        own_user_id,
                        ReceiptType::Read,
                        receipt_thread.clone(),
                        room,
                        state.items.all_remote_events(),
                    )
                    .await
                {
                    trace!(%old_pub_read, "found a previous public receipt");
                    // Only send if the new receipt points at a strictly later
                    // event; if the two events can't be ordered (one of them
                    // is unknown), fall through and send anyway.
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_pub_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            // Implicit read receipts are saved as public read receipts, so get the latest. It also
            // doesn't make sense to have a private read receipt behind a public one.
            SendReceiptType::ReadPrivate => {
                if let Some((old_priv_read, _)) =
                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
                {
                    trace!(%old_priv_read, "found a previous private receipt");
                    // Same ordering rule as for public receipts above.
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_priv_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            SendReceiptType::FullyRead => {
                // Compare against the stored fully-read marker, when both
                // positions are known.
                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &prev_event_id,
                        event_id,
                        state.items.all_remote_events(),
                    )
                {
                    return relative_pos == RelativePosition::After;
                }
            }

            _ => {}
        }

        // Let the server handle unknown receipts.
        true
    }
1782
1783    /// Returns the latest event identifier, even if it's not visible, or if
1784    /// it's folded into another timeline item.
1785    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1786        let state = self.state.read().await;
1787        let filter_out_thread_events = match self.focus() {
1788            TimelineFocusKind::Thread { .. } => false,
1789            TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1790            TimelineFocusKind::Event { paginator } => {
1791                paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1792            }
1793            _ => true,
1794        };
1795
1796        // In some timelines, threaded events are added to the `AllRemoteEvents`
1797        // collection since they need to be taken into account to calculate read
1798        // receipts, but we don't want to actually take them into account for returning
1799        // the latest event id since they're not visibly in the timeline
1800        state
1801            .items
1802            .all_remote_events()
1803            .iter()
1804            .rev()
1805            .filter_map(|item| {
1806                if !filter_out_thread_events || item.thread_root_id.is_none() {
1807                    Some(item.event_id.clone())
1808                } else {
1809                    None
1810                }
1811            })
1812            .next()
1813    }
1814
    /// Ask the event cache to retry decrypting the events of this room.
    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
        // NOTE(review): `session_ids` is only recorded in the tracing span by
        // `#[instrument]`; the retry request below is built from all
        // candidates regardless — confirm whether filtering by session id was
        // intended here.
        let (utds, decrypted) = self.compute_redecryption_candidates().await;

        let request = DecryptionRetryRequest {
            room_id: self.room().room_id().to_owned(),
            utd_session_ids: utds,
            refresh_info_session_ids: decrypted,
        };

        self.room().client().event_cache().request_decryption(request);
    }
1827
1828    /// Combine the global (event cache) pagination status with the local state
1829    /// of the timeline.
1830    ///
1831    /// This only changes the global pagination status of this room, in one
1832    /// case: if the timeline has a skip count greater than 0, it will
1833    /// ensure that the pagination status says that we haven't reached the
1834    /// timeline start yet.
1835    pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1836        match status {
1837            PaginationStatus::Idle { hit_timeline_start } => {
1838                if hit_timeline_start {
1839                    let state = self.state.read().await;
1840                    // If the skip count is greater than 0, it means that a subsequent pagination
1841                    // could return more items, so pretend we didn't get the information that the
1842                    // timeline start was hit.
1843                    if state.meta.subscriber_skip_count.get() > 0 {
1844                        return PaginationStatus::Idle { hit_timeline_start: false };
1845                    }
1846                }
1847            }
1848            PaginationStatus::Paginating => {}
1849        }
1850
1851        // You're perfect, just the way you are.
1852        status
1853    }
1854}
1855
impl<P: RoomDataProvider> TimelineController<P> {
    /// Returns the timeline focus of the [`TimelineController`].
    ///
    /// The focus is notably used to decide which thread read receipts target
    /// (see `infer_thread_for_read_receipt`).
    pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
        &self.focus
    }
}
1862
/// Resolve the replied-to event for the reply `item`, either from the
/// timeline itself or by loading/fetching it through `room`.
///
/// Takes ownership of `state_guard` so it can mark the item's in-reply-to
/// details as `Pending` and then drop the lock before any network request;
/// `state_lock` is re-acquired (read-only) once the event is available.
#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    // Fast path: the replied-to event may already be an item in the timeline.
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status of the
    // replied-to event to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        // A fetch failure is not propagated as an error: it becomes part of
        // the details, so the UI can show it next to the reply.
        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}