1use eyeball_im::VectorDiff;
16use itertools::Itertools as _;
17use matrix_sdk::deserialized_responses::TimelineEvent;
18use ruma::{push::Action, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId};
19use tracing::{debug, instrument, warn};
20
21use super::{
22 super::{
23 controller::{FullEventMeta, ObservableItemsTransactionEntry},
24 date_dividers::DateDividerAdjuster,
25 event_handler::{
26 Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind,
27 TimelineItemPosition,
28 },
29 event_item::RemoteEventOrigin,
30 traits::RoomDataProvider,
31 },
32 ObservableItems, ObservableItemsTransaction, TimelineFocusKind, TimelineMetadata,
33 TimelineSettings,
34};
35use crate::events::SyncTimelineEventWithoutContent;
36
/// A transaction over the timeline state: the timeline items plus a working
/// copy of the timeline metadata.
///
/// Modifications are made on `items` and `meta` while the transaction is
/// alive, and are written back by [`Self::commit`].
pub(in crate::timeline) struct TimelineStateTransaction<'a> {
    /// Transaction over the timeline items, obtained from
    /// `ObservableItems::transaction` in [`Self::new`] and committed in
    /// [`Self::commit`].
    pub items: ObservableItemsTransaction<'a>,

    /// Value of `items.len()` captured when the transaction was created.
    /// [`Self::commit`] compares it with the final length.
    number_of_items_when_transaction_started: usize,

    /// Clone of the metadata taken in [`Self::new`]. This is the copy that is
    /// modified during the transaction; [`Self::commit`] stores it into
    /// `previous_meta`.
    pub meta: TimelineMetadata,

    /// Location of the metadata this transaction was created from. It is only
    /// written to, in [`Self::commit`].
    previous_meta: &'a mut TimelineMetadata,

    /// The kind of focus of the timeline this transaction operates on.
    pub(super) timeline_focus: TimelineFocusKind,
}
56
57impl<'a> TimelineStateTransaction<'a> {
58 pub(super) fn new(
60 items: &'a mut ObservableItems,
61 meta: &'a mut TimelineMetadata,
62 timeline_focus: TimelineFocusKind,
63 ) -> Self {
64 let previous_meta = meta;
65 let meta = previous_meta.clone();
66 let items = items.transaction();
67
68 Self {
69 number_of_items_when_transaction_started: items.len(),
70 items,
71 previous_meta,
72 meta,
73 timeline_focus,
74 }
75 }
76
77 pub(super) async fn handle_remote_events_with_diffs<RoomData>(
79 &mut self,
80 diffs: Vec<VectorDiff<TimelineEvent>>,
81 origin: RemoteEventOrigin,
82 room_data_provider: &RoomData,
83 settings: &TimelineSettings,
84 ) where
85 RoomData: RoomDataProvider,
86 {
87 let mut date_divider_adjuster =
88 DateDividerAdjuster::new(settings.date_divider_mode.clone());
89
90 for diff in diffs {
91 match diff {
92 VectorDiff::Append { values: events } => {
93 for event in events {
94 self.handle_remote_event(
95 event,
96 TimelineItemPosition::End { origin },
97 room_data_provider,
98 settings,
99 &mut date_divider_adjuster,
100 )
101 .await;
102 }
103 }
104
105 VectorDiff::PushFront { value: event } => {
106 self.handle_remote_event(
107 event,
108 TimelineItemPosition::Start { origin },
109 room_data_provider,
110 settings,
111 &mut date_divider_adjuster,
112 )
113 .await;
114 }
115
116 VectorDiff::PushBack { value: event } => {
117 self.handle_remote_event(
118 event,
119 TimelineItemPosition::End { origin },
120 room_data_provider,
121 settings,
122 &mut date_divider_adjuster,
123 )
124 .await;
125 }
126
127 VectorDiff::Insert { index: event_index, value: event } => {
128 self.handle_remote_event(
129 event,
130 TimelineItemPosition::At { event_index, origin },
131 room_data_provider,
132 settings,
133 &mut date_divider_adjuster,
134 )
135 .await;
136 }
137
138 VectorDiff::Set { index: event_index, value: event } => {
139 if let Some(timeline_item_index) = self
140 .items
141 .all_remote_events()
142 .get(event_index)
143 .and_then(|meta| meta.timeline_item_index)
144 {
145 self.handle_remote_event(
146 event,
147 TimelineItemPosition::UpdateAt { timeline_item_index },
148 room_data_provider,
149 settings,
150 &mut date_divider_adjuster,
151 )
152 .await;
153 } else {
154 warn!(event_index, "Set update dropped because there wasn't any attached timeline item index.");
155 }
156 }
157
158 VectorDiff::Remove { index: event_index } => {
159 self.remove_timeline_item(event_index, &mut date_divider_adjuster);
160 }
161
162 VectorDiff::Clear => {
163 self.clear();
164 }
165
166 v => unimplemented!("{v:?}"),
167 }
168 }
169
170 self.adjust_date_dividers(date_divider_adjuster);
171 self.check_no_unused_unique_ids();
172 }
173
174 fn check_no_unused_unique_ids(&self) {
175 let duplicates = self
176 .items
177 .iter()
178 .duplicates_by(|item| item.unique_id())
179 .map(|item| item.unique_id())
180 .collect::<Vec<_>>();
181
182 if !duplicates.is_empty() {
183 #[cfg(any(debug_assertions, test))]
184 panic!("duplicate unique ids in this timeline:{:?}\n{:?}", duplicates, self.items);
185
186 #[cfg(not(any(debug_assertions, test)))]
187 tracing::error!(
188 "duplicate unique ids in this timeline:{:?}\n{:?}",
189 duplicates,
190 self.items
191 );
192 }
193 }
194
    /// Handle a single remote event, to be placed at `position`.
    ///
    /// The event is deserialized, then recorded in the remote events through
    /// `add_or_update_remote_event` (as visible or not), and finally handed to
    /// a `TimelineEventHandler`.
    ///
    /// Two paths return early with `HandleEventResult::default()`, without
    /// involving the `TimelineEventHandler`:
    ///
    /// - the event only deserializes without its content and
    ///   `settings.add_failed_to_parse` is `false`: it is recorded as not
    ///   visible;
    /// - the event does not deserialize even without its content: it is
    ///   recorded as not visible only if an event ID can be read from the raw
    ///   JSON.
    ///
    /// Otherwise, returns the result of `TimelineEventHandler::handle_event`.
    pub(super) async fn handle_remote_event<P: RoomDataProvider>(
        &mut self,
        event: TimelineEvent,
        position: TimelineItemPosition,
        room_data_provider: &P,
        settings: &TimelineSettings,
        date_divider_adjuster: &mut DateDividerAdjuster,
    ) -> HandleEventResult {
        let TimelineEvent { push_actions, kind } = event;
        let encryption_info = kind.encryption_info().cloned();

        // Extract the raw event, along with the unable-to-decrypt info if any.
        let (raw, utd_info) = match kind {
            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
                utd_info,
                event,
            } => (event, Some(utd_info)),
            _ => (kind.into_raw(), None),
        };

        let (event_id, sender, timestamp, txn_id, event_kind, should_add) = match raw.deserialize()
        {
            // The event can be fully deserialized.
            Ok(event) => {
                let event_id = event.event_id().to_owned();
                let room_version = room_data_provider.room_version();

                // First decision comes from the configured event filter.
                let mut should_add = (settings.event_filter)(&event, &room_version);

                // The filter accepted the event: refine the decision based on
                // the timeline focus and on where the event comes from.
                if should_add {
                    // Retrieve the origin of the event. For an update, it is
                    // read from the existing item, falling back to `Unknown`.
                    let origin = match position {
                        TimelineItemPosition::End { origin }
                        | TimelineItemPosition::Start { origin }
                        | TimelineItemPosition::At { origin, .. } => origin,

                        TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self
                            .items
                            .get(idx)
                            .and_then(|item| item.as_event())
                            .and_then(|item| item.as_remote())
                            .map_or(RemoteEventOrigin::Unknown, |item| item.origin),
                    };

                    match self.timeline_focus {
                        TimelineFocusKind::PinnedEvents => {
                            // Only events reported as pinned by the room data
                            // provider are added.
                            should_add = room_data_provider.is_pinned_event(&event_id);
                        }
                        TimelineFocusKind::Live => {
                            match origin {
                                RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
                                    // `should_add` is already `true` in this
                                    // branch, so the event is added.
                                    should_add = true;
                                }
                                RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
                                    // Keep the decision made by the filter.
                                }
                            }
                        }
                        TimelineFocusKind::Event => {
                            match origin {
                                RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => {
                                    // Events with these origins are not added
                                    // to a timeline focused on an event.
                                    should_add = false;
                                }
                                RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => {
                                    // Keep the decision made by the filter.
                                }
                            }
                        }
                    }
                }

                (
                    event_id,
                    event.sender().to_owned(),
                    event.origin_server_ts(),
                    event.transaction_id().map(ToOwned::to_owned),
                    TimelineEventKind::from_event(event, &raw, room_data_provider, utd_info).await,
                    should_add,
                )
            }

            // Full deserialization failed: retry while ignoring the content.
            Err(e) => match raw.deserialize_as::<SyncTimelineEventWithoutContent>() {
                // The event can be partially deserialized, and the settings
                // allow adding it as a "failed to parse" item.
                Ok(event) if settings.add_failed_to_parse => (
                    event.event_id().to_owned(),
                    event.sender().to_owned(),
                    event.origin_server_ts(),
                    event.transaction_id().map(ToOwned::to_owned),
                    TimelineEventKind::failed_to_parse(event, e),
                    true,
                ),

                // The event can be partially deserialized, but the settings do
                // not allow adding it: record it as not visible and stop.
                Ok(event) => {
                    let event_type = event.event_type();
                    let event_id = event.event_id();
                    warn!(%event_type, %event_id, "Failed to deserialize timeline event: {e}");

                    let is_own_event = event.sender() == room_data_provider.own_user_id();
                    let event_meta = FullEventMeta {
                        event_id,
                        sender: Some(event.sender()),
                        is_own_event,
                        timestamp: Some(event.origin_server_ts()),
                        visible: false,
                    };

                    self.add_or_update_remote_event(
                        event_meta,
                        position,
                        room_data_provider,
                        settings,
                    )
                    .await;

                    return HandleEventResult::default();
                }

                // The event cannot even be partially deserialized. Note that
                // this `e` shadows the error of the full deserialization.
                Err(e) => {
                    // Best effort: read individual fields from the raw JSON.
                    let event_type: Option<String> = raw.get_field("type").ok().flatten();
                    let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
                    warn!(
                        event_type,
                        event_id, "Failed to deserialize timeline event even without content: {e}"
                    );

                    let event_id = event_id.and_then(|s| EventId::parse(s).ok());

                    // Without a valid event ID, the event is not recorded.
                    if let Some(event_id) = &event_id {
                        let sender: Option<OwnedUserId> = raw.get_field("sender").ok().flatten();
                        let is_own_event =
                            sender.as_ref().is_some_and(|s| s == room_data_provider.own_user_id());
                        let timestamp: Option<MilliSecondsSinceUnixEpoch> =
                            raw.get_field("origin_server_ts").ok().flatten();

                        let event_meta = FullEventMeta {
                            event_id,
                            sender: sender.as_deref(),
                            is_own_event,
                            timestamp,
                            visible: false,
                        };

                        self.add_or_update_remote_event(
                            event_meta,
                            position,
                            room_data_provider,
                            settings,
                        )
                        .await;
                    }

                    return HandleEventResult::default();
                }
            },
        };

        let is_own_event = sender == room_data_provider.own_user_id();

        let event_meta = FullEventMeta {
            event_id: &event_id,
            sender: Some(&sender),
            is_own_event,
            timestamp: Some(timestamp),
            visible: should_add,
        };

        // Record the event before the timeline item is created or updated.
        self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await;

        let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
        let ctx = TimelineEventContext {
            sender,
            sender_profile,
            timestamp,
            is_own_event,
            // Read receipts are only computed when they are tracked and the
            // event is going to be added.
            read_receipts: if settings.track_read_receipts && should_add {
                self.meta.read_receipts.compute_event_receipts(
                    &event_id,
                    self.items.all_remote_events(),
                    matches!(position, TimelineItemPosition::End { .. }),
                )
            } else {
                Default::default()
            },
            // Highlighted if any of the push actions is a highlight.
            is_highlighted: push_actions
                .as_ref()
                .is_some_and(|actions| actions.iter().any(Action::is_highlight)),
            flow: Flow::Remote {
                event_id: event_id.clone(),
                raw_event: raw,
                encryption_info,
                txn_id,
                position,
            },
            should_add_new_items: should_add,
        };

        // Let the event handler deal with the event.
        TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await
    }
415
416 fn remove_timeline_item(
418 &mut self,
419 event_index: usize,
420 day_divider_adjuster: &mut DateDividerAdjuster,
421 ) {
422 day_divider_adjuster.mark_used();
423
424 if let Some(event_meta) = self.items.all_remote_events().get(event_index) {
434 if let Some(timeline_item_index) = event_meta.timeline_item_index {
436 let _ = self.items.remove(timeline_item_index);
437 }
438
439 self.items.remove_remote_event(event_index);
441 }
442 }
443
444 pub(super) fn clear(&mut self) {
445 let has_local_echoes = self.items.iter().any(|item| item.is_local_echo());
446
447 if has_local_echoes {
453 self.items.for_each(|entry| {
455 if entry.is_remote_event() || entry.is_read_marker() {
456 ObservableItemsTransactionEntry::remove(entry);
457 }
458 });
459
460 let mut idx = 0;
462 while idx < self.items.len() {
463 if self.items[idx].is_date_divider()
464 && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider())
465 {
466 self.items.remove(idx);
467 } else {
469 idx += 1;
470 }
471 }
472 } else {
473 self.items.clear();
474 }
475
476 self.meta.clear();
477
478 debug!(remaining_items = self.items.len(), "Timeline cleared");
479 }
480
481 #[instrument(skip_all)]
482 pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) {
483 if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) {
485 return;
486 }
487
488 self.meta.fully_read_event = Some(fully_read_event_id);
489 self.meta.update_read_marker(&mut self.items);
490 }
491
492 pub(super) fn commit(self) {
493 let previous_number_of_items = self.number_of_items_when_transaction_started;
495 let next_number_of_items = self.items.len();
496
497 if previous_number_of_items != next_number_of_items {
498 let count = self
499 .meta
500 .subscriber_skip_count
501 .compute_next(previous_number_of_items, next_number_of_items);
502 self.meta.subscriber_skip_count.update(count, &self.timeline_focus);
503 }
504
505 *self.previous_meta = self.meta;
507
508 self.items.commit();
509 }
510
511 async fn add_or_update_remote_event<P: RoomDataProvider>(
516 &mut self,
517 event_meta: FullEventMeta<'_>,
518 position: TimelineItemPosition,
519 room_data_provider: &P,
520 settings: &TimelineSettings,
521 ) {
522 match position {
523 TimelineItemPosition::Start { .. } => {
524 self.items.push_front_remote_event(event_meta.base_meta())
525 }
526
527 TimelineItemPosition::End { .. } => {
528 self.items.push_back_remote_event(event_meta.base_meta());
529 }
530
531 TimelineItemPosition::At { event_index, .. } => {
532 self.items.insert_remote_event(event_index, event_meta.base_meta());
533 }
534
535 TimelineItemPosition::UpdateAt { .. } => {
536 if let Some(event) =
537 self.items.get_remote_event_by_event_id_mut(event_meta.event_id)
538 {
539 if event.visible != event_meta.visible {
540 event.visible = event_meta.visible;
541
542 if settings.track_read_receipts {
543 self.maybe_update_read_receipts_of_prev_event(event_meta.event_id);
546 }
547 }
548 }
549 }
550 }
551
552 if settings.track_read_receipts
553 && matches!(
554 position,
555 TimelineItemPosition::Start { .. }
556 | TimelineItemPosition::End { .. }
557 | TimelineItemPosition::At { .. }
558 )
559 {
560 self.load_read_receipts_for_event(event_meta.event_id, room_data_provider).await;
561
562 self.maybe_add_implicit_read_receipt(event_meta);
563 }
564 }
565
566 pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) {
567 adjuster.run(&mut self.items, &mut self.meta);
568 }
569
570 pub(super) fn mark_all_events_as_encrypted(&mut self) {
574 for idx in 0..self.items.len() {
575 let item = &self.items[idx];
576
577 if let Some(event) = item.as_event() {
578 if event.is_room_encrypted {
579 continue;
580 }
581
582 let mut cloned_event = event.clone();
583 cloned_event.is_room_encrypted = true;
584
585 let item = item.with_kind(cloned_event);
587 self.items.replace(idx, item);
588 }
589 }
590 }
591}