1use std::collections::{HashMap, HashSet};
16
17use eyeball_im::VectorDiff;
18use itertools::Itertools as _;
19use matrix_sdk::deserialized_responses::TimelineEvent;
20use ruma::{push::Action, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId};
21use tracing::{debug, instrument, warn};
22
23use super::{
24 super::{
25 controller::{FullEventMeta, ObservableItemsTransactionEntry},
26 date_dividers::DateDividerAdjuster,
27 event_handler::{
28 Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind,
29 TimelineItemPosition,
30 },
31 event_item::RemoteEventOrigin,
32 traits::RoomDataProvider,
33 },
34 ObservableItems, ObservableItemsTransaction, TimelineFocusKind, TimelineMetadata,
35 TimelineSettings,
36};
37use crate::{events::SyncTimelineEventWithoutContent, timeline::VirtualTimelineItem};
38
/// A transaction over the timeline state.
///
/// Changes are made on `items` and on a working copy of the metadata
/// (`meta`); they are written back when `commit` is called.
pub(in crate::timeline) struct TimelineStateTransaction<'a> {
    /// The transaction over the observable timeline items.
    pub items: ObservableItemsTransaction<'a>,

    /// Number of items in `items` at the moment the transaction was created.
    ///
    /// Compared against the item count in `commit`.
    number_of_items_when_transaction_started: usize,

    /// Working copy of the timeline metadata, cloned from `previous_meta`
    /// when the transaction is created.
    pub meta: TimelineMetadata,

    /// The metadata this transaction was created from. It is overwritten
    /// with `meta` in `commit`.
    previous_meta: &'a mut TimelineMetadata,

    /// The kind of focus of the timeline this transaction operates on.
    pub(super) timeline_focus: TimelineFocusKind,
}
58
59impl<'a> TimelineStateTransaction<'a> {
60 pub(super) fn new(
62 items: &'a mut ObservableItems,
63 meta: &'a mut TimelineMetadata,
64 timeline_focus: TimelineFocusKind,
65 ) -> Self {
66 let previous_meta = meta;
67 let meta = previous_meta.clone();
68 let items = items.transaction();
69
70 Self {
71 number_of_items_when_transaction_started: items.len(),
72 items,
73 previous_meta,
74 meta,
75 timeline_focus,
76 }
77 }
78
79 pub(super) async fn handle_remote_events_with_diffs<RoomData>(
81 &mut self,
82 diffs: Vec<VectorDiff<TimelineEvent>>,
83 origin: RemoteEventOrigin,
84 room_data_provider: &RoomData,
85 settings: &TimelineSettings,
86 ) where
87 RoomData: RoomDataProvider,
88 {
89 let mut date_divider_adjuster =
90 DateDividerAdjuster::new(settings.date_divider_mode.clone());
91
92 for diff in diffs {
93 match diff {
94 VectorDiff::Append { values: events } => {
95 for event in events {
96 self.handle_remote_event(
97 event,
98 TimelineItemPosition::End { origin },
99 room_data_provider,
100 settings,
101 &mut date_divider_adjuster,
102 )
103 .await;
104 }
105 }
106
107 VectorDiff::PushFront { value: event } => {
108 self.handle_remote_event(
109 event,
110 TimelineItemPosition::Start { origin },
111 room_data_provider,
112 settings,
113 &mut date_divider_adjuster,
114 )
115 .await;
116 }
117
118 VectorDiff::PushBack { value: event } => {
119 self.handle_remote_event(
120 event,
121 TimelineItemPosition::End { origin },
122 room_data_provider,
123 settings,
124 &mut date_divider_adjuster,
125 )
126 .await;
127 }
128
129 VectorDiff::Insert { index: event_index, value: event } => {
130 self.handle_remote_event(
131 event,
132 TimelineItemPosition::At { event_index, origin },
133 room_data_provider,
134 settings,
135 &mut date_divider_adjuster,
136 )
137 .await;
138 }
139
140 VectorDiff::Set { index: event_index, value: event } => {
141 if let Some(timeline_item_index) = self
142 .items
143 .all_remote_events()
144 .get(event_index)
145 .and_then(|meta| meta.timeline_item_index)
146 {
147 self.handle_remote_event(
148 event,
149 TimelineItemPosition::UpdateAt { timeline_item_index },
150 room_data_provider,
151 settings,
152 &mut date_divider_adjuster,
153 )
154 .await;
155 } else {
156 warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
157 }
158 }
159
160 VectorDiff::Remove { index: event_index } => {
161 self.remove_timeline_item(event_index, &mut date_divider_adjuster);
162 }
163
164 VectorDiff::Clear => {
165 self.clear();
166 }
167
168 v => unimplemented!("{v:?}"),
169 }
170 }
171
172 self.adjust_date_dividers(date_divider_adjuster);
173 self.check_invariants();
174 }
175
/// Run the sanity checks on the timeline items: first the read receipts
/// check, then the unique ids check.
fn check_invariants(&self) {
    self.check_no_duplicate_read_receipts();
    self.check_no_unused_unique_ids();
}
180
181 fn check_no_duplicate_read_receipts(&self) {
182 let mut by_user_id = HashMap::new();
183 let mut duplicates = HashSet::new();
184
185 for item in self.items.iter().filter_map(|item| item.as_event()) {
186 if let Some(event_id) = item.event_id() {
187 for (user_id, _read_receipt) in item.read_receipts() {
188 if let Some(prev_event_id) = by_user_id.insert(user_id, event_id) {
189 duplicates.insert((user_id.clone(), prev_event_id, event_id));
190 }
191 }
192 }
193 }
194
195 if !duplicates.is_empty() {
196 #[cfg(any(debug_assertions, test))]
197 panic!("duplicate read receipts in this timeline:{:?}\n{:?}", duplicates, self.items);
198
199 #[cfg(not(any(debug_assertions, test)))]
200 tracing::error!(
201 "duplicate read receipts in this timeline:{:?}\n{:?}",
202 duplicates,
203 self.items
204 );
205 }
206 }
207
208 fn check_no_unused_unique_ids(&self) {
209 let duplicates = self
210 .items
211 .iter()
212 .duplicates_by(|item| item.unique_id())
213 .map(|item| item.unique_id())
214 .collect::<Vec<_>>();
215
216 if !duplicates.is_empty() {
217 #[cfg(any(debug_assertions, test))]
218 panic!("duplicate unique ids in this timeline:{:?}\n{:?}", duplicates, self.items);
219
220 #[cfg(not(any(debug_assertions, test)))]
221 tracing::error!(
222 "duplicate unique ids in this timeline:{:?}\n{:?}",
223 duplicates,
224 self.items
225 );
226 }
227 }
228
/// Handle a single remote event at the given `position`.
///
/// The raw event is deserialized and run through `settings.event_filter`
/// and the focus/origin rules to decide whether it should produce new
/// timeline items. In every path where an event id is available, the
/// event is recorded through `add_or_update_remote_event`; when the event
/// could be turned into a [`TimelineEventKind`], it is then handed to a
/// [`TimelineEventHandler`].
///
/// Returns the handler's result, or `HandleEventResult::default()` when
/// the event could not be deserialized and failed-to-parse events are not
/// added to the timeline.
pub(super) async fn handle_remote_event<P: RoomDataProvider>(
    &mut self,
    event: TimelineEvent,
    position: TimelineItemPosition,
    room_data_provider: &P,
    settings: &TimelineSettings,
    date_divider_adjuster: &mut DateDividerAdjuster,
) -> HandleEventResult {
    let TimelineEvent { push_actions, kind } = event;
    // Clone the encryption info out now: `kind` is consumed just below.
    let encryption_info = kind.encryption_info().cloned();

    // Extract the raw event, keeping the UTD info for events that could
    // not be decrypted.
    let (raw, utd_info) = match kind {
        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
            utd_info,
            event,
        } => (event, Some(utd_info)),
        _ => (kind.into_raw(), None),
    };

    let (event_id, sender, timestamp, txn_id, event_kind, should_add) = match raw.deserialize()
    {
        Ok(event) => {
            let event_id = event.event_id().to_owned();
            let room_version = room_data_provider.room_version();

            // First decision: the user-provided event filter.
            let mut should_add = (settings.event_filter)(&event, &room_version);

            // The focus/origin rules below are only consulted for events
            // that passed the filter.
            if should_add {
                // Retrieve the origin of the event. For an update, it is
                // read from the existing timeline item, falling back to
                // `Unknown` if that item isn't a remote event item.
                let origin = match position {
                    TimelineItemPosition::End { origin }
                    | TimelineItemPosition::Start { origin }
                    | TimelineItemPosition::At { origin, .. } => origin,

                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
                        .items
                        .get(idx)
                        .and_then(|item| item.as_event())
                        .and_then(|item| item.as_remote())
                        .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
                };

                match self.timeline_focus {
                    TimelineFocusKind::PinnedEvents => {
                        // The room data provider decides: only events it
                        // reports as pinned are added.
                        should_add = room_data_provider.is_pinned_event(&event_id);
                    }
                    TimelineFocusKind::Live => {
                        match origin {
                            RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
                                // Note: `should_add` is already `true` in
                                // this branch.
                                should_add = true;
                            }
                            RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
                                // Keep the decision made by the filter.
                            }
                        }
                    }
                    TimelineFocusKind::Event => {
                        match origin {
                            RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
                                // Events with a sync or unknown origin are
                                // not added to an event-focused timeline.
                                should_add = false;
                            }
                            RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
                                // Keep the decision made by the filter.
                            }
                        }
                    }
                }
            }

            (
                event_id,
                event.sender().to_owned(),
                event.origin_server_ts(),
                event.transaction_id().map(ToOwned::to_owned),
                TimelineEventKind::from_event(event, &raw, room_data_provider, utd_info).await,
                should_add,
            )
        }

        // Full deserialization failed: retry with a type that ignores the
        // event content.
        Err(e) => match raw.deserialize_as::<SyncTimelineEventWithoutContent>() {
            // The settings ask for failed-to-parse events to be added to
            // the timeline.
            Ok(event) if settings.add_failed_to_parse => (
                event.event_id().to_owned(),
                event.sender().to_owned(),
                event.origin_server_ts(),
                event.transaction_id().map(ToOwned::to_owned),
                TimelineEventKind::failed_to_parse(event, e),
                true,
            ),

            // Failed-to-parse events are not added: log the failure, record
            // the event as a non-visible remote event, and stop here.
            Ok(event) => {
                let event_type = event.event_type();
                let event_id = event.event_id();
                warn!(%event_type, %event_id, "Failed to deserialize timeline event: {e}");

                let is_own_event = event.sender() == room_data_provider.own_user_id();
                let event_meta = FullEventMeta {
                    event_id,
                    sender: Some(event.sender()),
                    is_own_event,
                    timestamp: Some(event.origin_server_ts()),
                    visible: false,
                };

                self.add_or_update_remote_event(
                    event_meta,
                    position,
                    room_data_provider,
                    settings,
                )
                .await;

                return HandleEventResult::default();
            }

            // Even the content-less deserialization failed: read the
            // individual fields from the raw JSON on a best-effort basis.
            // Note that this `e` shadows the outer one.
            Err(e) => {
                let event_type: Option<String> = raw.get_field("type").ok().flatten();
                let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
                warn!(
                    event_type,
                    event_id, "Failed to deserialize timeline event even without content: {e}"
                );

                let event_id = event_id.and_then(|s| EventId::parse(s).ok());

                // The event can only be recorded if it has a valid event
                // id; otherwise it is dropped entirely.
                if let Some(event_id) = &event_id {
                    let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
                    let is_own_event =
                        sender.as_ref().is_some_and(|s| s == room_data_provider.own_user_id());
                    let timestamp: Option<MilliSecondsSinceUnixEpoch> =
                        raw.get_field("origin_server_ts").ok().flatten();

                    let event_meta = FullEventMeta {
                        event_id,
                        sender: sender.as_deref(),
                        is_own_event,
                        timestamp,
                        visible: false,
                    };

                    self.add_or_update_remote_event(
                        event_meta,
                        position,
                        room_data_provider,
                        settings,
                    )
                    .await;
                }

                return HandleEventResult::default();
            }
        },
    };

    let is_own_event = sender == room_data_provider.own_user_id();

    let event_meta = FullEventMeta {
        event_id: &event_id,
        sender: Some(&sender),
        is_own_event,
        timestamp: Some(timestamp),
        visible: should_add,
    };

    // Record the remote event before handling it; its visibility follows
    // `should_add`.
    self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await;

    let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
    let ctx = TimelineEventContext {
        sender,
        sender_profile,
        timestamp,
        is_own_event,
        // Read receipts are only computed when they are tracked and the
        // event is going to be added.
        read_receipts: if settings.track_read_receipts && should_add {
            self.meta.read_receipts.compute_event_receipts(
                &event_id,
                &mut self.items,
                // `true` only when the event is placed at the end of the
                // timeline.
                matches!(position, TimelineItemPosition::End { .. }),
            )
        } else {
            Default::default()
        },
        // Highlighted if any of the push actions is a highlight.
        is_highlighted: push_actions
            .as_ref()
            .is_some_and(|actions| actions.iter().any(Action::is_highlight)),
        flow: Flow::Remote {
            event_id: event_id.clone(),
            raw_event: raw,
            encryption_info,
            txn_id,
            position,
        },
        should_add_new_items: should_add,
    };

    TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await
}
449
450 fn remove_timeline_item(
452 &mut self,
453 event_index: usize,
454 day_divider_adjuster: &mut DateDividerAdjuster,
455 ) {
456 day_divider_adjuster.mark_used();
457
458 if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
468 if let Some(timeline_item_index) = event_meta.timeline_item_index {
470 let _ = self.items.remove(timeline_item_index);
471 }
472
473 self.items.remove_remote_event(event_index);
475 }
476 }
477
478 pub(super) fn clear(&mut self) {
479 let has_local_echoes = self.items.iter().any(|item| item.is_local_echo());
480
481 if has_local_echoes {
487 self.items.for_each(|entry| {
489 if entry.is_remote_event()
490 || entry.as_virtual().is_some_and(|vitem| match vitem {
491 VirtualTimelineItem::DateDivider(_) => false,
492 VirtualTimelineItem::ReadMarker | VirtualTimelineItem::TimelineStart => {
493 true
494 }
495 })
496 {
497 ObservableItemsTransactionEntry::remove(entry);
498 }
499 });
500
501 let mut idx = 0;
503 while idx < self.items.len() {
504 if self.items[idx].is_date_divider()
505 && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
506 {
507 self.items.remove(idx);
508 } else {
510 idx += 1;
511 }
512 }
513 } else {
514 self.items.clear();
515 }
516
517 self.meta.clear();
518
519 debug!(remaining_items = self.items.len(), "Timeline cleared");
520 }
521
522 #[instrument(skip_all)]
523 pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
524 if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
526 return;
527 }
528
529 self.meta.fully_read_event = Some(fully_read_event_id);
530 self.meta.update_read_marker(&mut self.items);
531 }
532
533 pub(super) fn commit(self) {
534 let previous_number_of_items = self.number_of_items_when_transaction_started;
536 let next_number_of_items = self.items.len();
537
538 if previous_number_of_items != next_number_of_items {
539 let count = self
540 .meta
541 .subscriber_skip_count
542 .compute_next(previous_number_of_items, next_number_of_items);
543 self.meta.subscriber_skip_count.update(count, &self.timeline_focus);
544 }
545
546 *self.previous_meta = self.meta;
548
549 self.items.commit();
550 }
551
552 async fn add_or_update_remote_event<P: RoomDataProvider>(
557 &mut self,
558 event_meta: FullEventMeta<'_>,
559 position: TimelineItemPosition,
560 room_data_provider: &P,
561 settings: &TimelineSettings,
562 ) {
563 match position {
564 TimelineItemPosition::Start { .. } => {
565 self.items.push_front_remote_event(event_meta.base_meta())
566 }
567
568 TimelineItemPosition::End { .. } => {
569 self.items.push_back_remote_event(event_meta.base_meta());
570 }
571
572 TimelineItemPosition::At { event_index, .. } => {
573 self.items.insert_remote_event(event_index, event_meta.base_meta());
574 }
575
576 TimelineItemPosition::UpdateAt { .. } => {
577 if let Some(event) =
578 self.items.get_remote_event_by_event_id_mut(event_meta.event_id)
579 {
580 if event.visible != event_meta.visible {
581 event.visible = event_meta.visible;
582
583 if settings.track_read_receipts {
584 self.maybe_update_read_receipts_of_prev_event(event_meta.event_id);
587 }
588 }
589 }
590 }
591 }
592
593 if settings.track_read_receipts
594 && matches!(
595 position,
596 TimelineItemPosition::Start { .. }
597 | TimelineItemPosition::End { .. }
598 | TimelineItemPosition::At { .. }
599 )
600 {
601 self.load_read_receipts_for_event(event_meta.event_id, room_data_provider).await;
602
603 self.maybe_add_implicit_read_receipt(event_meta);
604 }
605 }
606
/// Run the given [`DateDividerAdjuster`] over the items and the working
/// metadata of this transaction, consuming the adjuster.
pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
    adjuster.run(&mut self.items, &mut self.meta);
}
610
611 pub(super) fn mark_all_events_as_encrypted(&mut self) {
615 for idx in 0..self.items.len() {
616 let item = &self.items[idx];
617
618 if let Some(event) = item.as_event() {
619 if event.is_room_encrypted {
620 continue;
621 }
622
623 let mut cloned_event = event.clone();
624 cloned_event.is_room_encrypted = true;
625
626 let item = item.with_kind(cloned_event);
628 self.items.replace(idx, item);
629 }
630 }
631 }
632}