matrix_sdk_ui/timeline/controller/
state_transaction.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::TimelineEvent;
20use ruma::{push::Action, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId};
21use tracing::{debug, instrument, warn};
22
23use super::{
24    super::{
25        controller::{FullEventMeta, ObservableItemsTransactionEntry},
26        date_dividers::DateDividerAdjuster,
27        event_handler::{
28            Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind,
29            TimelineItemPosition,
30        },
31        event_item::RemoteEventOrigin,
32        traits::RoomDataProvider,
33    },
34    ObservableItems, ObservableItemsTransaction, TimelineFocusKind, TimelineMetadata,
35    TimelineSettings,
36};
37use crate::{events::SyncTimelineEventWithoutContent, timeline::VirtualTimelineItem};
38
39pub(in crate::timeline) struct TimelineStateTransaction<'a> {
40    /// A vector transaction over the items themselves. Holds temporary state
41    /// until committed.
42    pub items: ObservableItemsTransaction<'a>,
43
44    /// Number of items when the transaction has been created/has started.
45    number_of_items_when_transaction_started: usize,
46
47    /// A clone of the previous meta, that we're operating on during the
48    /// transaction, and that will be committed to the previous meta location in
49    /// [`Self::commit`].
50    pub meta: TimelineMetadata,
51
52    /// Pointer to the previous meta, only used during [`Self::commit`].
53    previous_meta: &'a mut TimelineMetadata,
54
55    /// The kind of focus of this timeline.
56    pub(super) timeline_focus: TimelineFocusKind,
57}
58
59impl<'a> TimelineStateTransaction<'a> {
60    /// Create a new [`TimelineStateTransaction`].
61    pub(super) fn new(
62        items: &'a mut ObservableItems,
63        meta: &'a mut TimelineMetadata,
64        timeline_focus: TimelineFocusKind,
65    ) -> Self {
66        let previous_meta = meta;
67        let meta = previous_meta.clone();
68        let items = items.transaction();
69
70        Self {
71            number_of_items_when_transaction_started: items.len(),
72            items,
73            previous_meta,
74            meta,
75            timeline_focus,
76        }
77    }
78
79    /// Handle updates on events as [`VectorDiff`]s.
80    pub(super) async fn handle_remote_events_with_diffs<RoomData>(
81        &mut self,
82        diffs: Vec<VectorDiff<TimelineEvent>>,
83        origin: RemoteEventOrigin,
84        room_data_provider: &RoomData,
85        settings: &TimelineSettings,
86    ) where
87        RoomData: RoomDataProvider,
88    {
89        let mut date_divider_adjuster =
90            DateDividerAdjuster::new(settings.date_divider_mode.clone());
91
92        for diff in diffs {
93            match diff {
94                VectorDiff::Append { values: events } => {
95                    for event in events {
96                        self.handle_remote_event(
97                            event,
98                            TimelineItemPosition::End { origin },
99                            room_data_provider,
100                            settings,
101                            &mut date_divider_adjuster,
102                        )
103                        .await;
104                    }
105                }
106
107                VectorDiff::PushFront { value: event } => {
108                    self.handle_remote_event(
109                        event,
110                        TimelineItemPosition::Start { origin },
111                        room_data_provider,
112                        settings,
113                        &mut date_divider_adjuster,
114                    )
115                    .await;
116                }
117
118                VectorDiff::PushBack { value: event } => {
119                    self.handle_remote_event(
120                        event,
121                        TimelineItemPosition::End { origin },
122                        room_data_provider,
123                        settings,
124                        &mut date_divider_adjuster,
125                    )
126                    .await;
127                }
128
129                VectorDiff::Insert { index: event_index, value: event } => {
130                    self.handle_remote_event(
131                        event,
132                        TimelineItemPosition::At { event_index, origin },
133                        room_data_provider,
134                        settings,
135                        &mut date_divider_adjuster,
136                    )
137                    .await;
138                }
139
140                VectorDiff::Set { index: event_index, value: event } => {
141                    if let Some(timeline_item_index) = self
142                        .items
143                        .all_remote_events()
144                        .get(event_index)
145                        .and_then(|meta| meta.timeline_item_index)
146                    {
147                        self.handle_remote_event(
148                            event,
149                            TimelineItemPosition::UpdateAt { timeline_item_index },
150                            room_data_provider,
151                            settings,
152                            &mut date_divider_adjuster,
153                        )
154                        .await;
155                    } else {
156                        warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
157                    }
158                }
159
160                VectorDiff::Remove { index: event_index } => {
161                    self.remove_timeline_item(event_index, &mut date_divider_adjuster);
162                }
163
164                VectorDiff::Clear => {
165                    self.clear();
166                }
167
168                v => unimplemented!("{v:?}"),
169            }
170        }
171
172        self.adjust_date_dividers(date_divider_adjuster);
173        self.check_invariants();
174    }
175
176    fn check_invariants(&self) {
177        self.check_no_duplicate_read_receipts();
178        self.check_no_unused_unique_ids();
179    }
180
181    fn check_no_duplicate_read_receipts(&self) {
182        let mut by_user_id = HashMap::new();
183        let mut duplicates = HashSet::new();
184
185        for item in self.items.iter().filter_map(|item| item.as_event()) {
186            if let Some(event_id) = item.event_id() {
187                for (user_id, _read_receipt) in item.read_receipts() {
188                    if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
189                        duplicates.insert((user_id.clone(), prev_event_id, event_id));
190                    }
191                }
192            }
193        }
194
195        if !duplicates.is_empty() {
196            #[cfg(any(debug_assertions, test))]
197            panic!("duplicate read receipts in this timeline:{:?}\n{:?}", duplicates, self.items);
198
199            #[cfg(not(any(debug_assertions, test)))]
200            tracing::error!(
201                "duplicate read receipts in this timeline:{:?}\n{:?}",
202                duplicates,
203                self.items
204            );
205        }
206    }
207
208    fn check_no_unused_unique_ids(&self) {
209        let duplicates = self
210            .items
211            .iter()
212            .duplicates_by(|item| item.unique_id())
213            .map(|item| item.unique_id())
214            .collect::<Vec<_>>();
215
216        if !duplicates.is_empty() {
217            #[cfg(any(debug_assertions, test))]
218            panic!("duplicate unique ids in this timeline:{:?}\n{:?}", duplicates, self.items);
219
220            #[cfg(not(any(debug_assertions, test)))]
221            tracing::error!(
222                "duplicate unique ids in this timeline:{:?}\n{:?}",
223                duplicates,
224                self.items
225            );
226        }
227    }
228
229    /// Handle a remote event.
230    ///
231    /// Returns the number of timeline updates that were made.
232    pub(super) async fn handle_remote_event<P: RoomDataProvider>(
233        &mut self,
234        event: TimelineEvent,
235        position: TimelineItemPosition,
236        room_data_provider: &P,
237        settings: &TimelineSettings,
238        date_divider_adjuster: &mut DateDividerAdjuster,
239    ) -> HandleEventResult {
240        let TimelineEvent { push_actions, kind } = event;
241        let encryption_info = kind.encryption_info().cloned();
242
243        let (raw, utd_info) = match kind {
244            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
245                utd_info,
246                event,
247            } => (event, Some(utd_info)),
248            _ => (kind.into_raw(), None),
249        };
250
251        let (event_id, sender, timestamp, txn_id, event_kind, should_add) = match raw.deserialize()
252        {
253            // Classical path: the event is valid, can be deserialized, everything is alright.
254            Ok(event) => {
255                let event_id = event.event_id().to_owned();
256                let room_version = room_data_provider.room_version();
257
258                let mut should_add = (settings.event_filter)(&event, &room_version);
259
260                if should_add {
261                    // Retrieve the origin of the event.
262                    let origin = match position {
263                        TimelineItemPosition::End { origin }
264                        | TimelineItemPosition::Start { origin }
265                        | TimelineItemPosition::At { origin, .. } => origin,
266
267                        TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
268                            .items
269                            .get(idx)
270                            .and_then(|item| item.as_event())
271                            .and_then(|item| item.as_remote())
272                            .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
273                    };
274
275                    // If the event should be added according to the general event filter, use a
276                    // second filter to decide whether it should be added depending on the timeline
277                    // focus and events origin, if needed
278                    match self.timeline_focus {
279                        TimelineFocusKind::PinnedEvents => {
280                            // Only add pinned events for the pinned events timeline
281                            should_add = room_data_provider.is_pinned_event(&event_id);
282                        }
283                        TimelineFocusKind::Live => {
284                            match origin {
285                                RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
286                                    // Always add new items to a live timeline receiving items from
287                                    // sync.
288                                    should_add = true;
289                                }
290                                RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
291                                    // Forward the previous decision to add it.
292                                }
293                            }
294                        }
295                        TimelineFocusKind::Event => {
296                            match origin {
297                                RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
298                                    // Never add any item to a focused timeline when the item comes
299                                    // down from the sync.
300                                    should_add = false;
301                                }
302                                RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
303                                    // Forward the previous decision to add it.
304                                }
305                            }
306                        }
307                    }
308                }
309
310                (
311                    event_id,
312                    event.sender().to_owned(),
313                    event.origin_server_ts(),
314                    event.transaction_id().map(ToOwned::to_owned),
315                    TimelineEventKind::from_event(event, &raw, room_data_provider, utd_info).await,
316                    should_add,
317                )
318            }
319
320            // The event seems invalid…
321            Err(e) => match raw.deserialize_as::<SyncTimelineEventWithoutContent>() {
322                // The event can be partially deserialized, and it is allowed to be added to the
323                // timeline.
324                Ok(event) if settings.add_failed_to_parse => (
325                    event.event_id().to_owned(),
326                    event.sender().to_owned(),
327                    event.origin_server_ts(),
328                    event.transaction_id().map(ToOwned::to_owned),
329                    TimelineEventKind::failed_to_parse(event, e),
330                    true,
331                ),
332
333                // The event can be partially deserialized, but it is NOT allowed to be added to
334                // the timeline.
335                Ok(event) => {
336                    let event_type = event.event_type();
337                    let event_id = event.event_id();
338                    warn!(%event_type, %event_id, "Failed to deserialize timeline event: {e}");
339
340                    let is_own_event = event.sender() == room_data_provider.own_user_id();
341                    let event_meta = FullEventMeta {
342                        event_id,
343                        sender: Some(event.sender()),
344                        is_own_event,
345                        timestamp: Some(event.origin_server_ts()),
346                        visible: false,
347                    };
348
349                    // Remember the event before returning prematurely.
350                    // See [`ObservableItems::all_remote_events`].
351                    self.add_or_update_remote_event(
352                        event_meta,
353                        position,
354                        room_data_provider,
355                        settings,
356                    )
357                    .await;
358
359                    return HandleEventResult::default();
360                }
361
362                // The event can NOT be partially deserialized, it seems really broken.
363                Err(e) => {
364                    let event_type: Option<String> = raw.get_field("type").ok().flatten();
365                    let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
366                    warn!(
367                        event_type,
368                        event_id, "Failed to deserialize timeline event even without content: {e}"
369                    );
370
371                    let event_id = event_id.and_then(|s| EventId::parse(s).ok());
372
373                    if let Some(event_id) = &event_id {
374                        let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
375                        let is_own_event =
376                            sender.as_ref().is_some_and(|s| s == room_data_provider.own_user_id());
377                        let timestamp: Option<MilliSecondsSinceUnixEpoch> =
378                            raw.get_field("origin_server_ts").ok().flatten();
379
380                        let event_meta = FullEventMeta {
381                            event_id,
382                            sender: sender.as_deref(),
383                            is_own_event,
384                            timestamp,
385                            visible: false,
386                        };
387
388                        // Remember the event before returning prematurely.
389                        // See [`ObservableItems::all_remote_events`].
390                        self.add_or_update_remote_event(
391                            event_meta,
392                            position,
393                            room_data_provider,
394                            settings,
395                        )
396                        .await;
397                    }
398
399                    return HandleEventResult::default();
400                }
401            },
402        };
403
404        let is_own_event = sender == room_data_provider.own_user_id();
405
406        let event_meta = FullEventMeta {
407            event_id: &event_id,
408            sender: Some(&sender),
409            is_own_event,
410            timestamp: Some(timestamp),
411            visible: should_add,
412        };
413
414        // Remember the event.
415        // See [`ObservableItems::all_remote_events`].
416        self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await;
417
418        let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
419        let ctx = TimelineEventContext {
420            sender,
421            sender_profile,
422            timestamp,
423            is_own_event,
424            read_receipts: if settings.track_read_receipts && should_add {
425                self.meta.read_receipts.compute_event_receipts(
426                    &event_id,
427                    &mut self.items,
428                    matches!(position, TimelineItemPosition::End { .. }),
429                )
430            } else {
431                Default::default()
432            },
433            is_highlighted: push_actions
434                .as_ref()
435                .is_some_and(|actions| actions.iter().any(Action::is_highlight)),
436            flow: Flow::Remote {
437                event_id: event_id.clone(),
438                raw_event: raw,
439                encryption_info,
440                txn_id,
441                position,
442            },
443            should_add_new_items: should_add,
444        };
445
446        // Handle the event to create or update a timeline item.
447        TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await
448    }
449
450    /// Remove one timeline item by its `event_index`.
451    fn remove_timeline_item(
452        &mut self,
453        event_index: usize,
454        day_divider_adjuster: &mut DateDividerAdjuster,
455    ) {
456        day_divider_adjuster.mark_used();
457
458        // We need to be careful here.
459        //
460        // We must first remove the timeline item, which will update the mapping between
461        // remote events and timeline items. Removing the timeline item will “unlink”
462        // this mapping as the remote event will be updated to map to nothing. Only
463        // after that, we can remove the remote event. Doing this in the other order
464        // will update the mapping twice, and will result in a corrupted state.
465
466        // Remove the timeline item first.
467        if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
468            // Fetch the `timeline_item_index` associated to the remote event.
469            if let Some(timeline_item_index) = event_meta.timeline_item_index {
470                let _ = self.items.remove(timeline_item_index);
471            }
472
473            // Now we can remove the remote event.
474            self.items.remove_remote_event(event_index);
475        }
476    }
477
478    pub(super) fn clear(&mut self) {
479        let has_local_echoes = self.items.iter().any(|item| item.is_local_echo());
480
481        // By first checking if there are any local echoes first, we do a bit
482        // more work in case some are found, but it should be worth it because
483        // there will often not be any, and only emitting a single
484        // `VectorDiff::Clear` should be much more efficient to process for
485        // subscribers.
486        if has_local_echoes {
487            // Remove all remote events and virtual items that aren't date dividers.
488            self.items.for_each(|entry| {
489                if entry.is_remote_event()
490                    || entry.as_virtual().is_some_and(|vitem| match vitem {
491                        VirtualTimelineItem::DateDivider(_) => false,
492                        VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
493                            true
494                        }
495                    })
496                {
497                    ObservableItemsTransactionEntry::remove(entry);
498                }
499            });
500
501            // Remove stray date dividers
502            let mut idx = 0;
503            while idx < self.items.len() {
504                if self.items[idx].is_date_divider()
505                    && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
506                {
507                    self.items.remove(idx);
508                    // don't increment idx because all elements have shifted
509                } else {
510                    idx += 1;
511                }
512            }
513        } else {
514            self.items.clear();
515        }
516
517        self.meta.clear();
518
519        debug!(remaining_items = self.items.len(), "Timeline cleared");
520    }
521
522    #[instrument(skip_all)]
523    pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
524        // A similar event has been handled already. We can ignore it.
525        if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
526            return;
527        }
528
529        self.meta.fully_read_event = Some(fully_read_event_id);
530        self.meta.update_read_marker(&mut self.items);
531    }
532
533    pub(super) fn commit(self) {
534        // Update the `subscriber_skip_count` value.
535        let previous_number_of_items = self.number_of_items_when_transaction_started;
536        let next_number_of_items = self.items.len();
537
538        if previous_number_of_items != next_number_of_items {
539            let count = self
540                .meta
541                .subscriber_skip_count
542                .compute_next(previous_number_of_items, next_number_of_items);
543            self.meta.subscriber_skip_count.update(count, &self.timeline_focus);
544        }
545
546        // Replace the pointer to the previous meta with the new one.
547        *self.previous_meta = self.meta;
548
549        self.items.commit();
550    }
551
552    /// Add or update a remote event in the
553    /// [`ObservableItems::all_remote_events`] collection.
554    ///
555    /// This method also adjusts read receipt if needed.
556    async fn add_or_update_remote_event<P: RoomDataProvider>(
557        &mut self,
558        event_meta: FullEventMeta<'_>,
559        position: TimelineItemPosition,
560        room_data_provider: &P,
561        settings: &TimelineSettings,
562    ) {
563        match position {
564            TimelineItemPosition::Start { .. } => {
565                self.items.push_front_remote_event(event_meta.base_meta())
566            }
567
568            TimelineItemPosition::End { .. } => {
569                self.items.push_back_remote_event(event_meta.base_meta());
570            }
571
572            TimelineItemPosition::At { event_index, .. } => {
573                self.items.insert_remote_event(event_index, event_meta.base_meta());
574            }
575
576            TimelineItemPosition::UpdateAt { .. } => {
577                if let Some(event) =
578                    self.items.get_remote_event_by_event_id_mut(event_meta.event_id)
579                {
580                    if event.visible != event_meta.visible {
581                        event.visible = event_meta.visible;
582
583                        if settings.track_read_receipts {
584                            // Since the event's visibility changed, we need to update the read
585                            // receipts of the previous visible event.
586                            self.maybe_update_read_receipts_of_prev_event(event_meta.event_id);
587                        }
588                    }
589                }
590            }
591        }
592
593        if settings.track_read_receipts
594            && matches!(
595                position,
596                TimelineItemPosition::Start { .. }
597                    | TimelineItemPosition::End { .. }
598                    | TimelineItemPosition::At { .. }
599            )
600        {
601            self.load_read_receipts_for_event(event_meta.event_id, room_data_provider).await;
602
603            self.maybe_add_implicit_read_receipt(event_meta);
604        }
605    }
606
607    pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
608        adjuster.run(&mut self.items, &mut self.meta);
609    }
610
611    /// This method replaces the `is_room_encrypted` value for all timeline
612    /// items to its updated version and creates a `VectorDiff::Set` operation
613    /// for each item which will be added to this transaction.
614    pub(super) fn mark_all_events_as_encrypted(&mut self) {
615        for idx in 0..self.items.len() {
616            let item = &self.items[idx];
617
618            if let Some(event) = item.as_event() {
619                if event.is_room_encrypted {
620                    continue;
621                }
622
623                let mut cloned_event = event.clone();
624                cloned_event.is_room_encrypted = true;
625
626                // Replace the existing item with a new version with the right encryption flag
627                let item = item.with_kind(cloned_event);
628                self.items.replace(idx, item);
629            }
630        }
631    }
632}