matrix_sdk_ui/timeline/controller/
mod.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::BTreeSet, fmt, sync::Arc};

use as_variant::as_variant;
use decryption_retry_task::DecryptionRetryTask;
use eyeball_im::{VectorDiff, VectorSubscriberStream};
use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
use futures_core::Stream;
use imbl::Vector;
#[cfg(test)]
use matrix_sdk::Result;
use matrix_sdk::{
    deserialized_responses::TimelineEvent,
    event_cache::{RoomEventCache, RoomPaginationStatus},
    paginators::{PaginationResult, Paginator},
    send_queue::{
        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
    },
};
#[cfg(test)]
use ruma::events::receipt::ReceiptEventContent;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
    events::{
        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
        AnySyncTimelineEvent, MessageLikeEventType,
        poll::unstable_start::UnstablePollStartEventContent,
        reaction::ReactionEventContent,
        receipt::{Receipt, ReceiptThread, ReceiptType},
        relation::Annotation,
        room::message::{MessageType, Relation},
    },
    room_version_rules::RoomVersionRules,
    serde::Raw,
};
use tokio::sync::{RwLock, RwLockWriteGuard};
use tracing::{debug, error, field::debug, info, instrument, trace, warn};

pub(super) use self::{
    metadata::{RelativePosition, TimelineMetadata},
    observable_items::{
        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
        ObservableItemsTransactionEntry,
    },
    state::TimelineState,
    state_transaction::TimelineStateTransaction,
};
use super::{
    DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
    MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
    TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
    algorithms::{rfind_event_by_id, rfind_event_item},
    event_item::{ReactionStatus, RemoteEventOrigin},
    item::TimelineUniqueId,
    subscriber::TimelineSubscriber,
    traits::RoomDataProvider,
};
use crate::{
    timeline::{
        MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
        algorithms::rfind_event_by_item_id,
        date_dividers::DateDividerAdjuster,
        event_item::TimelineItemHandle,
        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
    },
    unable_to_decrypt_hook::UtdHookManager,
};

pub(in crate::timeline) mod aggregations;
mod decryption_retry_task;
mod metadata;
mod observable_items;
mod read_receipts;
mod state;
mod state_transaction;

pub(super) use aggregations::*;
pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};

/// Data associated with the current timeline focus.
///
/// This is the private counterpart of [`TimelineFocus`]: an augmented version
/// of it, including extra state that is useful over the lifetime of a
/// timeline.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// The timeline receives live events from the sync.
    Live {
        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// The timeline is focused on a single event, and it can expand in
    /// either direction.
    Event {
        /// The paginator instance.
        paginator: Paginator<P>,

        /// Whether to hide in-thread events from the timeline.
        hide_threaded_events: bool,
    },

    /// A live timeline for a thread.
    Thread {
        /// The root event for the current thread.
        root_event_id: OwnedEventId,
    },

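    /// The timeline shows the room's pinned events.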
    PinnedEvents {
        loader: PinnedEventsLoader,
    },
}

impl<P: RoomDataProvider> TimelineFocusKind<P> {
    /// Returns the [`ReceiptThread`] that should be used for the current
    /// timeline focus.
    ///
    /// Live and event timelines will use the unthreaded read receipt type in
    /// general, unless they hide in-thread events, in which case they will
    /// use the main thread.
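    ///
    /// A sketch of the expected mapping (hypothetical values; not a doctest):
    ///
    /// ```ignore
    /// let focus = TimelineFocusKind::<Room>::Live { hide_threaded_events: true };
    /// assert_eq!(focus.receipt_thread(), ReceiptThread::Main);
    ///
    /// let focus = TimelineFocusKind::<Room>::Thread { root_event_id: root.clone() };
    /// assert_eq!(focus.receipt_thread(), ReceiptThread::Thread(root));
    /// ```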
    pub(super) fn receipt_thread(&self) -> ReceiptThread {
        match self {
            TimelineFocusKind::Live { hide_threaded_events }
            | TimelineFocusKind::Event { hide_threaded_events, .. } => {
                if *hide_threaded_events {
                    ReceiptThread::Main
                } else {
                    ReceiptThread::Unthreaded
                }
            }
            TimelineFocusKind::Thread { root_event_id } => {
                ReceiptThread::Thread(root_event_id.clone())
            }
            TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded,
        }
    }
}

#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Inner mutable state.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Focus data.
    focus: Arc<TimelineFocusKind<P>>,

    /// A [`RoomDataProvider`] implementation, providing data.
    ///
    /// The type is a `RoomDataProvider` to allow testing. In the real world,
    /// this would normally be a [`Room`].
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,

    /// Long-running task used to retry decryption of timeline items without
    /// blocking main processing.
    decryption_retry_task: DecryptionRetryTask<P, P>,
}

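/// Settings applied to a timeline.
///
/// A construction sketch (a hypothetical combination, relying on the
/// `Default` impl below; not a doctest):
///
/// ```ignore
/// let settings = TimelineSettings {
///     track_read_receipts: true,
///     ..Default::default()
/// };
/// ```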
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Should the read receipts and read markers be handled?
    pub(super) track_read_receipts: bool,

    /// Event filter that controls what's rendered as a timeline item (and thus
    /// what can carry read receipts).
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Are unparsable events added as timeline items of their own kind?
    pub(super) add_failed_to_parse: bool,

    /// Should the timeline items be grouped by day or month?
    pub(super) date_divider_mode: DateDividerMode,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for TimelineSettings {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TimelineSettings")
            .field("track_read_receipts", &self.track_read_receipts)
            .field("add_failed_to_parse", &self.add_failed_to_parse)
            .finish_non_exhaustive()
    }
}

impl Default for TimelineSettings {
    fn default() -> Self {
        Self {
            track_read_receipts: false,
            event_filter: Arc::new(default_event_filter),
            add_failed_to_parse: true,
            date_divider_mode: DateDividerMode::Daily,
        }
    }
}

/// The default event filter for
/// [`crate::timeline::TimelineBuilder::event_filter`].
///
/// It filters out events that are not rendered by the timeline, including but
/// not limited to: reactions, edits, and redactions of existing messages.
///
/// If you have a custom filter, it may be best to chain yours with this one,
/// if you do not want to run into situations where a read receipt is not
/// visible because it lives on an event that doesn't have a matching timeline
/// item.
pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
    match event {
        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
            if ev.redacts(&rules.redaction).is_some() {
                // This is a redaction of an existing message; we'll only update the previous
                // message and not render a new entry.
                false
            } else {
                // This is a redacted entry, which we'll show only if the redacted entity
                // wasn't a reaction.
                ev.event_type() != MessageLikeEventType::Reaction
            }
        }

        AnySyncTimelineEvent::MessageLike(msg) => {
            match msg.original_content() {
                None => {
                    // This is a redacted entry, which we'll show only if the redacted entity
                    // wasn't a reaction.
                    msg.event_type() != MessageLikeEventType::Reaction
                }

                Some(original_content) => {
                    match original_content {
                        AnyMessageLikeEventContent::RoomMessage(content) => {
                            if content
                                .relates_to
                                .as_ref()
                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
                            {
                                // Edits aren't visible by default.
                                return false;
                            }

                            match content.msgtype {
                                MessageType::Audio(_)
                                | MessageType::Emote(_)
                                | MessageType::File(_)
                                | MessageType::Image(_)
                                | MessageType::Location(_)
                                | MessageType::Notice(_)
                                | MessageType::ServerNotice(_)
                                | MessageType::Text(_)
                                | MessageType::Video(_)
                                | MessageType::VerificationRequest(_) => true,
                                #[cfg(feature = "unstable-msc4274")]
                                MessageType::Gallery(_) => true,
                                _ => false,
                            }
                        }

                        AnyMessageLikeEventContent::Sticker(_)
                        | AnyMessageLikeEventContent::UnstablePollStart(
                            UnstablePollStartEventContent::New(_),
                        )
                        | AnyMessageLikeEventContent::CallInvite(_)
                        | AnyMessageLikeEventContent::CallNotify(_)
                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,

                        _ => false,
                    }
                }
            }
        }

        AnySyncTimelineEvent::State(_) => {
            // All state events are displayed by default.
            true
        }
    }
}

impl<P: RoomDataProvider> TimelineController<P> {
    pub(super) fn new(
        room_data_provider: P,
        focus: TimelineFocus,
        internal_id_prefix: Option<String>,
        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
        is_room_encrypted: bool,
        settings: TimelineSettings,
    ) -> Self {
        let focus = match focus {
            TimelineFocus::Live { hide_threaded_events } => {
                TimelineFocusKind::Live { hide_threaded_events }
            }

            TimelineFocus::Event { hide_threaded_events, .. } => {
                let paginator = Paginator::new(room_data_provider.clone());
                TimelineFocusKind::Event { paginator, hide_threaded_events }
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                TimelineFocusKind::Thread { root_event_id }
            }

            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
                TimelineFocusKind::PinnedEvents {
                    loader: PinnedEventsLoader::new(
                        Arc::new(room_data_provider.clone()),
                        max_events_to_load as usize,
                        max_concurrent_requests as usize,
                    ),
                }
            }
        };

        let focus = Arc::new(focus);
        let state = Arc::new(RwLock::new(TimelineState::new(
            focus.clone(),
            room_data_provider.own_user_id().to_owned(),
            room_data_provider.room_version_rules(),
            internal_id_prefix,
            unable_to_decrypt_hook,
            is_room_encrypted,
        )));

        let decryption_retry_task =
            DecryptionRetryTask::new(state.clone(), room_data_provider.clone());

        Self { state, focus, room_data_provider, settings, decryption_retry_task }
    }

    /// Initializes the configured focus with appropriate data.
    ///
    /// Should be called only once, after the [`TimelineController`] has been
    /// created with all its fields set.
    ///
    /// Returns whether any events were added to the timeline.
    pub(super) async fn init_focus(
        &self,
        focus: &TimelineFocus,
        room_event_cache: &RoomEventCache,
    ) -> Result<bool, Error> {
        match focus {
            TimelineFocus::Live { .. } => {
                // Retrieve the cached events, and add them to the timeline.
                let events = room_event_cache.events().await;

                let has_events = !events.is_empty();

                self.replace_with_initial_remote_events(
                    events.into_iter(),
                    RemoteEventOrigin::Cache,
                )
                .await;

                match room_event_cache.pagination().status().get() {
                    RoomPaginationStatus::Idle { hit_timeline_start } => {
                        if hit_timeline_start {
                            // Eagerly insert the timeline start item, since pagination claims
                            // we've already hit the timeline start.
                            self.insert_timeline_start_if_missing().await;
                        }
                    }
                    RoomPaginationStatus::Paginating => {}
                }

                Ok(has_events)
            }

            TimelineFocus::Event { target: event_id, num_context_events, .. } => {
                let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                // Start a /context request, and append the results (in order) to the timeline.
                let start_from_result = paginator
                    .start_from(event_id, (*num_context_events).into())
                    .await
                    .map_err(PaginationError::Paginator)?;

                let has_events = !start_from_result.events.is_empty();

                self.replace_with_initial_remote_events(
                    start_from_result.events.into_iter(),
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }

            TimelineFocus::Thread { root_event_id, .. } => {
                let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
                let has_events = !events.is_empty();

                // For each event, we also need to find the related events: since those
                // don't carry the thread relationship, they won't be included in the
                // initial list of events.
                let mut related_events = Vector::new();
                for event_id in events.iter().filter_map(|event| event.event_id()) {
                    if let Some((_original, related)) =
                        room_event_cache.find_event_with_relations(&event_id, None).await
                    {
                        related_events.extend(related);
                    }
                }

                self.replace_with_initial_remote_events(
                    events.into_iter(),
                    RemoteEventOrigin::Cache,
                )
                .await;

                // Now that we've inserted the thread events, add the aggregations too.
                if !related_events.is_empty() {
                    self.handle_remote_aggregations(
                        vec![VectorDiff::Append { values: related_events }],
                        RemoteEventOrigin::Cache,
                    )
                    .await;
                }

                Ok(has_events)
            }

            TimelineFocus::PinnedEvents { .. } => {
                let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
                    // Note: this is sync'd with code in the ctor.
                    unreachable!();
                };

                let Some(loaded_events) =
                    loader.load_events().await.map_err(Error::PinnedEventsError)?
                else {
                    // There weren't any events.
                    return Ok(false);
                };

                let has_events = !loaded_events.is_empty();

                self.replace_with_initial_remote_events(
                    loaded_events.into_iter(),
                    RemoteEventOrigin::Pagination,
                )
                .await;

                Ok(has_events)
            }
        }
    }

    /// Listens to encryption state changes for the room in
    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
    /// existing timeline items. This will then cause a refresh of those
    /// timeline items.
    pub async fn handle_encryption_state_changes(&self) {
        let mut room_info = self.room_data_provider.room_info();

        // Small helper to mark the room and all existing events as encrypted.
        let mark_encrypted = || async {
            let mut state = self.state.write().await;
            state.meta.is_room_encrypted = true;
            state.mark_all_events_as_encrypted();
        };

        if room_info.get().encryption_state().is_encrypted() {
            // If the room was already encrypted, it won't toggle to unencrypted, so we can
            // shut down this task early.
            mark_encrypted().await;
            return;
        }

        while let Some(info) = room_info.next().await {
            if info.encryption_state().is_encrypted() {
                mark_encrypted().await;
                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
                // here is done.
                break;
            }
        }
    }

    pub(crate) async fn reload_pinned_events(
        &self,
    ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
        if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
            loader.load_events().await
        } else {
            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
        }
    }

    /// Run a lazy backwards pagination (in live mode).
    ///
    /// It adjusts the `count` value of the `Skip` higher-order stream so that
    /// more items are pushed to the front of the timeline.
    ///
    /// If no more items are available (i.e. if the `count` is already zero),
    /// this method returns `Some(needs)`, where `needs` is the number of
    /// events that must be fetched with an actual backwards pagination.
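    ///
    /// A usage sketch (the caller and its pagination call are hypothetical;
    /// not a doctest):
    ///
    /// ```ignore
    /// if let Some(needs) = controller.live_lazy_paginate_backwards(20).await {
    ///     // The skip count was exhausted: run a real backwards pagination
    ///     // for the missing events.
    ///     paginate_for_real(needs).await?;
    /// }
    /// ```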
    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
        let state = self.state.read().await;

        let (count, needs) = state
            .meta
            .subscriber_skip_count
            .compute_next_when_paginating_backwards(num_events.into());

        // This always happens on a live timeline.
        let is_live_timeline = true;
        state.meta.subscriber_skip_count.update(count, is_live_timeline);

        needs
    }

    /// Run a backwards pagination (in focused mode) and prepend the results
    /// to the timeline.
    ///
    /// Returns whether we hit the start of the timeline.
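    ///
    /// A usage sketch (on an event-focused timeline; `controller` is a
    /// hypothetical binding; not a doctest):
    ///
    /// ```ignore
    /// let hit_start = controller.focused_paginate_backwards(20).await?;
    /// ```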
    pub(super) async fn focused_paginate_backwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => {
                return Err(PaginationError::NotSupported);
            }
            TimelineFocusKind::Event { paginator, .. } => paginator
                .paginate_backward(num_events.into())
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in reverse topological order, so we can push each event
        // to the front individually.
        self.handle_remote_events_with_diffs(
            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Run a forwards pagination (in focused mode) and append the results to
    /// the timeline.
    ///
    /// Returns whether we hit the end of the timeline.
    pub(super) async fn focused_paginate_forwards(
        &self,
        num_events: u16,
    ) -> Result<bool, PaginationError> {
        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
            TimelineFocusKind::Live { .. }
            | TimelineFocusKind::PinnedEvents { .. }
            | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),

            TimelineFocusKind::Event { paginator, .. } => paginator
                .paginate_forward(num_events.into())
                .await
                .map_err(PaginationError::Paginator)?,
        };

        // Events are in topological order.
        // We can append all events with no transformation.
        self.handle_remote_events_with_diffs(
            vec![VectorDiff::Append { values: events.into() }],
            RemoteEventOrigin::Pagination,
        )
        .await;

        Ok(hit_end_of_timeline)
    }

    /// Is this timeline receiving events from sync (aka has a live focus)?
    pub(super) fn is_live(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Live { .. })
    }

    /// Is this timeline focused on a thread?
    pub(super) fn is_threaded(&self) -> bool {
        matches!(&*self.focus, TimelineFocusKind::Thread { .. })
    }

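    /// Returns the root event ID of the thread this timeline is focused on,
    /// if any.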
    pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
        as_variant!(&*self.focus, TimelineFocusKind::Thread { root_event_id } => root_event_id.clone())
    }

    /// Get a copy of the current items in the list.
    ///
    /// Cheap because `imbl::Vector` is cheap to clone.
    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
        self.state.read().await.items.clone_items()
    }

    #[cfg(test)]
    pub(super) async fn subscribe_raw(
        &self,
    ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
        self.state.read().await.items.subscribe().into_values_and_stream()
    }

    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
        let state = self.state.read().await;

        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
    }

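    /// Subscribe to the timeline items, mapping each item through `f` and
    /// filtering out those for which `f` returns `None`.
    ///
    /// A usage sketch (keeping only event items; `controller` is a
    /// hypothetical binding; not a doctest):
    ///
    /// ```ignore
    /// let (items, stream) = controller
    ///     .subscribe_filter_map(|item| item.as_event().cloned())
    ///     .await;
    /// ```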
    pub(super) async fn subscribe_filter_map<U, F>(
        &self,
        f: F,
    ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
    where
        U: Clone,
        F: Fn(Arc<TimelineItem>) -> Option<U>,
    {
        self.state.read().await.items.subscribe().filter_map(f)
    }

    /// Toggle a reaction locally.
    ///
    /// Returns `true` if the reaction was added, `false` if it was removed.
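    ///
    /// A usage sketch (`item_id` being a hypothetical
    /// [`TimelineEventItemId`]; not a doctest):
    ///
    /// ```ignore
    /// let added = controller.toggle_reaction_local(&item_id, "👍").await?;
    /// ```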
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // Adding the new reaction.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // Add a reaction through the room data provider.
                    // No need to reflect the effect locally, since the local echo handling will
                    // take care of it.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to remote under
                        // our feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // No need to reflect the change ourselves, since handling the discard of the
                // local echo will take care of it.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // Impossible state: the reaction has moved from local to remote under
                        // our feet, but the timeline was supposed to be locked!
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Assume the redaction will work; we'll re-add the reaction if it didn't.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the lock before running the request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Re-add the reaction to the mapping.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }

    /// Handle updates on events as [`VectorDiff`]s.
    pub(super) async fn handle_remote_events_with_diffs(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_events_with_diffs(
                diffs,
                origin,
                &self.room_data_provider,
                &self.settings,
            )
            .await
    }

    /// Only handle aggregations received as [`VectorDiff`]s.
    pub(super) async fn handle_remote_aggregations(
        &self,
        diffs: Vec<VectorDiff<TimelineEvent>>,
        origin: RemoteEventOrigin,
    ) {
        if diffs.is_empty() {
            return;
        }

        let mut state = self.state.write().await;
        state
            .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
            .await
    }

    pub(super) async fn clear(&self) {
        self.state.write().await.clear();
    }

    /// Replaces the content of the current timeline with initial events.
    ///
    /// Also sets up read receipts and the read marker for a live timeline of a
    /// room.
    ///
    /// This is all done with a single lock guard, since we don't want the state
    /// to be modified between the clear and re-insertion of new events.
    pub(super) async fn replace_with_initial_remote_events<Events>(
        &self,
        events: Events,
        origin: RemoteEventOrigin,
    ) where
        Events: IntoIterator + ExactSizeIterator,
        <Events as IntoIterator>::Item: Into<TimelineEvent>,
    {
        let mut state = self.state.write().await;

        let track_read_markers = self.settings.track_read_receipts;
        if track_read_markers {
            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
            state
                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
                .await;
        }

        // Replace the events if either the current event list or the new one
        // isn't empty. Previously, we only had to check that the new list
        // wasn't empty, because a preceding clear operation guaranteed that
        // the current one was; but now we may want to replace a populated
        // timeline with an empty one.
        if !state.items.is_empty() || events.len() > 0 {
            state
                .replace_with_remote_events(
                    events,
                    origin,
                    &self.room_data_provider,
                    &self.settings,
                )
                .await;
        }

        if track_read_markers
            && let Some(fully_read_event_id) =
                self.room_data_provider.load_fully_read_marker().await
        {
            state.handle_fully_read_marker(fully_read_event_id);
        }
    }

    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
    }

    pub(super) async fn handle_ephemeral_events(
        &self,
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    ) {
        let mut state = self.state.write().await;
        state.handle_ephemeral_events(events, &self.room_data_provider).await;
    }

    /// Creates the local echo for an event we're sending.
    #[instrument(skip_all)]
    pub(super) async fn handle_local_event(
        &self,
        txn_id: OwnedTransactionId,
        content: AnyMessageLikeEventContent,
        send_handle: Option<SendHandle>,
    ) {
        let sender = self.room_data_provider.own_user_id().to_owned();
        let profile = self.room_data_provider.profile_from_user_id(&sender).await;

        let date_divider_mode = self.settings.date_divider_mode.clone();

        let mut state = self.state.write().await;
        state
            .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
            .await;
    }

    /// Update the send state of a local event represented by a transaction ID.
    ///
    /// If the corresponding local timeline item is missing, a warning is
    /// raised.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // The local echoes are always at the end of the timeline, so we must
        // first make sure the remote echo hasn't shown up yet.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            // Remote echo already received. This is very unlikely.
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // If there's both the remote echo and a local echo, that means the
            // remote echo was received before the response *and* contained no
            // transaction ID (and thus duplicated the local echo).
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Adjust the date dividers, if needs be.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local event by the transaction ID or event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Event wasn't found as a standalone item.
            //
            // If it was just sent, try to find if it matches a corresponding aggregation,
            // and mark it as sent in that case.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The event was already marked as sent: that's a broken state, so emit
        // an error, but also override to the given sent state.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // If the event has just been marked as sent, update the aggregations mapping to
        // take that into account.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }

    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // A read marker or a date divider may have been inserted before the local echo.
            // Ensure both are up to date.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
        // dereferencing it once.
        let mut txn = state.transaction();

        // Look if this was a local aggregation.
        let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
            &TimelineEventItemId::TransactionId(txn_id.to_owned()),
            &mut txn.items,
        ) {
            Ok(val) => val,
            Err(err) => {
                warn!("error when discarding local echo for an aggregation: {err}");
                // The aggregation has been found, it's just that we couldn't discard it.
                true
            }
        };

        if found_aggregation {
            txn.commit();
        }

        found_aggregation
    }

    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Ideally, we'd support replacing local echoes for a reaction, etc., but
            // handling RoomMessage should be sufficient in most cases. Worst
            // case, the local echo will be sent Soon™ and we'll get another chance at
            // editing the event then.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // Reuse the previous local echo's state, but reset the send state to not sent
        // (per API contract).
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned to remote?");
                return false;
            };
            // If the local echo had an upload progress, retain it.
            let progress = as_variant!(&prev_local_item.send_state,
                EventSendState::NotSentYet { progress } => progress.clone())
            .flatten();
            prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
        };

        // Replace the local-related state (kind) and the content state.
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content.msgtype,
                content.mentions,
                prev_item.content().reactions().cloned().unwrap_or_default(),
                prev_item.content().thread_root(),
                prev_item.content().in_reply_to(),
                prev_item.content().thread_summary(),
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        // This doesn't change the original sending time, so there's no need to adjust
        // date dividers.

        txn.commit();

        debug!("Replaced local echo");
        true
    }

    pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
        self.decryption_retry_task
            .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
            .await;
    }

    pub(super) async fn set_sender_profiles_pending(&self) {
        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
    }

    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
    }

    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
        self.state.write().await.items.for_each(|mut entry| {
            let Some(event_item) = entry.as_event() else { return };
            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
                let new_item = entry.with_kind(TimelineItemKind::Event(
                    event_item.with_sender_profile(profile_state.clone()),
                ));
                ObservableItemsEntry::replace(&mut entry, new_item);
            }
        });
    }

    pub(super) async fn update_missing_sender_profiles(&self) {
        trace!("Updating missing sender profiles");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            if event_item.sender_profile().is_ready() {
                trace!(event_id, transaction_id, "Profile already set");
                continue;
            }

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    trace!(event_id, transaction_id, "Adding profile");
                    let updated_item =
                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
                    let new_item = entry.with_kind(updated_item);
                    ObservableItemsEntry::replace(&mut entry, new_item);
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done updating missing sender profiles");
    }

    /// Update the profiles of the given senders, even if they are ready.
    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
        trace!("Forcing update of sender profiles: {sender_ids:?}");

        let mut state = self.state.write().await;
        let mut entries = state.items.entries();
        while let Some(mut entry) = entries.next() {
            let Some(event_item) = entry.as_event() else { continue };
            if !sender_ids.contains(event_item.sender()) {
                continue;
            }

            let event_id = event_item.event_id().map(debug);
            let transaction_id = event_item.transaction_id().map(debug);

            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
                Some(profile) => {
                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
                    {
                        debug!(event_id, transaction_id, "Profile already up-to-date");
                    } else {
                        trace!(event_id, transaction_id, "Updating profile");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    }
                }
                None => {
                    if !event_item.sender_profile().is_unavailable() {
                        trace!(event_id, transaction_id, "Marking profile unavailable");
                        let updated_item =
                            event_item.with_sender_profile(TimelineDetails::Unavailable);
                        let new_item = entry.with_kind(updated_item);
                        ObservableItemsEntry::replace(&mut entry, new_item);
                    } else {
                        debug!(event_id, transaction_id, "Profile already marked unavailable");
                    }
                }
            }
        }

        trace!("Done forcing update of sender profiles");
    }

    #[cfg(test)]
    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
        let own_user_id = self.room_data_provider.own_user_id();
        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
    }

    /// Get the latest read receipt for the given user.
    ///
    /// This considers both the user's private and public read receipts.
    pub(super) async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        let receipt_thread = self.focus.receipt_thread();

        self.state
            .read()
            .await
            .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
            .await
    }

    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    pub(super) async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
    }

    /// Subscribe to changes in the read receipts of our own user.
    pub async fn subscribe_own_user_read_receipts_changed(
        &self,
    ) -> impl Stream<Item = ()> + use<P> {
        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
    }

    /// Handle a room send update that's a new local echo.
    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
        match echo.content {
            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
                let content = match serialized_event.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo: {err}");
                        return;
                    }
                };

                self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
                    .await;

                if let Some(send_error) = send_error {
                    self.update_event_send_state(
                        &echo.transaction_id,
                        EventSendState::SendingFailed {
                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
                                send_error,
                            ))),
                            is_recoverable: false,
                        },
                    )
                    .await;
                }
            }

            LocalEchoContent::React { key, send_handle, applies_to } => {
                self.handle_local_reaction(key, send_handle, applies_to).await;
            }
        }
    }

    /// Adds a reaction (local echo) to a local echo.
    #[instrument(skip(self, send_handle))]
    async fn handle_local_reaction(
        &self,
        reaction_key: String,
        send_handle: SendReactionHandle,
        applies_to: OwnedTransactionId,
    ) {
        let mut state = self.state.write().await;
        let mut tr = state.transaction();

        let target = TimelineEventItemId::TransactionId(applies_to);

        let reaction_txn_id = send_handle.transaction_id().to_owned();
        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
        let aggregation = Aggregation::new(
            TimelineEventItemId::TransactionId(reaction_txn_id),
            AggregationKind::Reaction {
                key: reaction_key.clone(),
                sender: self.room_data_provider.own_user_id().to_owned(),
                timestamp: MilliSecondsSinceUnixEpoch::now(),
                reaction_status,
            },
        );

        tr.meta.aggregations.add(target.clone(), aggregation.clone());
        find_item_and_apply_aggregation(
            &tr.meta.aggregations,
            &mut tr.items,
            &target,
            aggregation,
            &tr.meta.room_version_rules,
        );

        tr.commit();
    }

    /// Handle a single room send queue update.
    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
        match update {
            RoomSendQueueUpdate::NewLocalEvent(echo) => {
                self.handle_local_echo(echo).await;
            }

            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
                if !self.discard_local_echo(&transaction_id).await {
                    warn!("couldn't find the local echo to discard");
                }
            }

            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
                let content = match new_content.deserialize() {
                    Ok(d) => d,
                    Err(err) => {
                        warn!("error deserializing local echo (upon edit): {err}");
                        return;
                    }
                };

                if !self.replace_local_echo(&transaction_id, content).await {
                    warn!("couldn't find the local echo to replace");
                }
            }

            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::SendingFailed { error, is_recoverable },
                )
                .await;
            }

            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
                self.update_event_send_state(
                    &transaction_id,
                    EventSendState::NotSentYet { progress: None },
                )
                .await;
            }

            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
                    .await;
            }

            RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
                self.update_event_send_state(
                    &related_to,
                    EventSendState::NotSentYet {
                        progress: Some(MediaUploadProgress { index, progress }),
                    },
                )
                .await;
            }
        }
    }

    /// Insert a timeline start item at the beginning of the room, if it's
    /// missing.
    pub async fn insert_timeline_start_if_missing(&self) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();
        txn.items.push_timeline_start_if_missing(
            txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
        );
        txn.commit();
    }

    /// Create an [`EmbeddedEvent`] from an arbitrary event, be it in the
    /// timeline or not.
    ///
    /// The result can be `None` if the event cannot be represented as a
    /// standalone item, because it's an aggregation.
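    ///
    /// A usage sketch, with `event` standing in for a `TimelineEvent`
    /// obtained elsewhere:
    ///
    /// ```ignore
    /// if let Some(embedded) = controller.make_replied_to(event).await? {
    ///     // `embedded` can be attached to a reply as its replied-to details.
    /// }
    /// ```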
    pub(super) async fn make_replied_to(
        &self,
        event: TimelineEvent,
    ) -> Result<Option<EmbeddedEvent>, Error> {
        let state = self.state.read().await;
        EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
    }
}

impl TimelineController {
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }

    /// Given an event identifier, fetches the details of the event it's
    /// replying to, if applicable.
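    ///
    /// A usage sketch; the event id is an assumed example value:
    ///
    /// ```ignore
    /// let reply_id = event_id!("$reply:example.org");
    /// controller.fetch_in_reply_to_details(reply_id).await?;
    /// ```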
    #[instrument(skip(self))]
    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
        let state_guard = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
        let remote_item = item
            .as_remote()
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
            .clone();

        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
            debug!("Event is not a message");
            return Ok(());
        };
        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
            debug!("Event is not a reply");
            return Ok(());
        };
        if let TimelineDetails::Pending = &in_reply_to.event {
            debug!("Replied-to event is already being fetched");
            return Ok(());
        }
        if let TimelineDetails::Ready(_) = &in_reply_to.event {
            debug!("Replied-to event has already been fetched");
            return Ok(());
        }

        let internal_id = item.internal_id.to_owned();
        let item = item.clone();
        let event = fetch_replied_to_event(
            state_guard,
            &self.state,
            index,
            &item,
            internal_id,
            &msglike,
            &in_reply_to.event_id,
            self.room(),
        )
        .await?;

        // We need to be sure to have the latest position of the event as it might have
        // changed while waiting for the request.
        let mut state = self.state.write().await;
        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;

        // Check the state of the event again; it might have been redacted while
        // the request was in-flight.
        let TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to,
            thread_summary,
        }) = item.content().clone()
        else {
            info!("Event is no longer a message (redacted?)");
            return Ok(());
        };
        let Some(in_reply_to) = in_reply_to else {
            warn!("Event no longer has a reply (bug?)");
            return Ok(());
        };

        // Now that we've received the content of the replied-to event, replace the
        // replied-to content in the item with it.
        trace!("Updating in-reply-to details");
        let internal_id = item.internal_id.to_owned();
        let mut item = item.clone();
        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
            kind: MsgLikeKind::Message(message),
            reactions,
            thread_root,
            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
            thread_summary,
        }));
        state.items.replace(index, TimelineItem::new(item, internal_id));

        Ok(())
    }

    /// Returns the thread that should be used for a read receipt based on the
    /// current focus of the timeline and the receipt type.
    ///
    /// A `SendReceiptType::FullyRead` receipt will always use
    /// `ReceiptThread::Unthreaded`.
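    ///
    /// For instance, sketched with an assumed `controller`:
    ///
    /// ```ignore
    /// // Fully-read markers are always unthreaded…
    /// let thread = controller.infer_thread_for_read_receipt(&SendReceiptType::FullyRead);
    /// assert_eq!(thread, ReceiptThread::Unthreaded);
    ///
    /// // …while other receipt types follow the timeline focus' receipt thread.
    /// let thread = controller.infer_thread_for_read_receipt(&SendReceiptType::Read);
    /// ```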
    pub(super) fn infer_thread_for_read_receipt(
        &self,
        receipt_type: &SendReceiptType,
    ) -> ReceiptThread {
        if matches!(receipt_type, SendReceiptType::FullyRead) {
            ReceiptThread::Unthreaded
        } else {
            self.focus.receipt_thread()
        }
    }

    /// Check whether the given receipt should be sent.
    ///
    /// Returns `false` if the given receipt is older than the current one.
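    ///
    /// A guard sketch around sending, with assumed `receipt_type`, `thread`
    /// and `event_id` values:
    ///
    /// ```ignore
    /// if controller.should_send_receipt(&receipt_type, &thread, event_id).await {
    ///     // Only hit the network when the receipt would move forward.
    /// }
    /// ```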
    pub(super) async fn should_send_receipt(
        &self,
        receipt_type: &SendReceiptType,
        receipt_thread: &ReceiptThread,
        event_id: &EventId,
    ) -> bool {
        let own_user_id = self.room().own_user_id();
        let state = self.state.read().await;
        let room = self.room();

        match receipt_type {
            SendReceiptType::Read => {
                if let Some((old_pub_read, _)) = state
                    .meta
                    .user_receipt(
                        own_user_id,
                        ReceiptType::Read,
                        receipt_thread.clone(),
                        room,
                        state.items.all_remote_events(),
                    )
                    .await
                {
                    trace!(%old_pub_read, "found a previous public receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_pub_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            // Implicit read receipts are saved as public read receipts, so get the latest. It also
            // doesn't make sense to have a private read receipt behind a public one.
            SendReceiptType::ReadPrivate => {
                if let Some((old_priv_read, _)) =
                    state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
                {
                    trace!(%old_priv_read, "found a previous private receipt");
                    if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &old_priv_read,
                        event_id,
                        state.items.all_remote_events(),
                    ) {
                        trace!(
                            "event referred to by the new receipt is {relative_pos:?} the previous receipt"
                        );
                        return relative_pos == RelativePosition::After;
                    }
                }
            }

            SendReceiptType::FullyRead => {
                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
                    && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
                        &prev_event_id,
                        event_id,
                        state.items.all_remote_events(),
                    )
                {
                    return relative_pos == RelativePosition::After;
                }
            }

            _ => {}
        }

        // Let the server handle unknown receipts.
        true
    }

    /// Returns the latest event identifier, even if it's not visible, or if
    /// it's folded into another timeline item.
    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
        let state = self.state.read().await;
        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
    }

    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
        self.retry_event_decryption_inner(session_ids).await
    }

    /// Combine the global (event cache) pagination status with the local state
    /// of the timeline.
    ///
    /// This only changes the global pagination status of this room in one
    /// case: if the timeline has a skip count greater than 0, it will
    /// ensure that the pagination status says that we haven't reached the
    /// timeline start yet.
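    ///
    /// Sketch of the intended effect, assuming the timeline currently has a
    /// non-zero skip count:
    ///
    /// ```ignore
    /// let status = RoomPaginationStatus::Idle { hit_timeline_start: true };
    /// // With items still skipped, the start is reported as not reached yet.
    /// let mapped = controller.map_pagination_status(status).await;
    /// assert!(matches!(mapped, RoomPaginationStatus::Idle { hit_timeline_start: false }));
    /// ```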
    pub(super) async fn map_pagination_status(
        &self,
        status: RoomPaginationStatus,
    ) -> RoomPaginationStatus {
        match status {
            RoomPaginationStatus::Idle { hit_timeline_start } => {
                if hit_timeline_start {
                    let state = self.state.read().await;
                    // If the skip count is greater than 0, it means that a subsequent pagination
                    // could return more items, so pretend we didn't get the information that the
                    // timeline start was hit.
                    if state.meta.subscriber_skip_count.get() > 0 {
                        return RoomPaginationStatus::Idle { hit_timeline_start: false };
                    }
                }
            }
            RoomPaginationStatus::Paginating => {}
        }

        // You're perfect, just the way you are.
        status
    }
}

#[allow(clippy::too_many_arguments)]
async fn fetch_replied_to_event<P: RoomDataProvider>(
    mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
    state_lock: &RwLock<TimelineState<P>>,
    index: usize,
    item: &EventTimelineItem,
    internal_id: TimelineUniqueId,
    msglike: &MsgLikeContent,
    in_reply_to: &EventId,
    room: &Room,
) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
    if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
        let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
        trace!("Found replied-to event locally");
        return Ok(details);
    }

    // Replace the item with a new timeline item that has the fetching status
    // of the replied-to event set to pending.
    trace!("Setting in-reply-to details to pending");
    let in_reply_to_details =
        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };

    let event_item = item
        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));

    let new_timeline_item = TimelineItem::new(event_item, internal_id);
    state_guard.items.replace(index, new_timeline_item);

    // Don't hold the state lock while the network request is made.
    drop(state_guard);

    trace!("Fetching replied-to event");
    let res = match room.load_or_fetch_event(in_reply_to, None).await {
        Ok(timeline_event) => {
            let state = state_lock.read().await;

            let replied_to_item =
                EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;

            if let Some(item) = replied_to_item {
                TimelineDetails::Ready(Box::new(item))
            } else {
                // The replied-to item is an aggregation, not a standalone item.
                return Err(Error::UnsupportedEvent);
            }
        }

        Err(e) => TimelineDetails::Error(Arc::new(e)),
    };

    Ok(res)
}