matrix_sdk_ui/timeline/controller/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::VectorDiff;
20use eyeball_im_util::vector::VectorObserverExt;
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
25use matrix_sdk::{
26    deserialized_responses::TimelineEvent,
27    event_cache::{
28        paginator::{PaginationResult, Paginator},
29        RoomEventCache,
30    },
31    send_queue::{
32        LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33    },
34    Result, Room,
35};
36use ruma::{
37    api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38    events::{
39        poll::unstable_start::UnstablePollStartEventContent,
40        reaction::ReactionEventContent,
41        receipt::{Receipt, ReceiptThread, ReceiptType},
42        relation::Annotation,
43        room::message::{MessageType, Relation},
44        AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45        AnySyncTimelineEvent, MessageLikeEventType,
46    },
47    serde::Raw,
48    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
49    TransactionId, UserId,
50};
51#[cfg(test)]
52use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
53use tokio::sync::{RwLock, RwLockWriteGuard};
54use tracing::{debug, error, field::debug, info, instrument, trace, warn};
55
56pub(super) use self::{
57    metadata::{RelativePosition, TimelineMetadata},
58    observable_items::{
59        AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
60        ObservableItemsTransactionEntry,
61    },
62    state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineState},
63    state_transaction::TimelineStateTransaction,
64};
65use super::{
66    algorithms::{rfind_event_by_id, rfind_event_item},
67    event_handler::TimelineEventKind,
68    event_item::{ReactionStatus, RemoteEventOrigin},
69    item::TimelineUniqueId,
70    subscriber::TimelineSubscriber,
71    traits::{Decryptor, RoomDataProvider},
72    DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, PaginationError,
73    Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
74    TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
75};
76use crate::{
77    timeline::{
78        algorithms::rfind_event_by_item_id,
79        date_dividers::DateDividerAdjuster,
80        event_item::EventTimelineItemKind,
81        pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
82        MsgLikeContent, MsgLikeKind, TimelineEventFilterFn,
83    },
84    unable_to_decrypt_hook::UtdHookManager,
85};
86
87mod aggregations;
88mod decryption_retry_task;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
97/// Data associated to the current timeline focus.
98#[derive(Debug)]
99enum TimelineFocusData<P: RoomDataProvider> {
100    /// The timeline receives live events from the sync.
101    Live,
102
103    /// The timeline is focused on a single event, and it can expand in one
104    /// direction or another.
105    Event {
106        /// The event id we've started to focus on.
107        event_id: OwnedEventId,
108        /// The paginator instance.
109        paginator: Paginator<P>,
110        /// Number of context events to request for the first request.
111        num_context_events: u16,
112    },
113
114    PinnedEvents {
115        loader: PinnedEventsLoader,
116    },
117}
118
119#[derive(Clone, Debug)]
120pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
121    /// Inner mutable state.
122    state: Arc<RwLock<TimelineState>>,
123
124    /// Inner mutable focus state.
125    focus: Arc<RwLock<TimelineFocusData<P>>>,
126
127    /// A [`RoomDataProvider`] implementation, providing data.
128    ///
129    /// Useful for testing only; in the real world, it's just a [`Room`].
130    pub(crate) room_data_provider: P,
131
132    /// Settings applied to this timeline.
133    pub(super) settings: TimelineSettings,
134
135    /// Long-running task used to retry decryption of timeline items without
136    /// blocking main processing.
137    decryption_retry_task: DecryptionRetryTask<D>,
138}
139
140#[derive(Clone)]
141pub(super) struct TimelineSettings {
142    /// Should the read receipts and read markers be handled?
143    pub(super) track_read_receipts: bool,
144
145    /// Event filter that controls what's rendered as a timeline item (and thus
146    /// what can carry read receipts).
147    pub(super) event_filter: Arc<TimelineEventFilterFn>,
148
149    /// Are unparsable events added as timeline items of their own kind?
150    pub(super) add_failed_to_parse: bool,
151
152    /// Should the timeline items be grouped by day or month?
153    pub(super) date_divider_mode: DateDividerMode,
154}
155
156#[cfg(not(tarpaulin_include))]
157impl fmt::Debug for TimelineSettings {
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        f.debug_struct("TimelineSettings")
160            .field("track_read_receipts", &self.track_read_receipts)
161            .field("add_failed_to_parse", &self.add_failed_to_parse)
162            .finish_non_exhaustive()
163    }
164}
165
166impl Default for TimelineSettings {
167    fn default() -> Self {
168        Self {
169            track_read_receipts: false,
170            event_filter: Arc::new(default_event_filter),
171            add_failed_to_parse: true,
172            date_divider_mode: DateDividerMode::Daily,
173        }
174    }
175}
176
177#[derive(Debug, Clone, Copy)]
178pub(super) enum TimelineFocusKind {
179    Live,
180    Event,
181    PinnedEvents,
182}
183
184/// The default event filter for
185/// [`crate::timeline::TimelineBuilder::event_filter`].
186///
187/// It filters out events that are not rendered by the timeline, including but
188/// not limited to: reactions, edits, redactions on existing messages.
189///
190/// If you have a custom filter, it may be best to chain yours with this one if
191/// you do not want to run into situations where a read receipt is not visible
192/// because it's living on an event that doesn't have a matching timeline item.
193pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
194    match event {
195        AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
196            if ev.redacts(room_version).is_some() {
197                // This is a redaction of an existing message, we'll only update the previous
198                // message and not render a new entry.
199                false
200            } else {
201                // This is a redacted entry, that we'll show only if the redacted entity wasn't
202                // a reaction.
203                ev.event_type() != MessageLikeEventType::Reaction
204            }
205        }
206
207        AnySyncTimelineEvent::MessageLike(msg) => {
208            match msg.original_content() {
209                None => {
210                    // This is a redacted entry, that we'll show only if the redacted entity wasn't
211                    // a reaction.
212                    msg.event_type() != MessageLikeEventType::Reaction
213                }
214
215                Some(original_content) => {
216                    match original_content {
217                        AnyMessageLikeEventContent::RoomMessage(content) => {
218                            if content
219                                .relates_to
220                                .as_ref()
221                                .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
222                            {
223                                // Edits aren't visible by default.
224                                return false;
225                            }
226
227                            matches!(
228                                content.msgtype,
229                                MessageType::Audio(_)
230                                    | MessageType::Emote(_)
231                                    | MessageType::File(_)
232                                    | MessageType::Image(_)
233                                    | MessageType::Location(_)
234                                    | MessageType::Notice(_)
235                                    | MessageType::ServerNotice(_)
236                                    | MessageType::Text(_)
237                                    | MessageType::Video(_)
238                                    | MessageType::VerificationRequest(_)
239                            )
240                        }
241
242                        AnyMessageLikeEventContent::Sticker(_)
243                        | AnyMessageLikeEventContent::UnstablePollStart(
244                            UnstablePollStartEventContent::New(_),
245                        )
246                        | AnyMessageLikeEventContent::CallInvite(_)
247                        | AnyMessageLikeEventContent::CallNotify(_)
248                        | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
249
250                        _ => false,
251                    }
252                }
253            }
254        }
255
256        AnySyncTimelineEvent::State(_) => {
257            // All the state events may get displayed by default.
258            true
259        }
260    }
261}
262
263impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
264    pub(super) fn new(
265        room_data_provider: P,
266        focus: TimelineFocus,
267        internal_id_prefix: Option<String>,
268        unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
269        is_room_encrypted: bool,
270    ) -> Self {
271        let (focus_data, focus_kind) = match focus {
272            TimelineFocus::Live => (TimelineFocusData::Live, TimelineFocusKind::Live),
273
274            TimelineFocus::Event { target, num_context_events } => {
275                let paginator = Paginator::new(room_data_provider.clone());
276                (
277                    TimelineFocusData::Event { paginator, event_id: target, num_context_events },
278                    TimelineFocusKind::Event,
279                )
280            }
281
282            TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
283                TimelineFocusData::PinnedEvents {
284                    loader: PinnedEventsLoader::new(
285                        Arc::new(room_data_provider.clone()),
286                        max_events_to_load as usize,
287                        max_concurrent_requests as usize,
288                    ),
289                },
290                TimelineFocusKind::PinnedEvents,
291            ),
292        };
293
294        let state = Arc::new(RwLock::new(TimelineState::new(
295            focus_kind,
296            room_data_provider.own_user_id().to_owned(),
297            room_data_provider.room_version(),
298            internal_id_prefix,
299            unable_to_decrypt_hook,
300            is_room_encrypted,
301        )));
302
303        let settings = TimelineSettings::default();
304
305        let decryption_retry_task =
306            DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
307
308        Self {
309            state,
310            focus: Arc::new(RwLock::new(focus_data)),
311            room_data_provider,
312            settings,
313            decryption_retry_task,
314        }
315    }
316
317    /// Initializes the configured focus with appropriate data.
318    ///
319    /// Should be called only once after creation of the [`TimelineInner`], with
320    /// all its fields set.
321    ///
322    /// Returns whether there were any events added to the timeline.
323    pub(super) async fn init_focus(
324        &self,
325        room_event_cache: &RoomEventCache,
326    ) -> Result<bool, Error> {
327        let focus_guard = self.focus.read().await;
328
329        match &*focus_guard {
330            TimelineFocusData::Live => {
331                // Retrieve the cached events, and add them to the timeline.
332                let (events, _stream) = room_event_cache.subscribe().await;
333
334                let has_events = !events.is_empty();
335
336                self.replace_with_initial_remote_events(
337                    events.into_iter(),
338                    RemoteEventOrigin::Cache,
339                )
340                .await;
341
342                Ok(has_events)
343            }
344
345            TimelineFocusData::Event { event_id, paginator, num_context_events } => {
346                // Start a /context request, and append the results (in order) to the timeline.
347                let start_from_result = paginator
348                    .start_from(event_id, (*num_context_events).into())
349                    .await
350                    .map_err(PaginationError::Paginator)?;
351
352                drop(focus_guard);
353
354                let has_events = !start_from_result.events.is_empty();
355
356                self.replace_with_initial_remote_events(
357                    start_from_result.events.into_iter(),
358                    RemoteEventOrigin::Pagination,
359                )
360                .await;
361
362                Ok(has_events)
363            }
364
365            TimelineFocusData::PinnedEvents { loader } => {
366                let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
367
368                drop(focus_guard);
369
370                let has_events = !loaded_events.is_empty();
371
372                self.replace_with_initial_remote_events(
373                    loaded_events.into_iter(),
374                    RemoteEventOrigin::Pagination,
375                )
376                .await;
377
378                Ok(has_events)
379            }
380        }
381    }
382
383    /// Listens to encryption state changes for the room in
384    /// [`matrix_sdk_base::RoomInfo`] and applies the new value to the
385    /// existing timeline items. This will then cause a refresh of those
386    /// timeline items.
387    pub async fn handle_encryption_state_changes(&self) {
388        let mut room_info = self.room_data_provider.room_info();
389
390        // Small function helper to help mark as encrypted.
391        let mark_encrypted = || async {
392            let mut state = self.state.write().await;
393            state.meta.is_room_encrypted = true;
394            state.mark_all_events_as_encrypted();
395        };
396
397        if room_info.get().encryption_state().is_encrypted() {
398            // If the room was already encrypted, it won't toggle to unencrypted, so we can
399            // shut down this task early.
400            mark_encrypted().await;
401            return;
402        }
403
404        while let Some(info) = room_info.next().await {
405            if info.encryption_state().is_encrypted() {
406                mark_encrypted().await;
407                // Once the room is encrypted, it cannot switch back to unencrypted, so our work
408                // here is done.
409                break;
410            }
411        }
412    }
413
414    pub(crate) async fn reload_pinned_events(
415        &self,
416    ) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
417        let focus_guard = self.focus.read().await;
418
419        if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
420            loader.load_events().await
421        } else {
422            Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
423        }
424    }
425
426    /// Run a lazy backwards pagination (in live mode).
427    ///
428    /// It adjusts the `count` value of the `Skip` higher-order stream so that
429    /// more items are pushed front in the timeline.
430    ///
431    /// If no more items are available (i.e. if the `count` is zero), this
432    /// method returns `Some(needs)` where `needs` is the number of events that
433    /// must be unlazily backwards paginated.
434    pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
435        let state = self.state.read().await;
436
437        let (count, needs) = state
438            .meta
439            .subscriber_skip_count
440            .compute_next_when_paginating_backwards(num_events.into());
441
442        state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
443
444        needs
445    }
446
447    /// Run a backwards pagination (in focused mode) and append the results to
448    /// the timeline.
449    ///
450    /// Returns whether we hit the start of the timeline.
451    pub(super) async fn focused_paginate_backwards(
452        &self,
453        num_events: u16,
454    ) -> Result<bool, PaginationError> {
455        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
456            TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
457                return Err(PaginationError::NotEventFocusMode)
458            }
459            TimelineFocusData::Event { paginator, .. } => paginator
460                .paginate_backward(num_events.into())
461                .await
462                .map_err(PaginationError::Paginator)?,
463        };
464
465        // Events are in reverse topological order.
466        // We can push front each event individually.
467        self.handle_remote_events_with_diffs(
468            events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
469            RemoteEventOrigin::Pagination,
470        )
471        .await;
472
473        Ok(hit_end_of_timeline)
474    }
475
476    /// Run a forwards pagination (in focused mode) and append the results to
477    /// the timeline.
478    ///
479    /// Returns whether we hit the end of the timeline.
480    pub(super) async fn focused_paginate_forwards(
481        &self,
482        num_events: u16,
483    ) -> Result<bool, PaginationError> {
484        let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
485            TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
486                return Err(PaginationError::NotEventFocusMode)
487            }
488            TimelineFocusData::Event { paginator, .. } => paginator
489                .paginate_forward(num_events.into())
490                .await
491                .map_err(PaginationError::Paginator)?,
492        };
493
494        // Events are in topological order.
495        // We can append all events with no transformation.
496        self.handle_remote_events_with_diffs(
497            vec![VectorDiff::Append { values: events.into() }],
498            RemoteEventOrigin::Pagination,
499        )
500        .await;
501
502        Ok(hit_end_of_timeline)
503    }
504
505    /// Is this timeline receiving events from sync (aka has a live focus)?
506    pub(super) async fn is_live(&self) -> bool {
507        matches!(&*self.focus.read().await, TimelineFocusData::Live)
508    }
509
510    pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
511        self.settings = settings;
512        self
513    }
514
515    /// Get a copy of the current items in the list.
516    ///
517    /// Cheap because `im::Vector` is cheap to clone.
518    pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
519        self.state.read().await.items.clone_items()
520    }
521
522    #[cfg(test)]
523    pub(super) async fn subscribe_raw(
524        &self,
525    ) -> (
526        Vector<Arc<TimelineItem>>,
527        impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
528    ) {
529        let state = self.state.read().await;
530
531        state.items.subscribe().into_values_and_stream()
532    }
533
534    pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
535        let state = self.state.read().await;
536
537        TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
538    }
539
540    pub(super) async fn subscribe_filter_map<U, F>(
541        &self,
542        f: F,
543    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
544    where
545        U: Clone,
546        F: Fn(Arc<TimelineItem>) -> Option<U>,
547    {
548        self.state.read().await.items.subscribe().filter_map(f)
549    }
550
551    /// Toggle a reaction locally.
552    ///
553    /// Returns true if the reaction was added, false if it was removed.
554    #[instrument(skip_all)]
555    pub(super) async fn toggle_reaction_local(
556        &self,
557        item_id: &TimelineEventItemId,
558        key: &str,
559    ) -> Result<bool, Error> {
560        let mut state = self.state.write().await;
561
562        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
563            warn!("Timeline item not found, can't add reaction");
564            return Err(Error::FailedToToggleReaction);
565        };
566
567        let user_id = self.room_data_provider.own_user_id();
568        let prev_status = item
569            .content()
570            .reactions()
571            .get(key)
572            .and_then(|group| group.get(user_id))
573            .map(|reaction_info| reaction_info.status.clone());
574
575        let Some(prev_status) = prev_status else {
576            match &item.kind {
577                EventTimelineItemKind::Local(local) => {
578                    if let Some(send_handle) = &local.send_handle {
579                        if send_handle
580                            .react(key.to_owned())
581                            .await
582                            .map_err(|err| Error::SendQueueError(err.into()))?
583                            .is_some()
584                        {
585                            trace!("adding a reaction to a local echo");
586                            return Ok(true);
587                        }
588
589                        warn!("couldn't toggle reaction for local echo");
590                        return Ok(false);
591                    }
592
593                    warn!("missing send handle for local echo; is this a test?");
594                    return Ok(false);
595                }
596
597                EventTimelineItemKind::Remote(remote) => {
598                    // Add a reaction through the room data provider.
599                    // No need to reflect the effect locally, since the local echo handling will
600                    // take care of it.
601                    trace!("adding a reaction to a remote echo");
602                    let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
603                    self.room_data_provider
604                        .send(ReactionEventContent::from(annotation).into())
605                        .await?;
606                    return Ok(true);
607                }
608            }
609        };
610
611        trace!("removing a previous reaction");
612        match prev_status {
613            ReactionStatus::LocalToLocal(send_reaction_handle) => {
614                if let Some(handle) = send_reaction_handle {
615                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
616                        // Impossible state: the reaction has moved from local to echo under our
617                        // feet, but the timeline was supposed to be locked!
618                        warn!("unexpectedly unable to abort sending of local reaction");
619                    }
620                } else {
621                    warn!("no send reaction handle (this should only happen in testing contexts)");
622                }
623            }
624
625            ReactionStatus::LocalToRemote(send_handle) => {
626                // No need to reflect the change ourselves, since handling the discard of the
627                // local echo will take care of it.
628                trace!("aborting send of the previous reaction that was a local echo");
629                if let Some(handle) = send_handle {
630                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
631                        // Impossible state: the reaction has moved from local to echo under our
632                        // feet, but the timeline was supposed to be locked!
633                        warn!("unexpectedly unable to abort sending of local reaction");
634                    }
635                } else {
636                    warn!("no send handle (this should only happen in testing contexts)");
637                }
638            }
639
640            ReactionStatus::RemoteToRemote(event_id) => {
641                // Assume the redaction will work; we'll re-add the reaction if it didn't.
642                let Some(annotated_event_id) =
643                    item.as_remote().map(|event_item| event_item.event_id.clone())
644                else {
645                    warn!("remote reaction to remote event, but the associated item isn't remote");
646                    return Ok(false);
647                };
648
649                let mut reactions = item.content().reactions().clone();
650                let reaction_info = reactions.remove_reaction(user_id, key);
651
652                if reaction_info.is_some() {
653                    let new_item = item.with_reactions(reactions);
654                    state.items.replace(item_pos, new_item);
655                } else {
656                    warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
657                }
658
659                // Release the lock before running the request.
660                drop(state);
661
662                trace!("sending redact for a previous reaction");
663                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
664                    if let Some(reaction_info) = reaction_info {
665                        debug!("sending redact failed, adding the reaction back to the list");
666
667                        let mut state = self.state.write().await;
668                        if let Some((item_pos, item)) =
669                            rfind_event_by_id(&state.items, &annotated_event_id)
670                        {
671                            // Re-add the reaction to the mapping.
672                            let mut reactions = item.content().reactions();
673                            reactions
674                                .entry(key.to_owned())
675                                .or_default()
676                                .insert(user_id.to_owned(), reaction_info);
677                            let new_item = item.with_reactions(reactions);
678                            state.items.replace(item_pos, new_item);
679                        } else {
680                            warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
681                        }
682                    }
683
684                    return Err(err);
685                }
686            }
687        }
688
689        Ok(false)
690    }
691
692    /// Handle updates on events as [`VectorDiff`]s.
693    pub(super) async fn handle_remote_events_with_diffs(
694        &self,
695        diffs: Vec<VectorDiff<TimelineEvent>>,
696        origin: RemoteEventOrigin,
697    ) {
698        if diffs.is_empty() {
699            return;
700        }
701
702        let mut state = self.state.write().await;
703        state
704            .handle_remote_events_with_diffs(
705                diffs,
706                origin,
707                &self.room_data_provider,
708                &self.settings,
709            )
710            .await
711    }
712
713    pub(super) async fn clear(&self) {
714        self.state.write().await.clear();
715    }
716
717    /// Replaces the content of the current timeline with initial events.
718    ///
719    /// Also sets up read receipts and the read marker for a live timeline of a
720    /// room.
721    ///
722    /// This is all done with a single lock guard, since we don't want the state
723    /// to be modified between the clear and re-insertion of new events.
724    pub(super) async fn replace_with_initial_remote_events<Events>(
725        &self,
726        events: Events,
727        origin: RemoteEventOrigin,
728    ) where
729        Events: IntoIterator + ExactSizeIterator,
730        <Events as IntoIterator>::Item: Into<TimelineEvent>,
731    {
732        let mut state = self.state.write().await;
733
734        let track_read_markers = self.settings.track_read_receipts;
735        if track_read_markers {
736            state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
737            state
738                .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
739                .await;
740        }
741
742        // Replace the events if either the current event list or the new one aren't
743        // empty.
744        // Previously we just had to check the new one wasn't empty because
745        // we did a clear operation before so the current one would always be empty, but
746        // now we may want to replace a populated timeline with an empty one.
747        if !state.items.is_empty() || events.len() > 0 {
748            state
749                .replace_with_remote_events(
750                    events,
751                    origin,
752                    &self.room_data_provider,
753                    &self.settings,
754                )
755                .await;
756        }
757
758        if track_read_markers {
759            if let Some(fully_read_event_id) =
760                self.room_data_provider.load_fully_read_marker().await
761            {
762                state.handle_fully_read_marker(fully_read_event_id);
763            }
764        }
765    }
766
767    pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
768        self.state.write().await.handle_fully_read_marker(fully_read_event_id);
769    }
770
771    pub(super) async fn handle_ephemeral_events(
772        &self,
773        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
774    ) {
775        let mut state = self.state.write().await;
776        state.handle_ephemeral_events(events, &self.room_data_provider).await;
777    }
778
779    /// Creates the local echo for an event we're sending.
780    #[instrument(skip_all)]
781    pub(super) async fn handle_local_event(
782        &self,
783        txn_id: OwnedTransactionId,
784        content: TimelineEventKind,
785        send_handle: Option<SendHandle>,
786    ) {
787        let sender = self.room_data_provider.own_user_id().to_owned();
788        let profile = self.room_data_provider.profile_from_user_id(&sender).await;
789
790        // Only add new items if the timeline is live.
791        let should_add_new_items = self.is_live().await;
792
793        let date_divider_mode = self.settings.date_divider_mode.clone();
794
795        let mut state = self.state.write().await;
796        state
797            .handle_local_event(
798                sender,
799                profile,
800                should_add_new_items,
801                date_divider_mode,
802                txn_id,
803                send_handle,
804                content,
805            )
806            .await;
807    }
808
809    /// Update the send state of a local event represented by a transaction ID.
810    ///
811    /// If the corresponding local timeline item is missing, a warning is
812    /// raised.
813    #[instrument(skip(self))]
814    pub(super) async fn update_event_send_state(
815        &self,
816        txn_id: &TransactionId,
817        send_state: EventSendState,
818    ) {
819        let mut state = self.state.write().await;
820        let mut txn = state.transaction();
821
822        let new_event_id: Option<&EventId> =
823            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
824
825        // The local echoes are always at the end of the timeline, we must first make
826        // sure the remote echo hasn't showed up yet.
827        if rfind_event_item(&txn.items, |it| {
828            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
829        })
830        .is_some()
831        {
832            // Remote echo already received. This is very unlikely.
833            trace!("Remote echo received before send-event response");
834
835            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
836
837            // If there's both the remote echo and a local echo, that means the
838            // remote echo was received before the response *and* contained no
839            // transaction ID (and thus duplicated the local echo).
840            if let Some((idx, _)) = local_echo {
841                warn!("Message echo got duplicated, removing the local one");
842                txn.items.remove(idx);
843
844                // Adjust the date dividers, if needs be.
845                let mut adjuster =
846                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
847                adjuster.run(&mut txn.items, &mut txn.meta);
848            }
849
850            txn.commit();
851            return;
852        }
853
854        // Look for the local event by the transaction ID or event ID.
855        let result = rfind_event_item(&txn.items, |it| {
856            it.transaction_id() == Some(txn_id)
857                || new_event_id.is_some()
858                    && it.event_id() == new_event_id
859                    && it.as_local().is_some()
860        });
861
862        let Some((idx, item)) = result else {
863            // Event wasn't found as a standalone item.
864            //
865            // If it was just sent, try to find if it matches a corresponding aggregation,
866            // and mark it as sent in that case.
867            if let Some(new_event_id) = new_event_id {
868                match txn
869                    .meta
870                    .aggregations
871                    .mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned())
872                {
873                    MarkAggregationSentResult::MarkedSent { update } => {
874                        trace!("marked aggregation as sent");
875
876                        if let Some((target, aggregation)) = update {
877                            if let Some((item_pos, item)) =
878                                rfind_event_by_item_id(&txn.items, &target)
879                            {
880                                let mut content = item.content().clone();
881                                match aggregation.apply(&mut content) {
882                                    ApplyAggregationResult::UpdatedItem => {
883                                        trace!("reapplied aggregation in the event");
884                                        let internal_id = item.internal_id.to_owned();
885                                        let new_item = item.with_content(content);
886                                        txn.items.replace(
887                                            item_pos,
888                                            TimelineItem::new(new_item, internal_id),
889                                        );
890                                        txn.commit();
891                                    }
892                                    ApplyAggregationResult::LeftItemIntact => {}
893                                    ApplyAggregationResult::Error(err) => {
894                                        warn!("when reapplying aggregation just marked as sent: {err}");
895                                    }
896                                }
897                            }
898                        }
899
900                        // Early return: we've found the event to mark as sent, it was an
901                        // aggregation.
902                        return;
903                    }
904
905                    MarkAggregationSentResult::NotFound => {}
906                }
907            }
908
909            warn!("Timeline item not found, can't update send state");
910            return;
911        };
912
913        let Some(local_item) = item.as_local() else {
914            warn!("We looked for a local item, but it transitioned to remote.");
915            return;
916        };
917
918        // The event was already marked as sent, that's a broken state, let's
919        // emit an error but also override to the given sent state.
920        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
921            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
922        }
923
924        // If the event has just been marked as sent, update the aggregations mapping to
925        // take that into account.
926        if let Some(new_event_id) = new_event_id {
927            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
928        }
929
930        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
931        txn.items.replace(idx, new_item);
932
933        txn.commit();
934    }
935
936    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
937        let mut state = self.state.write().await;
938
939        if let Some((idx, _)) =
940            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
941        {
942            let mut txn = state.transaction();
943
944            txn.items.remove(idx);
945
946            // A read marker or a date divider may have been inserted before the local echo.
947            // Ensure both are up to date.
948            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
949            adjuster.run(&mut txn.items, &mut txn.meta);
950
951            txn.meta.update_read_marker(&mut txn.items);
952
953            txn.commit();
954
955            debug!("discarded local echo");
956            return true;
957        }
958
959        // Avoid multiple mutable and immutable borrows of the lock guard by explicitly
960        // dereferencing it once.
961        let state = &mut *state;
962
963        // Look if this was a local aggregation.
964        if let Some((target, aggregation)) = state
965            .meta
966            .aggregations
967            .try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
968        {
969            let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else {
970                warn!("missing target item for a local aggregation");
971                return false;
972            };
973
974            let mut content = item.content().clone();
975            match aggregation.unapply(&mut content) {
976                ApplyAggregationResult::UpdatedItem => {
977                    trace!("removed local reaction to local echo");
978                    let internal_id = item.internal_id.clone();
979                    let new_item = item.with_content(content);
980                    state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
981                }
982                ApplyAggregationResult::LeftItemIntact => {}
983                ApplyAggregationResult::Error(err) => {
984                    warn!("when undoing local aggregation: {err}");
985                }
986            }
987
988            return true;
989        }
990
991        false
992    }
993
994    pub(super) async fn replace_local_echo(
995        &self,
996        txn_id: &TransactionId,
997        content: AnyMessageLikeEventContent,
998    ) -> bool {
999        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1000            // Ideally, we'd support replacing local echoes for a reaction, etc., but
1001            // handling RoomMessage should be sufficient in most cases. Worst
1002            // case, the local echo will be sent Soon™ and we'll get another chance at
1003            // editing the event then.
1004            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1005            return false;
1006        };
1007
1008        let mut state = self.state.write().await;
1009        let mut txn = state.transaction();
1010
1011        let Some((idx, prev_item)) =
1012            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1013        else {
1014            debug!("Can't find local echo to replace");
1015            return false;
1016        };
1017
1018        // Reuse the previous local echo's state, but reset the send state to not sent
1019        // (per API contract).
1020        let ti_kind = {
1021            let Some(prev_local_item) = prev_item.as_local() else {
1022                warn!("We looked for a local item, but it transitioned as remote??");
1023                return false;
1024            };
1025            prev_local_item.with_send_state(EventSendState::NotSentYet)
1026        };
1027
1028        // Replace the local-related state (kind) and the content state.
1029        let prev_reactions = prev_item.content().reactions();
1030        let prev_thread_root = prev_item.content().thread_root();
1031        let prev_in_reply_to = prev_item.content().in_reply_to();
1032        let new_item = TimelineItem::new(
1033            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1034                content,
1035                None,
1036                prev_reactions,
1037                prev_thread_root,
1038                prev_in_reply_to,
1039            )),
1040            prev_item.internal_id.to_owned(),
1041        );
1042
1043        txn.items.replace(idx, new_item);
1044
1045        // This doesn't change the original sending time, so there's no need to adjust
1046        // date dividers.
1047
1048        txn.commit();
1049
1050        debug!("Replaced local echo");
1051        true
1052    }
1053
1054    async fn retry_event_decryption_inner(
1055        &self,
1056        decryptor: D,
1057        session_ids: Option<BTreeSet<String>>,
1058    ) {
1059        self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
1060    }
1061
1062    pub(super) async fn set_sender_profiles_pending(&self) {
1063        self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1064    }
1065
1066    pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1067        self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1068    }
1069
1070    async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1071        self.state.write().await.items.for_each(|mut entry| {
1072            let Some(event_item) = entry.as_event() else { return };
1073            if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1074                let new_item = entry.with_kind(TimelineItemKind::Event(
1075                    event_item.with_sender_profile(profile_state.clone()),
1076                ));
1077                ObservableItemsEntry::replace(&mut entry, new_item);
1078            }
1079        });
1080    }
1081
1082    pub(super) async fn update_missing_sender_profiles(&self) {
1083        trace!("Updating missing sender profiles");
1084
1085        let mut state = self.state.write().await;
1086        let mut entries = state.items.entries();
1087        while let Some(mut entry) = entries.next() {
1088            let Some(event_item) = entry.as_event() else { continue };
1089            let event_id = event_item.event_id().map(debug);
1090            let transaction_id = event_item.transaction_id().map(debug);
1091
1092            if event_item.sender_profile().is_ready() {
1093                trace!(event_id, transaction_id, "Profile already set");
1094                continue;
1095            }
1096
1097            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1098                Some(profile) => {
1099                    trace!(event_id, transaction_id, "Adding profile");
1100                    let updated_item =
1101                        event_item.with_sender_profile(TimelineDetails::Ready(profile));
1102                    let new_item = entry.with_kind(updated_item);
1103                    ObservableItemsEntry::replace(&mut entry, new_item);
1104                }
1105                None => {
1106                    if !event_item.sender_profile().is_unavailable() {
1107                        trace!(event_id, transaction_id, "Marking profile unavailable");
1108                        let updated_item =
1109                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1110                        let new_item = entry.with_kind(updated_item);
1111                        ObservableItemsEntry::replace(&mut entry, new_item);
1112                    } else {
1113                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1114                    }
1115                }
1116            }
1117        }
1118
1119        trace!("Done updating missing sender profiles");
1120    }
1121
1122    /// Update the profiles of the given senders, even if they are ready.
1123    pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1124        trace!("Forcing update of sender profiles: {sender_ids:?}");
1125
1126        let mut state = self.state.write().await;
1127        let mut entries = state.items.entries();
1128        while let Some(mut entry) = entries.next() {
1129            let Some(event_item) = entry.as_event() else { continue };
1130            if !sender_ids.contains(event_item.sender()) {
1131                continue;
1132            }
1133
1134            let event_id = event_item.event_id().map(debug);
1135            let transaction_id = event_item.transaction_id().map(debug);
1136
1137            match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1138                Some(profile) => {
1139                    if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1140                    {
1141                        debug!(event_id, transaction_id, "Profile already up-to-date");
1142                    } else {
1143                        trace!(event_id, transaction_id, "Updating profile");
1144                        let updated_item =
1145                            event_item.with_sender_profile(TimelineDetails::Ready(profile));
1146                        let new_item = entry.with_kind(updated_item);
1147                        ObservableItemsEntry::replace(&mut entry, new_item);
1148                    }
1149                }
1150                None => {
1151                    if !event_item.sender_profile().is_unavailable() {
1152                        trace!(event_id, transaction_id, "Marking profile unavailable");
1153                        let updated_item =
1154                            event_item.with_sender_profile(TimelineDetails::Unavailable);
1155                        let new_item = entry.with_kind(updated_item);
1156                        ObservableItemsEntry::replace(&mut entry, new_item);
1157                    } else {
1158                        debug!(event_id, transaction_id, "Profile already marked unavailable");
1159                    }
1160                }
1161            }
1162        }
1163
1164        trace!("Done forcing update of sender profiles");
1165    }
1166
1167    #[cfg(test)]
1168    pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1169        let own_user_id = self.room_data_provider.own_user_id();
1170        self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1171    }
1172
1173    /// Get the latest read receipt for the given user.
1174    ///
1175    /// Useful to get the latest read receipt, whether it's private or public.
1176    pub(super) async fn latest_user_read_receipt(
1177        &self,
1178        user_id: &UserId,
1179    ) -> Option<(OwnedEventId, Receipt)> {
1180        self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1181    }
1182
1183    /// Get the ID of the timeline event with the latest read receipt for the
1184    /// given user.
1185    pub(super) async fn latest_user_read_receipt_timeline_event_id(
1186        &self,
1187        user_id: &UserId,
1188    ) -> Option<OwnedEventId> {
1189        self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1190    }
1191
1192    /// Subscribe to changes in the read receipts of our own user.
1193    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1194        self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1195    }
1196
1197    /// Handle a room send update that's a new local echo.
1198    pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1199        match echo.content {
1200            LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1201                let content = match serialized_event.deserialize() {
1202                    Ok(d) => d,
1203                    Err(err) => {
1204                        warn!("error deserializing local echo: {err}");
1205                        return;
1206                    }
1207                };
1208
1209                self.handle_local_event(
1210                    echo.transaction_id.clone(),
1211                    TimelineEventKind::Message { content, relations: Default::default() },
1212                    Some(send_handle),
1213                )
1214                .await;
1215
1216                if let Some(send_error) = send_error {
1217                    self.update_event_send_state(
1218                        &echo.transaction_id,
1219                        EventSendState::SendingFailed {
1220                            error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
1221                            is_recoverable: false,
1222                        },
1223                    )
1224                    .await;
1225                }
1226            }
1227
1228            LocalEchoContent::React { key, send_handle, applies_to } => {
1229                self.handle_local_reaction(key, send_handle, applies_to).await;
1230            }
1231        }
1232    }
1233
1234    /// Adds a reaction (local echo) to a local echo.
1235    #[instrument(skip(self, send_handle))]
1236    async fn handle_local_reaction(
1237        &self,
1238        reaction_key: String,
1239        send_handle: SendReactionHandle,
1240        applies_to: OwnedTransactionId,
1241    ) {
1242        let mut state = self.state.write().await;
1243        let mut tr = state.transaction();
1244
1245        let target = TimelineEventItemId::TransactionId(applies_to);
1246
1247        let reaction_txn_id = send_handle.transaction_id().to_owned();
1248        let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1249        let aggregation = Aggregation::new(
1250            TimelineEventItemId::TransactionId(reaction_txn_id),
1251            AggregationKind::Reaction {
1252                key: reaction_key.clone(),
1253                sender: self.room_data_provider.own_user_id().to_owned(),
1254                timestamp: MilliSecondsSinceUnixEpoch::now(),
1255                reaction_status,
1256            },
1257        );
1258
1259        tr.meta.aggregations.add(target.clone(), aggregation.clone());
1260        find_item_and_apply_aggregation(&mut tr.items, &target, aggregation);
1261
1262        tr.commit();
1263    }
1264
1265    /// Handle a single room send queue update.
1266    pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1267        match update {
1268            RoomSendQueueUpdate::NewLocalEvent(echo) => {
1269                self.handle_local_echo(echo).await;
1270            }
1271
1272            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1273                if !self.discard_local_echo(&transaction_id).await {
1274                    warn!("couldn't find the local echo to discard");
1275                }
1276            }
1277
1278            RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1279                let content = match new_content.deserialize() {
1280                    Ok(d) => d,
1281                    Err(err) => {
1282                        warn!("error deserializing local echo (upon edit): {err}");
1283                        return;
1284                    }
1285                };
1286
1287                if !self.replace_local_echo(&transaction_id, content).await {
1288                    warn!("couldn't find the local echo to replace");
1289                }
1290            }
1291
1292            RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1293                self.update_event_send_state(
1294                    &transaction_id,
1295                    EventSendState::SendingFailed { error, is_recoverable },
1296                )
1297                .await;
1298            }
1299
1300            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1301                self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1302            }
1303
1304            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1305                self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1306                    .await;
1307            }
1308
1309            RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1310                // TODO(bnjbvr): Do something else?
1311                info!(txn_id = %related_to, "some media for a media event has been uploaded");
1312            }
1313        }
1314    }
1315
1316    /// Insert a timeline start item at the beginning of the room, if it's
1317    /// missing.
1318    pub async fn insert_timeline_start_if_missing(&self) {
1319        let mut state = self.state.write().await;
1320        let mut txn = state.transaction();
1321        if txn.items.get(0).is_some_and(|item| item.is_timeline_start()) {
1322            return;
1323        }
1324        txn.items.push_front(txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart), None);
1325        txn.commit();
1326    }
1327}
1328
1329impl TimelineController {
1330    pub(super) fn room(&self) -> &Room {
1331        &self.room_data_provider
1332    }
1333
1334    /// Given an event identifier, will fetch the details for the event it's
1335    /// replying to, if applicable.
1336    #[instrument(skip(self))]
1337    pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1338        let state = self.state.write().await;
1339        let (index, item) = rfind_event_by_id(&state.items, event_id)
1340            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1341        let remote_item = item
1342            .as_remote()
1343            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1344            .clone();
1345
1346        let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1347            debug!("Event is not a message");
1348            return Ok(());
1349        };
1350        let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1351            debug!("Event is not a reply");
1352            return Ok(());
1353        };
1354        if let TimelineDetails::Pending = &in_reply_to.event {
1355            debug!("Replied-to event is already being fetched");
1356            return Ok(());
1357        }
1358        if let TimelineDetails::Ready(_) = &in_reply_to.event {
1359            debug!("Replied-to event has already been fetched");
1360            return Ok(());
1361        }
1362
1363        let internal_id = item.internal_id.to_owned();
1364        let item = item.clone();
1365        let event = fetch_replied_to_event(
1366            state,
1367            index,
1368            &item,
1369            internal_id,
1370            &msglike,
1371            &in_reply_to.event_id,
1372            self.room(),
1373        )
1374        .await?;
1375
1376        // We need to be sure to have the latest position of the event as it might have
1377        // changed while waiting for the request.
1378        let mut state = self.state.write().await;
1379        let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1380            .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1381
1382        // Check the state of the event again, it might have been redacted while
1383        // the request was in-flight.
1384        let TimelineItemContent::MsgLike(MsgLikeContent {
1385            kind: MsgLikeKind::Message(message),
1386            reactions,
1387            thread_root,
1388            in_reply_to,
1389        }) = item.content().clone()
1390        else {
1391            info!("Event is no longer a message (redacted?)");
1392            return Ok(());
1393        };
1394        let Some(in_reply_to) = in_reply_to else {
1395            warn!("Event no longer has a reply (bug?)");
1396            return Ok(());
1397        };
1398
1399        // Now that we've received the content of the replied-to event, replace the
1400        // replied-to content in the item with it.
1401        trace!("Updating in-reply-to details");
1402        let internal_id = item.internal_id.to_owned();
1403        let mut item = item.clone();
1404        item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1405            kind: MsgLikeKind::Message(message),
1406            reactions,
1407            thread_root,
1408            in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1409        }));
1410        state.items.replace(index, TimelineItem::new(item, internal_id));
1411
1412        Ok(())
1413    }
1414
1415    /// Check whether the given receipt should be sent.
1416    ///
1417    /// Returns `false` if the given receipt is older than the current one.
1418    pub(super) async fn should_send_receipt(
1419        &self,
1420        receipt_type: &SendReceiptType,
1421        thread: &ReceiptThread,
1422        event_id: &EventId,
1423    ) -> bool {
1424        // We don't support threaded receipts yet.
1425        if *thread != ReceiptThread::Unthreaded {
1426            return true;
1427        }
1428
1429        let own_user_id = self.room().own_user_id();
1430        let state = self.state.read().await;
1431        let room = self.room();
1432
1433        match receipt_type {
1434            SendReceiptType::Read => {
1435                if let Some((old_pub_read, _)) = state
1436                    .meta
1437                    .user_receipt(
1438                        own_user_id,
1439                        ReceiptType::Read,
1440                        room,
1441                        state.items.all_remote_events(),
1442                    )
1443                    .await
1444                {
1445                    trace!(%old_pub_read, "found a previous public receipt");
1446                    if let Some(relative_pos) = state.meta.compare_events_positions(
1447                        &old_pub_read,
1448                        event_id,
1449                        state.items.all_remote_events(),
1450                    ) {
1451                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1452                        return relative_pos == RelativePosition::After;
1453                    }
1454                }
1455            }
1456            // Implicit read receipts are saved as public read receipts, so get the latest. It also
1457            // doesn't make sense to have a private read receipt behind a public one.
1458            SendReceiptType::ReadPrivate => {
1459                if let Some((old_priv_read, _)) =
1460                    state.latest_user_read_receipt(own_user_id, room).await
1461                {
1462                    trace!(%old_priv_read, "found a previous private receipt");
1463                    if let Some(relative_pos) = state.meta.compare_events_positions(
1464                        &old_priv_read,
1465                        event_id,
1466                        state.items.all_remote_events(),
1467                    ) {
1468                        trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1469                        return relative_pos == RelativePosition::After;
1470                    }
1471                }
1472            }
1473            SendReceiptType::FullyRead => {
1474                if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1475                {
1476                    if let Some(relative_pos) = state.meta.compare_events_positions(
1477                        &prev_event_id,
1478                        event_id,
1479                        state.items.all_remote_events(),
1480                    ) {
1481                        return relative_pos == RelativePosition::After;
1482                    }
1483                }
1484            }
1485            _ => {}
1486        }
1487
1488        // Let the server handle unknown receipts.
1489        true
1490    }
1491
1492    /// Returns the latest event identifier, even if it's not visible, or if
1493    /// it's folded into another timeline item.
1494    pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1495        let state = self.state.read().await;
1496        state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1497    }
1498
1499    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1500    pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1501        self.retry_event_decryption_inner(self.room().to_owned(), session_ids).await
1502    }
1503}
1504
1505#[cfg(test)]
1506impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
1507    pub(super) async fn retry_event_decryption_test(
1508        &self,
1509        room_id: &RoomId,
1510        olm_machine: OlmMachine,
1511        session_ids: Option<BTreeSet<String>>,
1512    ) {
1513        self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1514    }
1515}
1516
1517async fn fetch_replied_to_event(
1518    mut state: RwLockWriteGuard<'_, TimelineState>,
1519    index: usize,
1520    item: &EventTimelineItem,
1521    internal_id: TimelineUniqueId,
1522    msglike: &MsgLikeContent,
1523    in_reply_to: &EventId,
1524    room: &Room,
1525) -> Result<TimelineDetails<Box<RepliedToEvent>>, Error> {
1526    if let Some((_, item)) = rfind_event_by_id(&state.items, in_reply_to) {
1527        let details = TimelineDetails::Ready(Box::new(RepliedToEvent::from_timeline_item(&item)));
1528        trace!("Found replied-to event locally");
1529        return Ok(details);
1530    };
1531
1532    // Replace the item with a new timeline item that has the fetching status of the
1533    // replied-to event to pending.
1534    trace!("Setting in-reply-to details to pending");
1535    let in_reply_to_details =
1536        InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1537
1538    let event_item = item
1539        .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1540
1541    let new_timeline_item = TimelineItem::new(event_item, internal_id);
1542    state.items.replace(index, new_timeline_item);
1543
1544    // Don't hold the state lock while the network request is made
1545    drop(state);
1546
1547    trace!("Fetching replied-to event");
1548    let res = match room.load_or_fetch_event(in_reply_to, None).await {
1549        Ok(timeline_event) => TimelineDetails::Ready(Box::new(
1550            RepliedToEvent::try_from_timeline_event(timeline_event, room).await?,
1551        )),
1552        Err(e) => TimelineDetails::Error(Arc::new(e)),
1553    };
1554    Ok(res)
1555}