1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::{VectorDiff, VectorSubscriberStream};
20use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::Result;
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{RoomEventCache, RoomPaginationStatus},
28 paginators::{PaginationResult, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40 AnySyncTimelineEvent, MessageLikeEventType,
41 poll::unstable_start::UnstablePollStartEventContent,
42 reaction::ReactionEventContent,
43 receipt::{Receipt, ReceiptThread, ReceiptType},
44 relation::Annotation,
45 room::message::{MessageType, Relation},
46 },
47 room_version_rules::RoomVersionRules,
48 serde::Raw,
49};
50use tokio::sync::{RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54 metadata::{RelativePosition, TimelineMetadata},
55 observable_items::{
56 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57 ObservableItemsTransactionEntry,
58 },
59 state::TimelineState,
60 state_transaction::TimelineStateTransaction,
61};
62use super::{
63 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_item::{ReactionStatus, RemoteEventOrigin},
68 item::TimelineUniqueId,
69 subscriber::TimelineSubscriber,
70 traits::RoomDataProvider,
71};
72use crate::{
73 timeline::{
74 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75 algorithms::rfind_event_by_item_id,
76 date_dividers::DateDividerAdjuster,
77 event_item::TimelineItemHandle,
78 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79 },
80 unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93
/// Internal, per-timeline data describing what the timeline is focused on.
///
/// `TimelineController::new` builds a value of this type from the
/// `TimelineFocus` it is given, adding the extra state (paginator, pinned
/// events loader) that the controller needs afterwards.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// Counterpart of `TimelineFocus::Live`.
    Live {
        /// When `true`, `receipt_thread()` reports `ReceiptThread::Main`
        /// instead of `ReceiptThread::Unthreaded`.
        ///
        /// NOTE(review): presumably also hides in-thread events from the
        /// timeline — confirm against the code consuming this flag.
        hide_threaded_events: bool,
    },

    /// Counterpart of `TimelineFocus::Event`.
    Event {
        /// Paginator used to load the initial events around the target event
        /// and to paginate backwards/forwards from there.
        paginator: Paginator<P>,

        /// Same meaning as for the `Live` variant.
        hide_threaded_events: bool,
    },

    /// Counterpart of `TimelineFocus::Thread`.
    Thread {
        /// The identifier of the thread's root event.
        root_event_id: OwnedEventId,
    },

    /// Counterpart of `TimelineFocus::PinnedEvents`.
    PinnedEvents {
        /// Loader used to (re)load the pinned events.
        loader: PinnedEventsLoader,
    },
}
127
128impl<P: RoomDataProvider> TimelineFocusKind<P> {
129 pub(super) fn receipt_thread(&self) -> ReceiptThread {
136 match self {
137 TimelineFocusKind::Live { hide_threaded_events }
138 | TimelineFocusKind::Event { hide_threaded_events, .. } => {
139 if *hide_threaded_events {
140 ReceiptThread::Main
141 } else {
142 ReceiptThread::Unthreaded
143 }
144 }
145 TimelineFocusKind::Thread { root_event_id } => {
146 ReceiptThread::Thread(root_event_id.clone())
147 }
148 TimelineFocusKind::PinnedEvents { .. } => ReceiptThread::Unthreaded,
149 }
150 }
151}
152
/// Controller owning the state of a timeline and exposing the operations that
/// read or mutate it.
///
/// The type is `Clone`; the state and the focus are behind `Arc`s, so clones
/// share them.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// The mutable timeline state, shared behind an async read/write lock.
    state: Arc<RwLock<TimelineState<P>>>,

    /// Data attached to the focus of this timeline.
    focus: Arc<TimelineFocusKind<P>>,

    /// The provider of room data (own user id, profiles, sending, redacting,
    /// ...). Generic so that something other than a [`Room`] can be plugged
    /// in.
    pub(crate) room_data_provider: P,

    /// Settings applied to this timeline.
    pub(super) settings: TimelineSettings,

    /// Task used by `retry_event_decryption_inner` to retry decrypting
    /// events.
    decryption_retry_task: DecryptionRetryTask<P, P>,
}
174
/// Settings controlling how a timeline is built.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Whether read receipts and the fully-read marker are loaded and
    /// tracked for this timeline.
    pub(super) track_read_receipts: bool,

    /// Filter deciding which events are kept.
    ///
    /// NOTE(review): presumably an event rejected by the filter is not
    /// rendered as a timeline item — confirm against the code applying it.
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// NOTE(review): presumably controls whether events that fail to parse
    /// are still added to the timeline — confirm against the event handler.
    pub(super) add_failed_to_parse: bool,

    /// The mode used when inserting/adjusting date dividers.
    pub(super) date_divider_mode: DateDividerMode,
}
190
191#[cfg(not(tarpaulin_include))]
192impl fmt::Debug for TimelineSettings {
193 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194 f.debug_struct("TimelineSettings")
195 .field("track_read_receipts", &self.track_read_receipts)
196 .field("add_failed_to_parse", &self.add_failed_to_parse)
197 .finish_non_exhaustive()
198 }
199}
200
201impl Default for TimelineSettings {
202 fn default() -> Self {
203 Self {
204 track_read_receipts: false,
205 event_filter: Arc::new(default_event_filter),
206 add_failed_to_parse: true,
207 date_divider_mode: DateDividerMode::Daily,
208 }
209 }
210}
211
212pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
222 match event {
223 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
224 if ev.redacts(&rules.redaction).is_some() {
225 false
228 } else {
229 ev.event_type() != MessageLikeEventType::Reaction
232 }
233 }
234
235 AnySyncTimelineEvent::MessageLike(msg) => {
236 match msg.original_content() {
237 None => {
238 msg.event_type() != MessageLikeEventType::Reaction
241 }
242
243 Some(original_content) => {
244 match original_content {
245 AnyMessageLikeEventContent::RoomMessage(content) => {
246 if content
247 .relates_to
248 .as_ref()
249 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
250 {
251 return false;
253 }
254
255 match content.msgtype {
256 MessageType::Audio(_)
257 | MessageType::Emote(_)
258 | MessageType::File(_)
259 | MessageType::Image(_)
260 | MessageType::Location(_)
261 | MessageType::Notice(_)
262 | MessageType::ServerNotice(_)
263 | MessageType::Text(_)
264 | MessageType::Video(_)
265 | MessageType::VerificationRequest(_) => true,
266 #[cfg(feature = "unstable-msc4274")]
267 MessageType::Gallery(_) => true,
268 _ => false,
269 }
270 }
271
272 AnyMessageLikeEventContent::Sticker(_)
273 | AnyMessageLikeEventContent::UnstablePollStart(
274 UnstablePollStartEventContent::New(_),
275 )
276 | AnyMessageLikeEventContent::CallInvite(_)
277 | AnyMessageLikeEventContent::CallNotify(_)
278 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
279
280 _ => false,
281 }
282 }
283 }
284 }
285
286 AnySyncTimelineEvent::State(_) => {
287 true
289 }
290 }
291}
292
293impl<P: RoomDataProvider> TimelineController<P> {
294 pub(super) fn new(
295 room_data_provider: P,
296 focus: TimelineFocus,
297 internal_id_prefix: Option<String>,
298 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
299 is_room_encrypted: bool,
300 settings: TimelineSettings,
301 ) -> Self {
302 let focus = match focus {
303 TimelineFocus::Live { hide_threaded_events } => {
304 TimelineFocusKind::Live { hide_threaded_events }
305 }
306
307 TimelineFocus::Event { hide_threaded_events, .. } => {
308 let paginator = Paginator::new(room_data_provider.clone());
309 TimelineFocusKind::Event { paginator, hide_threaded_events }
310 }
311
312 TimelineFocus::Thread { root_event_id, .. } => {
313 TimelineFocusKind::Thread { root_event_id }
314 }
315
316 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
317 TimelineFocusKind::PinnedEvents {
318 loader: PinnedEventsLoader::new(
319 Arc::new(room_data_provider.clone()),
320 max_events_to_load as usize,
321 max_concurrent_requests as usize,
322 ),
323 }
324 }
325 };
326
327 let focus = Arc::new(focus);
328 let state = Arc::new(RwLock::new(TimelineState::new(
329 focus.clone(),
330 room_data_provider.own_user_id().to_owned(),
331 room_data_provider.room_version_rules(),
332 internal_id_prefix,
333 unable_to_decrypt_hook,
334 is_room_encrypted,
335 )));
336
337 let decryption_retry_task =
338 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
339
340 Self { state, focus, room_data_provider, settings, decryption_retry_task }
341 }
342
343 pub(super) async fn init_focus(
350 &self,
351 focus: &TimelineFocus,
352 room_event_cache: &RoomEventCache,
353 ) -> Result<bool, Error> {
354 match focus {
355 TimelineFocus::Live { .. } => {
356 let events = room_event_cache.events().await;
358
359 let has_events = !events.is_empty();
360
361 self.replace_with_initial_remote_events(
362 events.into_iter(),
363 RemoteEventOrigin::Cache,
364 )
365 .await;
366
367 match room_event_cache.pagination().status().get() {
368 RoomPaginationStatus::Idle { hit_timeline_start } => {
369 if hit_timeline_start {
370 self.insert_timeline_start_if_missing().await;
373 }
374 }
375 RoomPaginationStatus::Paginating => {}
376 }
377
378 Ok(has_events)
379 }
380
381 TimelineFocus::Event { target: event_id, num_context_events, .. } => {
382 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
383 unreachable!();
385 };
386
387 let start_from_result = paginator
389 .start_from(event_id, (*num_context_events).into())
390 .await
391 .map_err(PaginationError::Paginator)?;
392
393 let has_events = !start_from_result.events.is_empty();
394
395 self.replace_with_initial_remote_events(
396 start_from_result.events.into_iter(),
397 RemoteEventOrigin::Pagination,
398 )
399 .await;
400
401 Ok(has_events)
402 }
403
404 TimelineFocus::Thread { root_event_id, .. } => {
405 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
406 let has_events = !events.is_empty();
407
408 let mut related_events = Vector::new();
412 for event_id in events.iter().filter_map(|event| event.event_id()) {
413 if let Some((_original, related)) =
414 room_event_cache.find_event_with_relations(&event_id, None).await
415 {
416 related_events.extend(related);
417 }
418 }
419
420 self.replace_with_initial_remote_events(
421 events.into_iter(),
422 RemoteEventOrigin::Cache,
423 )
424 .await;
425
426 if !related_events.is_empty() {
428 self.handle_remote_aggregations(
429 vec![VectorDiff::Append { values: related_events }],
430 RemoteEventOrigin::Cache,
431 )
432 .await;
433 }
434
435 Ok(has_events)
436 }
437
438 TimelineFocus::PinnedEvents { .. } => {
439 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
440 unreachable!();
442 };
443
444 let Some(loaded_events) =
445 loader.load_events().await.map_err(Error::PinnedEventsError)?
446 else {
447 return Ok(false);
449 };
450
451 let has_events = !loaded_events.is_empty();
452
453 self.replace_with_initial_remote_events(
454 loaded_events.into_iter(),
455 RemoteEventOrigin::Pagination,
456 )
457 .await;
458
459 Ok(has_events)
460 }
461 }
462 }
463
464 pub async fn handle_encryption_state_changes(&self) {
469 let mut room_info = self.room_data_provider.room_info();
470
471 let mark_encrypted = || async {
473 let mut state = self.state.write().await;
474 state.meta.is_room_encrypted = true;
475 state.mark_all_events_as_encrypted();
476 };
477
478 if room_info.get().encryption_state().is_encrypted() {
479 mark_encrypted().await;
482 return;
483 }
484
485 while let Some(info) = room_info.next().await {
486 if info.encryption_state().is_encrypted() {
487 mark_encrypted().await;
488 break;
491 }
492 }
493 }
494
495 pub(crate) async fn reload_pinned_events(
496 &self,
497 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
498 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
499 loader.load_events().await
500 } else {
501 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
502 }
503 }
504
505 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
514 let state = self.state.read().await;
515
516 let (count, needs) = state
517 .meta
518 .subscriber_skip_count
519 .compute_next_when_paginating_backwards(num_events.into());
520
521 let is_live_timeline = true;
523 state.meta.subscriber_skip_count.update(count, is_live_timeline);
524
525 needs
526 }
527
528 pub(super) async fn focused_paginate_backwards(
533 &self,
534 num_events: u16,
535 ) -> Result<bool, PaginationError> {
536 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
537 TimelineFocusKind::Live { .. }
538 | TimelineFocusKind::PinnedEvents { .. }
539 | TimelineFocusKind::Thread { .. } => {
540 return Err(PaginationError::NotSupported);
541 }
542 TimelineFocusKind::Event { paginator, .. } => paginator
543 .paginate_backward(num_events.into())
544 .await
545 .map_err(PaginationError::Paginator)?,
546 };
547
548 self.handle_remote_events_with_diffs(
551 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
552 RemoteEventOrigin::Pagination,
553 )
554 .await;
555
556 Ok(hit_end_of_timeline)
557 }
558
559 pub(super) async fn focused_paginate_forwards(
564 &self,
565 num_events: u16,
566 ) -> Result<bool, PaginationError> {
567 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
568 TimelineFocusKind::Live { .. }
569 | TimelineFocusKind::PinnedEvents { .. }
570 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
571
572 TimelineFocusKind::Event { paginator, .. } => paginator
573 .paginate_forward(num_events.into())
574 .await
575 .map_err(PaginationError::Paginator)?,
576 };
577
578 self.handle_remote_events_with_diffs(
581 vec![VectorDiff::Append { values: events.into() }],
582 RemoteEventOrigin::Pagination,
583 )
584 .await;
585
586 Ok(hit_end_of_timeline)
587 }
588
589 pub(super) fn is_live(&self) -> bool {
591 matches!(&*self.focus, TimelineFocusKind::Live { .. })
592 }
593
594 pub(super) fn is_threaded(&self) -> bool {
596 matches!(&*self.focus, TimelineFocusKind::Thread { .. })
597 }
598
599 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
600 as_variant!(&*self.focus, TimelineFocusKind::Thread { root_event_id } => root_event_id.clone())
601 }
602
603 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
607 self.state.read().await.items.clone_items()
608 }
609
610 #[cfg(test)]
611 pub(super) async fn subscribe_raw(
612 &self,
613 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
614 self.state.read().await.items.subscribe().into_values_and_stream()
615 }
616
617 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
618 let state = self.state.read().await;
619
620 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
621 }
622
623 pub(super) async fn subscribe_filter_map<U, F>(
624 &self,
625 f: F,
626 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
627 where
628 U: Clone,
629 F: Fn(Arc<TimelineItem>) -> Option<U>,
630 {
631 self.state.read().await.items.subscribe().filter_map(f)
632 }
633
    /// Toggles the own user's reaction `key` on the item identified by
    /// `item_id`.
    ///
    /// Returns `Ok(true)` if a reaction was added, and `Ok(false)` if a
    /// previous reaction was removed or if the reaction could not be toggled
    /// on a local echo.
    ///
    /// # Errors
    ///
    /// Returns `Error::FailedToToggleReaction` if the item can't be found,
    /// `Error::SendQueueError` if the send queue fails, or the error returned
    /// by the room data provider when sending or redacting fails.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Look up the status of the own user's existing reaction with this
        // key, if any.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

        let Some(prev_status) = prev_status else {
            // No previous reaction from the own user: add a new one.
            match item.handle() {
                TimelineItemHandle::Local(send_handle) => {
                    if send_handle
                        .react(key.to_owned())
                        .await
                        .map_err(|err| Error::SendQueueError(err.into()))?
                        .is_some()
                    {
                        trace!("adding a reaction to a local echo");
                        return Ok(true);
                    }

                    warn!("couldn't toggle reaction for local echo");
                    return Ok(false);
                }

                TimelineItemHandle::Remote(event_id) => {
                    // The reaction is sent through the room data provider;
                    // the item isn't modified here.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // The abort didn't happen; this is only logged.
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // The abort didn't happen; this is only logged.
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Remember which event carries the reaction, so the item can
                // be found again if the redaction fails.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                // Optimistically remove the reaction from the item before
                // sending the redaction.
                let mut reactions = item.content().reactions().cloned().unwrap_or_default();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!(
                        "reaction is missing on the item, not removing it locally, \
                         but sending redaction."
                    );
                }

                // Release the write lock before awaiting the redaction request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        // Re-acquire the lock and look the item up again: the
                        // timeline may have changed while the lock was released.
                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Put the removed reaction back on the item.
                            let mut reactions =
                                item.content().reactions().cloned().unwrap_or_default();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!(
                                "couldn't find item to re-add reaction anymore; \
                                 maybe it's been redacted?"
                            );
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
775
776 pub(super) async fn handle_remote_events_with_diffs(
778 &self,
779 diffs: Vec<VectorDiff<TimelineEvent>>,
780 origin: RemoteEventOrigin,
781 ) {
782 if diffs.is_empty() {
783 return;
784 }
785
786 let mut state = self.state.write().await;
787 state
788 .handle_remote_events_with_diffs(
789 diffs,
790 origin,
791 &self.room_data_provider,
792 &self.settings,
793 )
794 .await
795 }
796
797 pub(super) async fn handle_remote_aggregations(
799 &self,
800 diffs: Vec<VectorDiff<TimelineEvent>>,
801 origin: RemoteEventOrigin,
802 ) {
803 if diffs.is_empty() {
804 return;
805 }
806
807 let mut state = self.state.write().await;
808 state
809 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
810 .await
811 }
812
813 pub(super) async fn clear(&self) {
814 self.state.write().await.clear();
815 }
816
817 pub(super) async fn replace_with_initial_remote_events<Events>(
825 &self,
826 events: Events,
827 origin: RemoteEventOrigin,
828 ) where
829 Events: IntoIterator + ExactSizeIterator,
830 <Events as IntoIterator>::Item: Into<TimelineEvent>,
831 {
832 let mut state = self.state.write().await;
833
834 let track_read_markers = self.settings.track_read_receipts;
835 if track_read_markers {
836 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
837 state
838 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
839 .await;
840 }
841
842 if !state.items.is_empty() || events.len() > 0 {
848 state
849 .replace_with_remote_events(
850 events,
851 origin,
852 &self.room_data_provider,
853 &self.settings,
854 )
855 .await;
856 }
857
858 if track_read_markers
859 && let Some(fully_read_event_id) =
860 self.room_data_provider.load_fully_read_marker().await
861 {
862 state.handle_fully_read_marker(fully_read_event_id);
863 }
864 }
865
866 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
867 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
868 }
869
870 pub(super) async fn handle_ephemeral_events(
871 &self,
872 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
873 ) {
874 let mut state = self.state.write().await;
875 state.handle_ephemeral_events(events, &self.room_data_provider).await;
876 }
877
878 #[instrument(skip_all)]
880 pub(super) async fn handle_local_event(
881 &self,
882 txn_id: OwnedTransactionId,
883 content: AnyMessageLikeEventContent,
884 send_handle: Option<SendHandle>,
885 ) {
886 let sender = self.room_data_provider.own_user_id().to_owned();
887 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
888
889 let date_divider_mode = self.settings.date_divider_mode.clone();
890
891 let mut state = self.state.write().await;
892 state
893 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
894 .await;
895 }
896
    /// Updates the send state of the local echo identified by `txn_id`.
    ///
    /// If the new state is `Sent` and a remote item with the same event id is
    /// already in the timeline, the local echo (if still present) is removed
    /// instead. If no matching standalone item exists, the function tries to
    /// mark a matching pending aggregation as sent.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // Only the `Sent` state carries an event id.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // First check whether a remote item with the new event id is already
        // in the timeline.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // Both the remote echo and the local echo are present: drop the
            // local one to avoid a duplicate.
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Removing an item may require adjusting the date dividers.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the local item, either by transaction id or by event id.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // No standalone item was found. If the event was just sent, it
            // may be an aggregation: try to mark it as sent.
            if let Some(new_event_id) = new_event_id {
                if txn.meta.aggregations.mark_aggregation_as_sent(
                    txn_id.to_owned(),
                    new_event_id.to_owned(),
                    &mut txn.items,
                    &txn.meta.room_version_rules,
                ) {
                    trace!("Aggregation marked as sent");
                    txn.commit();
                    return;
                }

                trace!("Sent aggregation was not found");
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // The local echo is already marked as sent: log an error, but still
        // override its state with the new one below.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // The event now has an event id: let the aggregations targeting the
        // transaction id know about it.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
996
997 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
998 let mut state = self.state.write().await;
999
1000 if let Some((idx, _)) =
1001 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1002 {
1003 let mut txn = state.transaction();
1004
1005 txn.items.remove(idx);
1006
1007 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1010 adjuster.run(&mut txn.items, &mut txn.meta);
1011
1012 txn.meta.update_read_marker(&mut txn.items);
1013
1014 txn.commit();
1015
1016 debug!("discarded local echo");
1017 return true;
1018 }
1019
1020 let mut txn = state.transaction();
1023
1024 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1026 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1027 &mut txn.items,
1028 ) {
1029 Ok(val) => val,
1030 Err(err) => {
1031 warn!("error when discarding local echo for an aggregation: {err}");
1032 true
1034 }
1035 };
1036
1037 if found_aggregation {
1038 txn.commit();
1039 }
1040
1041 found_aggregation
1042 }
1043
1044 pub(super) async fn replace_local_echo(
1045 &self,
1046 txn_id: &TransactionId,
1047 content: AnyMessageLikeEventContent,
1048 ) -> bool {
1049 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1050 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1055 return false;
1056 };
1057
1058 let mut state = self.state.write().await;
1059 let mut txn = state.transaction();
1060
1061 let Some((idx, prev_item)) =
1062 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1063 else {
1064 debug!("Can't find local echo to replace");
1065 return false;
1066 };
1067
1068 let ti_kind = {
1071 let Some(prev_local_item) = prev_item.as_local() else {
1072 warn!("We looked for a local item, but it transitioned as remote??");
1073 return false;
1074 };
1075 let progress = as_variant!(&prev_local_item.send_state,
1077 EventSendState::NotSentYet { progress } => progress.clone())
1078 .flatten();
1079 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1080 };
1081
1082 let new_item = TimelineItem::new(
1084 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1085 content.msgtype,
1086 content.mentions,
1087 prev_item.content().reactions().cloned().unwrap_or_default(),
1088 prev_item.content().thread_root(),
1089 prev_item.content().in_reply_to(),
1090 prev_item.content().thread_summary(),
1091 )),
1092 prev_item.internal_id.to_owned(),
1093 );
1094
1095 txn.items.replace(idx, new_item);
1096
1097 txn.commit();
1101
1102 debug!("Replaced local echo");
1103 true
1104 }
1105
1106 pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
1107 self.decryption_retry_task
1108 .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
1109 .await;
1110 }
1111
1112 pub(super) async fn set_sender_profiles_pending(&self) {
1113 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1114 }
1115
1116 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1117 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1118 }
1119
1120 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1121 self.state.write().await.items.for_each(|mut entry| {
1122 let Some(event_item) = entry.as_event() else { return };
1123 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1124 let new_item = entry.with_kind(TimelineItemKind::Event(
1125 event_item.with_sender_profile(profile_state.clone()),
1126 ));
1127 ObservableItemsEntry::replace(&mut entry, new_item);
1128 }
1129 });
1130 }
1131
1132 pub(super) async fn update_missing_sender_profiles(&self) {
1133 trace!("Updating missing sender profiles");
1134
1135 let mut state = self.state.write().await;
1136 let mut entries = state.items.entries();
1137 while let Some(mut entry) = entries.next() {
1138 let Some(event_item) = entry.as_event() else { continue };
1139 let event_id = event_item.event_id().map(debug);
1140 let transaction_id = event_item.transaction_id().map(debug);
1141
1142 if event_item.sender_profile().is_ready() {
1143 trace!(event_id, transaction_id, "Profile already set");
1144 continue;
1145 }
1146
1147 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1148 Some(profile) => {
1149 trace!(event_id, transaction_id, "Adding profile");
1150 let updated_item =
1151 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1152 let new_item = entry.with_kind(updated_item);
1153 ObservableItemsEntry::replace(&mut entry, new_item);
1154 }
1155 None => {
1156 if !event_item.sender_profile().is_unavailable() {
1157 trace!(event_id, transaction_id, "Marking profile unavailable");
1158 let updated_item =
1159 event_item.with_sender_profile(TimelineDetails::Unavailable);
1160 let new_item = entry.with_kind(updated_item);
1161 ObservableItemsEntry::replace(&mut entry, new_item);
1162 } else {
1163 debug!(event_id, transaction_id, "Profile already marked unavailable");
1164 }
1165 }
1166 }
1167 }
1168
1169 trace!("Done updating missing sender profiles");
1170 }
1171
1172 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1174 trace!("Forcing update of sender profiles: {sender_ids:?}");
1175
1176 let mut state = self.state.write().await;
1177 let mut entries = state.items.entries();
1178 while let Some(mut entry) = entries.next() {
1179 let Some(event_item) = entry.as_event() else { continue };
1180 if !sender_ids.contains(event_item.sender()) {
1181 continue;
1182 }
1183
1184 let event_id = event_item.event_id().map(debug);
1185 let transaction_id = event_item.transaction_id().map(debug);
1186
1187 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1188 Some(profile) => {
1189 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1190 {
1191 debug!(event_id, transaction_id, "Profile already up-to-date");
1192 } else {
1193 trace!(event_id, transaction_id, "Updating profile");
1194 let updated_item =
1195 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1196 let new_item = entry.with_kind(updated_item);
1197 ObservableItemsEntry::replace(&mut entry, new_item);
1198 }
1199 }
1200 None => {
1201 if !event_item.sender_profile().is_unavailable() {
1202 trace!(event_id, transaction_id, "Marking profile unavailable");
1203 let updated_item =
1204 event_item.with_sender_profile(TimelineDetails::Unavailable);
1205 let new_item = entry.with_kind(updated_item);
1206 ObservableItemsEntry::replace(&mut entry, new_item);
1207 } else {
1208 debug!(event_id, transaction_id, "Profile already marked unavailable");
1209 }
1210 }
1211 }
1212 }
1213
1214 trace!("Done forcing update of sender profiles");
1215 }
1216
1217 #[cfg(test)]
1218 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1219 let own_user_id = self.room_data_provider.own_user_id();
1220 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1221 }
1222
1223 pub(super) async fn latest_user_read_receipt(
1227 &self,
1228 user_id: &UserId,
1229 ) -> Option<(OwnedEventId, Receipt)> {
1230 let receipt_thread = self.focus.receipt_thread();
1231
1232 self.state
1233 .read()
1234 .await
1235 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1236 .await
1237 }
1238
1239 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1242 &self,
1243 user_id: &UserId,
1244 ) -> Option<OwnedEventId> {
1245 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1246 }
1247
1248 pub async fn subscribe_own_user_read_receipts_changed(
1250 &self,
1251 ) -> impl Stream<Item = ()> + use<P> {
1252 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1253 }
1254
1255 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1257 match echo.content {
1258 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1259 let content = match serialized_event.deserialize() {
1260 Ok(d) => d,
1261 Err(err) => {
1262 warn!("error deserializing local echo: {err}");
1263 return;
1264 }
1265 };
1266
1267 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1268 .await;
1269
1270 if let Some(send_error) = send_error {
1271 self.update_event_send_state(
1272 &echo.transaction_id,
1273 EventSendState::SendingFailed {
1274 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1275 send_error,
1276 ))),
1277 is_recoverable: false,
1278 },
1279 )
1280 .await;
1281 }
1282 }
1283
1284 LocalEchoContent::React { key, send_handle, applies_to } => {
1285 self.handle_local_reaction(key, send_handle, applies_to).await;
1286 }
1287 }
1288 }
1289
1290 #[instrument(skip(self, send_handle))]
1292 async fn handle_local_reaction(
1293 &self,
1294 reaction_key: String,
1295 send_handle: SendReactionHandle,
1296 applies_to: OwnedTransactionId,
1297 ) {
1298 let mut state = self.state.write().await;
1299 let mut tr = state.transaction();
1300
1301 let target = TimelineEventItemId::TransactionId(applies_to);
1302
1303 let reaction_txn_id = send_handle.transaction_id().to_owned();
1304 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1305 let aggregation = Aggregation::new(
1306 TimelineEventItemId::TransactionId(reaction_txn_id),
1307 AggregationKind::Reaction {
1308 key: reaction_key.clone(),
1309 sender: self.room_data_provider.own_user_id().to_owned(),
1310 timestamp: MilliSecondsSinceUnixEpoch::now(),
1311 reaction_status,
1312 },
1313 );
1314
1315 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1316 find_item_and_apply_aggregation(
1317 &tr.meta.aggregations,
1318 &mut tr.items,
1319 &target,
1320 aggregation,
1321 &tr.meta.room_version_rules,
1322 );
1323
1324 tr.commit();
1325 }
1326
1327 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1329 match update {
1330 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1331 self.handle_local_echo(echo).await;
1332 }
1333
1334 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1335 if !self.discard_local_echo(&transaction_id).await {
1336 warn!("couldn't find the local echo to discard");
1337 }
1338 }
1339
1340 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1341 let content = match new_content.deserialize() {
1342 Ok(d) => d,
1343 Err(err) => {
1344 warn!("error deserializing local echo (upon edit): {err}");
1345 return;
1346 }
1347 };
1348
1349 if !self.replace_local_echo(&transaction_id, content).await {
1350 warn!("couldn't find the local echo to replace");
1351 }
1352 }
1353
1354 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1355 self.update_event_send_state(
1356 &transaction_id,
1357 EventSendState::SendingFailed { error, is_recoverable },
1358 )
1359 .await;
1360 }
1361
1362 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1363 self.update_event_send_state(
1364 &transaction_id,
1365 EventSendState::NotSentYet { progress: None },
1366 )
1367 .await;
1368 }
1369
1370 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1371 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1372 .await;
1373 }
1374
1375 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1376 self.update_event_send_state(
1377 &related_to,
1378 EventSendState::NotSentYet {
1379 progress: Some(MediaUploadProgress { index, progress }),
1380 },
1381 )
1382 .await;
1383 }
1384 }
1385 }
1386
1387 pub async fn insert_timeline_start_if_missing(&self) {
1390 let mut state = self.state.write().await;
1391 let mut txn = state.transaction();
1392 txn.items.push_timeline_start_if_missing(
1393 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1394 );
1395 txn.commit();
1396 }
1397
1398 pub(super) async fn make_replied_to(
1404 &self,
1405 event: TimelineEvent,
1406 ) -> Result<Option<EmbeddedEvent>, Error> {
1407 let state = self.state.read().await;
1408 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1409 }
1410}
1411
1412impl TimelineController {
/// Returns the room this controller operates on, i.e. its
/// `room_data_provider`.
pub(super) fn room(&self) -> &Room {
    &self.room_data_provider
}
1416
1417 #[instrument(skip(self))]
1420 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1421 let state_guard = self.state.write().await;
1422 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1423 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1424 let remote_item = item
1425 .as_remote()
1426 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1427 .clone();
1428
1429 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1430 debug!("Event is not a message");
1431 return Ok(());
1432 };
1433 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1434 debug!("Event is not a reply");
1435 return Ok(());
1436 };
1437 if let TimelineDetails::Pending = &in_reply_to.event {
1438 debug!("Replied-to event is already being fetched");
1439 return Ok(());
1440 }
1441 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1442 debug!("Replied-to event has already been fetched");
1443 return Ok(());
1444 }
1445
1446 let internal_id = item.internal_id.to_owned();
1447 let item = item.clone();
1448 let event = fetch_replied_to_event(
1449 state_guard,
1450 &self.state,
1451 index,
1452 &item,
1453 internal_id,
1454 &msglike,
1455 &in_reply_to.event_id,
1456 self.room(),
1457 )
1458 .await?;
1459
1460 let mut state = self.state.write().await;
1463 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1464 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1465
1466 let TimelineItemContent::MsgLike(MsgLikeContent {
1469 kind: MsgLikeKind::Message(message),
1470 reactions,
1471 thread_root,
1472 in_reply_to,
1473 thread_summary,
1474 }) = item.content().clone()
1475 else {
1476 info!("Event is no longer a message (redacted?)");
1477 return Ok(());
1478 };
1479 let Some(in_reply_to) = in_reply_to else {
1480 warn!("Event no longer has a reply (bug?)");
1481 return Ok(());
1482 };
1483
1484 trace!("Updating in-reply-to details");
1487 let internal_id = item.internal_id.to_owned();
1488 let mut item = item.clone();
1489 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1490 kind: MsgLikeKind::Message(message),
1491 reactions,
1492 thread_root,
1493 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1494 thread_summary,
1495 }));
1496 state.items.replace(index, TimelineItem::new(item, internal_id));
1497
1498 Ok(())
1499 }
1500
1501 pub(super) fn infer_thread_for_read_receipt(
1507 &self,
1508 receipt_type: &SendReceiptType,
1509 ) -> ReceiptThread {
1510 if matches!(receipt_type, SendReceiptType::FullyRead) {
1511 ReceiptThread::Unthreaded
1512 } else {
1513 self.focus.receipt_thread()
1514 }
1515 }
1516
1517 pub(super) async fn should_send_receipt(
1521 &self,
1522 receipt_type: &SendReceiptType,
1523 receipt_thread: &ReceiptThread,
1524 event_id: &EventId,
1525 ) -> bool {
1526 let own_user_id = self.room().own_user_id();
1527 let state = self.state.read().await;
1528 let room = self.room();
1529
1530 match receipt_type {
1531 SendReceiptType::Read => {
1532 if let Some((old_pub_read, _)) = state
1533 .meta
1534 .user_receipt(
1535 own_user_id,
1536 ReceiptType::Read,
1537 receipt_thread.clone(),
1538 room,
1539 state.items.all_remote_events(),
1540 )
1541 .await
1542 {
1543 trace!(%old_pub_read, "found a previous public receipt");
1544 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1545 &old_pub_read,
1546 event_id,
1547 state.items.all_remote_events(),
1548 ) {
1549 trace!(
1550 "event referred to new receipt is {relative_pos:?} the previous receipt"
1551 );
1552 return relative_pos == RelativePosition::After;
1553 }
1554 }
1555 }
1556
1557 SendReceiptType::ReadPrivate => {
1560 if let Some((old_priv_read, _)) =
1561 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1562 {
1563 trace!(%old_priv_read, "found a previous private receipt");
1564 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1565 &old_priv_read,
1566 event_id,
1567 state.items.all_remote_events(),
1568 ) {
1569 trace!(
1570 "event referred to new receipt is {relative_pos:?} the previous receipt"
1571 );
1572 return relative_pos == RelativePosition::After;
1573 }
1574 }
1575 }
1576
1577 SendReceiptType::FullyRead => {
1578 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1579 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1580 &prev_event_id,
1581 event_id,
1582 state.items.all_remote_events(),
1583 )
1584 {
1585 return relative_pos == RelativePosition::After;
1586 }
1587 }
1588
1589 _ => {}
1590 }
1591
1592 true
1594 }
1595
1596 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1599 let state = self.state.read().await;
1600 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1601 }
1602
1603 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1604 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1605 self.retry_event_decryption_inner(session_ids).await
1606 }
1607
1608 pub(super) async fn map_pagination_status(
1616 &self,
1617 status: RoomPaginationStatus,
1618 ) -> RoomPaginationStatus {
1619 match status {
1620 RoomPaginationStatus::Idle { hit_timeline_start } => {
1621 if hit_timeline_start {
1622 let state = self.state.read().await;
1623 if state.meta.subscriber_skip_count.get() > 0 {
1627 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1628 }
1629 }
1630 }
1631 RoomPaginationStatus::Paginating => {}
1632 }
1633
1634 status
1636 }
1637}
1638
1639#[allow(clippy::too_many_arguments)]
1640async fn fetch_replied_to_event<P: RoomDataProvider>(
1641 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1642 state_lock: &RwLock<TimelineState<P>>,
1643 index: usize,
1644 item: &EventTimelineItem,
1645 internal_id: TimelineUniqueId,
1646 msglike: &MsgLikeContent,
1647 in_reply_to: &EventId,
1648 room: &Room,
1649) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1650 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1651 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1652 trace!("Found replied-to event locally");
1653 return Ok(details);
1654 }
1655
1656 trace!("Setting in-reply-to details to pending");
1659 let in_reply_to_details =
1660 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1661
1662 let event_item = item
1663 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1664
1665 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1666 state_guard.items.replace(index, new_timeline_item);
1667
1668 drop(state_guard);
1670
1671 trace!("Fetching replied-to event");
1672 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1673 Ok(timeline_event) => {
1674 let state = state_lock.read().await;
1675
1676 let replied_to_item =
1677 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1678
1679 if let Some(item) = replied_to_item {
1680 TimelineDetails::Ready(Box::new(item))
1681 } else {
1682 return Err(Error::UnsupportedEvent);
1684 }
1685 }
1686
1687 Err(e) => TimelineDetails::Error(Arc::new(e)),
1688 };
1689
1690 Ok(res)
1691}