1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::{VectorDiff, VectorSubscriberStream};
20use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::Result;
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{RoomEventCache, RoomPaginationStatus},
28 paginators::{PaginationResult, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40 AnySyncTimelineEvent, MessageLikeEventType,
41 poll::unstable_start::UnstablePollStartEventContent,
42 reaction::ReactionEventContent,
43 receipt::{Receipt, ReceiptThread, ReceiptType},
44 relation::Annotation,
45 room::message::{MessageType, Relation},
46 },
47 room_version_rules::RoomVersionRules,
48 serde::Raw,
49};
50use tokio::sync::{RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54 metadata::{RelativePosition, TimelineMetadata},
55 observable_items::{
56 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57 ObservableItemsTransactionEntry,
58 },
59 state::TimelineState,
60 state_transaction::TimelineStateTransaction,
61};
62use super::{
63 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_item::{ReactionStatus, RemoteEventOrigin},
68 item::TimelineUniqueId,
69 subscriber::TimelineSubscriber,
70 traits::RoomDataProvider,
71};
72use crate::{
73 timeline::{
74 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75 algorithms::rfind_event_by_item_id,
76 date_dividers::DateDividerAdjuster,
77 event_item::TimelineItemHandle,
78 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79 },
80 unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93
94#[derive(Debug)]
100pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
101 Live {
103 hide_threaded_events: bool,
105 },
106
107 Event {
110 paginator: Paginator<P>,
112
113 hide_threaded_events: bool,
115 },
116
117 Thread {
119 root_event_id: OwnedEventId,
121 },
122
123 PinnedEvents {
124 loader: PinnedEventsLoader,
125 },
126}
127
128impl<P: RoomDataProvider> TimelineFocusKind<P> {
129 pub(super) fn receipt_thread(&self) -> ReceiptThread {
136 if let Some(thread_root) = self.thread_root() {
137 ReceiptThread::Thread(thread_root.to_owned())
138 } else if self.hide_threaded_events() {
139 ReceiptThread::Main
140 } else {
141 ReceiptThread::Unthreaded
142 }
143 }
144
145 fn hide_threaded_events(&self) -> bool {
147 match self {
148 TimelineFocusKind::Live { hide_threaded_events }
149 | TimelineFocusKind::Event { hide_threaded_events, .. } => *hide_threaded_events,
150 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
151 }
152 }
153
154 fn is_thread(&self) -> bool {
157 self.thread_root().is_some()
158 }
159
160 fn thread_root(&self) -> Option<&EventId> {
162 match self {
163 TimelineFocusKind::Live { .. }
164 | TimelineFocusKind::Event { paginator: _, hide_threaded_events: _ }
165 | TimelineFocusKind::PinnedEvents { .. } => None,
166 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
167 }
168 }
169}
170
171#[derive(Clone, Debug)]
172pub(super) struct TimelineController<P: RoomDataProvider = Room> {
173 state: Arc<RwLock<TimelineState<P>>>,
175
176 focus: Arc<TimelineFocusKind<P>>,
178
179 pub(crate) room_data_provider: P,
184
185 pub(super) settings: TimelineSettings,
187
188 decryption_retry_task: DecryptionRetryTask<P, P>,
191}
192
193#[derive(Clone)]
194pub(super) struct TimelineSettings {
195 pub(super) track_read_receipts: bool,
197
198 pub(super) event_filter: Arc<TimelineEventFilterFn>,
201
202 pub(super) add_failed_to_parse: bool,
204
205 pub(super) date_divider_mode: DateDividerMode,
207}
208
209#[cfg(not(tarpaulin_include))]
210impl fmt::Debug for TimelineSettings {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 f.debug_struct("TimelineSettings")
213 .field("track_read_receipts", &self.track_read_receipts)
214 .field("add_failed_to_parse", &self.add_failed_to_parse)
215 .finish_non_exhaustive()
216 }
217}
218
219impl Default for TimelineSettings {
220 fn default() -> Self {
221 Self {
222 track_read_receipts: false,
223 event_filter: Arc::new(default_event_filter),
224 add_failed_to_parse: true,
225 date_divider_mode: DateDividerMode::Daily,
226 }
227 }
228}
229
230pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
240 match event {
241 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
242 if ev.redacts(&rules.redaction).is_some() {
243 false
246 } else {
247 ev.event_type() != MessageLikeEventType::Reaction
250 }
251 }
252
253 AnySyncTimelineEvent::MessageLike(msg) => {
254 match msg.original_content() {
255 None => {
256 msg.event_type() != MessageLikeEventType::Reaction
259 }
260
261 Some(original_content) => {
262 match original_content {
263 AnyMessageLikeEventContent::RoomMessage(content) => {
264 if content
265 .relates_to
266 .as_ref()
267 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
268 {
269 return false;
271 }
272
273 match content.msgtype {
274 MessageType::Audio(_)
275 | MessageType::Emote(_)
276 | MessageType::File(_)
277 | MessageType::Image(_)
278 | MessageType::Location(_)
279 | MessageType::Notice(_)
280 | MessageType::ServerNotice(_)
281 | MessageType::Text(_)
282 | MessageType::Video(_)
283 | MessageType::VerificationRequest(_) => true,
284 #[cfg(feature = "unstable-msc4274")]
285 MessageType::Gallery(_) => true,
286 _ => false,
287 }
288 }
289
290 AnyMessageLikeEventContent::Sticker(_)
291 | AnyMessageLikeEventContent::UnstablePollStart(
292 UnstablePollStartEventContent::New(_),
293 )
294 | AnyMessageLikeEventContent::CallInvite(_)
295 | AnyMessageLikeEventContent::RtcNotification(_)
296 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
297
298 _ => false,
299 }
300 }
301 }
302 }
303
304 AnySyncTimelineEvent::State(_) => {
305 true
307 }
308 }
309}
310
311impl<P: RoomDataProvider> TimelineController<P> {
312 pub(super) fn new(
313 room_data_provider: P,
314 focus: TimelineFocus,
315 internal_id_prefix: Option<String>,
316 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
317 is_room_encrypted: bool,
318 settings: TimelineSettings,
319 ) -> Self {
320 let focus = match focus {
321 TimelineFocus::Live { hide_threaded_events } => {
322 TimelineFocusKind::Live { hide_threaded_events }
323 }
324
325 TimelineFocus::Event { hide_threaded_events, .. } => {
326 let paginator = Paginator::new(room_data_provider.clone());
327 TimelineFocusKind::Event { paginator, hide_threaded_events }
328 }
329
330 TimelineFocus::Thread { root_event_id, .. } => {
331 TimelineFocusKind::Thread { root_event_id }
332 }
333
334 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
335 TimelineFocusKind::PinnedEvents {
336 loader: PinnedEventsLoader::new(
337 Arc::new(room_data_provider.clone()),
338 max_events_to_load as usize,
339 max_concurrent_requests as usize,
340 ),
341 }
342 }
343 };
344
345 let focus = Arc::new(focus);
346 let state = Arc::new(RwLock::new(TimelineState::new(
347 focus.clone(),
348 room_data_provider.own_user_id().to_owned(),
349 room_data_provider.room_version_rules(),
350 internal_id_prefix,
351 unable_to_decrypt_hook,
352 is_room_encrypted,
353 )));
354
355 let decryption_retry_task =
356 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
357
358 Self { state, focus, room_data_provider, settings, decryption_retry_task }
359 }
360
361 pub(super) async fn init_focus(
368 &self,
369 focus: &TimelineFocus,
370 room_event_cache: &RoomEventCache,
371 ) -> Result<bool, Error> {
372 match focus {
373 TimelineFocus::Live { .. } => {
374 let events = room_event_cache.events().await;
376
377 let has_events = !events.is_empty();
378
379 self.replace_with_initial_remote_events(
380 events.into_iter(),
381 RemoteEventOrigin::Cache,
382 )
383 .await;
384
385 match room_event_cache.pagination().status().get() {
386 RoomPaginationStatus::Idle { hit_timeline_start } => {
387 if hit_timeline_start {
388 self.insert_timeline_start_if_missing().await;
391 }
392 }
393 RoomPaginationStatus::Paginating => {}
394 }
395
396 Ok(has_events)
397 }
398
399 TimelineFocus::Event { target: event_id, num_context_events, .. } => {
400 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
401 unreachable!();
403 };
404
405 let start_from_result = paginator
407 .start_from(event_id, (*num_context_events).into())
408 .await
409 .map_err(PaginationError::Paginator)?;
410
411 let has_events = !start_from_result.events.is_empty();
412
413 self.replace_with_initial_remote_events(
414 start_from_result.events.into_iter(),
415 RemoteEventOrigin::Pagination,
416 )
417 .await;
418
419 Ok(has_events)
420 }
421
422 TimelineFocus::Thread { root_event_id, .. } => {
423 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
424 let has_events = !events.is_empty();
425
426 let mut related_events = Vector::new();
430 for event_id in events.iter().filter_map(|event| event.event_id()) {
431 if let Some((_original, related)) =
432 room_event_cache.find_event_with_relations(&event_id, None).await
433 {
434 related_events.extend(related);
435 }
436 }
437
438 self.replace_with_initial_remote_events(
439 events.into_iter(),
440 RemoteEventOrigin::Cache,
441 )
442 .await;
443
444 if !related_events.is_empty() {
446 self.handle_remote_aggregations(
447 vec![VectorDiff::Append { values: related_events }],
448 RemoteEventOrigin::Cache,
449 )
450 .await;
451 }
452
453 Ok(has_events)
454 }
455
456 TimelineFocus::PinnedEvents { .. } => {
457 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
458 unreachable!();
460 };
461
462 let Some(loaded_events) =
463 loader.load_events().await.map_err(Error::PinnedEventsError)?
464 else {
465 return Ok(false);
467 };
468
469 let has_events = !loaded_events.is_empty();
470
471 self.replace_with_initial_remote_events(
472 loaded_events.into_iter(),
473 RemoteEventOrigin::Pagination,
474 )
475 .await;
476
477 Ok(has_events)
478 }
479 }
480 }
481
482 pub async fn handle_encryption_state_changes(&self) {
487 let mut room_info = self.room_data_provider.room_info();
488
489 let mark_encrypted = || async {
491 let mut state = self.state.write().await;
492 state.meta.is_room_encrypted = true;
493 state.mark_all_events_as_encrypted();
494 };
495
496 if room_info.get().encryption_state().is_encrypted() {
497 mark_encrypted().await;
500 return;
501 }
502
503 while let Some(info) = room_info.next().await {
504 if info.encryption_state().is_encrypted() {
505 mark_encrypted().await;
506 break;
509 }
510 }
511 }
512
513 pub(crate) async fn reload_pinned_events(
514 &self,
515 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
516 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
517 loader.load_events().await
518 } else {
519 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
520 }
521 }
522
523 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
532 let state = self.state.read().await;
533
534 let (count, needs) = state
535 .meta
536 .subscriber_skip_count
537 .compute_next_when_paginating_backwards(num_events.into());
538
539 let is_live_timeline = true;
541 state.meta.subscriber_skip_count.update(count, is_live_timeline);
542
543 needs
544 }
545
546 pub(super) async fn focused_paginate_backwards(
551 &self,
552 num_events: u16,
553 ) -> Result<bool, PaginationError> {
554 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
555 TimelineFocusKind::Live { .. }
556 | TimelineFocusKind::PinnedEvents { .. }
557 | TimelineFocusKind::Thread { .. } => {
558 return Err(PaginationError::NotSupported);
559 }
560 TimelineFocusKind::Event { paginator, .. } => paginator
561 .paginate_backward(num_events.into())
562 .await
563 .map_err(PaginationError::Paginator)?,
564 };
565
566 self.handle_remote_events_with_diffs(
569 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
570 RemoteEventOrigin::Pagination,
571 )
572 .await;
573
574 Ok(hit_end_of_timeline)
575 }
576
577 pub(super) async fn focused_paginate_forwards(
582 &self,
583 num_events: u16,
584 ) -> Result<bool, PaginationError> {
585 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
586 TimelineFocusKind::Live { .. }
587 | TimelineFocusKind::PinnedEvents { .. }
588 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
589
590 TimelineFocusKind::Event { paginator, .. } => paginator
591 .paginate_forward(num_events.into())
592 .await
593 .map_err(PaginationError::Paginator)?,
594 };
595
596 self.handle_remote_events_with_diffs(
599 vec![VectorDiff::Append { values: events.into() }],
600 RemoteEventOrigin::Pagination,
601 )
602 .await;
603
604 Ok(hit_end_of_timeline)
605 }
606
607 pub(super) fn is_live(&self) -> bool {
609 matches!(&*self.focus, TimelineFocusKind::Live { .. })
610 }
611
612 pub(super) fn is_threaded(&self) -> bool {
614 self.focus.is_thread()
615 }
616
617 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
620 self.focus.thread_root().map(ToOwned::to_owned)
621 }
622
623 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
627 self.state.read().await.items.clone_items()
628 }
629
/// Test-only: subscribes to the raw item list.
///
/// Returns the current items together with the stream of subsequent
/// updates.
#[cfg(test)]
pub(super) async fn subscribe_raw(
    &self,
) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
    let state = self.state.read().await;
    let subscriber = state.items.subscribe();
    subscriber.into_values_and_stream()
}
636
637 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
638 let state = self.state.read().await;
639
640 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
641 }
642
643 pub(super) async fn subscribe_filter_map<U, F>(
644 &self,
645 f: F,
646 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
647 where
648 U: Clone,
649 F: Fn(Arc<TimelineItem>) -> Option<U>,
650 {
651 self.state.read().await.items.subscribe().filter_map(f)
652 }
653
654 #[instrument(skip_all)]
658 pub(super) async fn toggle_reaction_local(
659 &self,
660 item_id: &TimelineEventItemId,
661 key: &str,
662 ) -> Result<bool, Error> {
663 let mut state = self.state.write().await;
664
665 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
666 warn!("Timeline item not found, can't add reaction");
667 return Err(Error::FailedToToggleReaction);
668 };
669
670 let user_id = self.room_data_provider.own_user_id();
671 let prev_status = item
672 .content()
673 .reactions()
674 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
675
676 let Some(prev_status) = prev_status else {
677 match item.handle() {
679 TimelineItemHandle::Local(send_handle) => {
680 if send_handle
681 .react(key.to_owned())
682 .await
683 .map_err(|err| Error::SendQueueError(err.into()))?
684 .is_some()
685 {
686 trace!("adding a reaction to a local echo");
687 return Ok(true);
688 }
689
690 warn!("couldn't toggle reaction for local echo");
691 return Ok(false);
692 }
693
694 TimelineItemHandle::Remote(event_id) => {
695 trace!("adding a reaction to a remote echo");
699 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
700 self.room_data_provider
701 .send(ReactionEventContent::from(annotation).into())
702 .await?;
703 return Ok(true);
704 }
705 }
706 };
707
708 trace!("removing a previous reaction");
709 match prev_status {
710 ReactionStatus::LocalToLocal(send_reaction_handle) => {
711 if let Some(handle) = send_reaction_handle {
712 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
713 warn!("unexpectedly unable to abort sending of local reaction");
716 }
717 } else {
718 warn!("no send reaction handle (this should only happen in testing contexts)");
719 }
720 }
721
722 ReactionStatus::LocalToRemote(send_handle) => {
723 trace!("aborting send of the previous reaction that was a local echo");
726 if let Some(handle) = send_handle {
727 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
728 warn!("unexpectedly unable to abort sending of local reaction");
731 }
732 } else {
733 warn!("no send handle (this should only happen in testing contexts)");
734 }
735 }
736
737 ReactionStatus::RemoteToRemote(event_id) => {
738 let Some(annotated_event_id) =
740 item.as_remote().map(|event_item| event_item.event_id.clone())
741 else {
742 warn!("remote reaction to remote event, but the associated item isn't remote");
743 return Ok(false);
744 };
745
746 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
747 let reaction_info = reactions.remove_reaction(user_id, key);
748
749 if reaction_info.is_some() {
750 let new_item = item.with_reactions(reactions);
751 state.items.replace(item_pos, new_item);
752 } else {
753 warn!(
754 "reaction is missing on the item, not removing it locally, \
755 but sending redaction."
756 );
757 }
758
759 drop(state);
761
762 trace!("sending redact for a previous reaction");
763 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
764 if let Some(reaction_info) = reaction_info {
765 debug!("sending redact failed, adding the reaction back to the list");
766
767 let mut state = self.state.write().await;
768 if let Some((item_pos, item)) =
769 rfind_event_by_id(&state.items, &annotated_event_id)
770 {
771 let mut reactions =
773 item.content().reactions().cloned().unwrap_or_default();
774 reactions
775 .entry(key.to_owned())
776 .or_default()
777 .insert(user_id.to_owned(), reaction_info);
778 let new_item = item.with_reactions(reactions);
779 state.items.replace(item_pos, new_item);
780 } else {
781 warn!(
782 "couldn't find item to re-add reaction anymore; \
783 maybe it's been redacted?"
784 );
785 }
786 }
787
788 return Err(err);
789 }
790 }
791 }
792
793 Ok(false)
794 }
795
796 pub(super) async fn handle_remote_events_with_diffs(
798 &self,
799 diffs: Vec<VectorDiff<TimelineEvent>>,
800 origin: RemoteEventOrigin,
801 ) {
802 if diffs.is_empty() {
803 return;
804 }
805
806 let mut state = self.state.write().await;
807 state
808 .handle_remote_events_with_diffs(
809 diffs,
810 origin,
811 &self.room_data_provider,
812 &self.settings,
813 )
814 .await
815 }
816
817 pub(super) async fn handle_remote_aggregations(
819 &self,
820 diffs: Vec<VectorDiff<TimelineEvent>>,
821 origin: RemoteEventOrigin,
822 ) {
823 if diffs.is_empty() {
824 return;
825 }
826
827 let mut state = self.state.write().await;
828 state
829 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
830 .await
831 }
832
833 pub(super) async fn clear(&self) {
834 self.state.write().await.clear();
835 }
836
837 pub(super) async fn replace_with_initial_remote_events<Events>(
845 &self,
846 events: Events,
847 origin: RemoteEventOrigin,
848 ) where
849 Events: IntoIterator + ExactSizeIterator,
850 <Events as IntoIterator>::Item: Into<TimelineEvent>,
851 {
852 let mut state = self.state.write().await;
853
854 let track_read_markers = self.settings.track_read_receipts;
855 if track_read_markers {
856 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
857 state
858 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
859 .await;
860 }
861
862 if !state.items.is_empty() || events.len() > 0 {
868 state
869 .replace_with_remote_events(
870 events,
871 origin,
872 &self.room_data_provider,
873 &self.settings,
874 )
875 .await;
876 }
877
878 if track_read_markers
879 && let Some(fully_read_event_id) =
880 self.room_data_provider.load_fully_read_marker().await
881 {
882 state.handle_fully_read_marker(fully_read_event_id);
883 }
884 }
885
886 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
887 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
888 }
889
890 pub(super) async fn handle_ephemeral_events(
891 &self,
892 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
893 ) {
894 let mut state = self.state.write().await;
895 state.handle_ephemeral_events(events, &self.room_data_provider).await;
896 }
897
898 #[instrument(skip_all)]
900 pub(super) async fn handle_local_event(
901 &self,
902 txn_id: OwnedTransactionId,
903 content: AnyMessageLikeEventContent,
904 send_handle: Option<SendHandle>,
905 ) {
906 let sender = self.room_data_provider.own_user_id().to_owned();
907 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
908
909 let date_divider_mode = self.settings.date_divider_mode.clone();
910
911 let mut state = self.state.write().await;
912 state
913 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
914 .await;
915 }
916
917 #[instrument(skip(self))]
922 pub(super) async fn update_event_send_state(
923 &self,
924 txn_id: &TransactionId,
925 send_state: EventSendState,
926 ) {
927 let mut state = self.state.write().await;
928 let mut txn = state.transaction();
929
930 let new_event_id: Option<&EventId> =
931 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
932
933 if rfind_event_item(&txn.items, |it| {
936 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
937 })
938 .is_some()
939 {
940 trace!("Remote echo received before send-event response");
942
943 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
944
945 if let Some((idx, _)) = local_echo {
949 warn!("Message echo got duplicated, removing the local one");
950 txn.items.remove(idx);
951
952 let mut adjuster =
954 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
955 adjuster.run(&mut txn.items, &mut txn.meta);
956 }
957
958 txn.commit();
959 return;
960 }
961
962 let result = rfind_event_item(&txn.items, |it| {
964 it.transaction_id() == Some(txn_id)
965 || new_event_id.is_some()
966 && it.event_id() == new_event_id
967 && it.as_local().is_some()
968 });
969
970 let Some((idx, item)) = result else {
971 if let Some(new_event_id) = new_event_id {
976 if txn.meta.aggregations.mark_aggregation_as_sent(
977 txn_id.to_owned(),
978 new_event_id.to_owned(),
979 &mut txn.items,
980 &txn.meta.room_version_rules,
981 ) {
982 trace!("Aggregation marked as sent");
983 txn.commit();
984 return;
985 }
986
987 trace!("Sent aggregation was not found");
988 }
989
990 warn!("Timeline item not found, can't update send state");
991 return;
992 };
993
994 let Some(local_item) = item.as_local() else {
995 warn!("We looked for a local item, but it transitioned to remote.");
996 return;
997 };
998
999 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1002 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1003 }
1004
1005 if let Some(new_event_id) = new_event_id {
1008 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1009 }
1010
1011 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1012 txn.items.replace(idx, new_item);
1013
1014 txn.commit();
1015 }
1016
1017 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1018 let mut state = self.state.write().await;
1019
1020 if let Some((idx, _)) =
1021 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1022 {
1023 let mut txn = state.transaction();
1024
1025 txn.items.remove(idx);
1026
1027 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1030 adjuster.run(&mut txn.items, &mut txn.meta);
1031
1032 txn.meta.update_read_marker(&mut txn.items);
1033
1034 txn.commit();
1035
1036 debug!("discarded local echo");
1037 return true;
1038 }
1039
1040 let mut txn = state.transaction();
1043
1044 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1046 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1047 &mut txn.items,
1048 ) {
1049 Ok(val) => val,
1050 Err(err) => {
1051 warn!("error when discarding local echo for an aggregation: {err}");
1052 true
1054 }
1055 };
1056
1057 if found_aggregation {
1058 txn.commit();
1059 }
1060
1061 found_aggregation
1062 }
1063
1064 pub(super) async fn replace_local_echo(
1065 &self,
1066 txn_id: &TransactionId,
1067 content: AnyMessageLikeEventContent,
1068 ) -> bool {
1069 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1070 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1075 return false;
1076 };
1077
1078 let mut state = self.state.write().await;
1079 let mut txn = state.transaction();
1080
1081 let Some((idx, prev_item)) =
1082 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1083 else {
1084 debug!("Can't find local echo to replace");
1085 return false;
1086 };
1087
1088 let ti_kind = {
1091 let Some(prev_local_item) = prev_item.as_local() else {
1092 warn!("We looked for a local item, but it transitioned as remote??");
1093 return false;
1094 };
1095 let progress = as_variant!(&prev_local_item.send_state,
1097 EventSendState::NotSentYet { progress } => progress.clone())
1098 .flatten();
1099 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1100 };
1101
1102 let new_item = TimelineItem::new(
1104 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1105 content.msgtype,
1106 content.mentions,
1107 prev_item.content().reactions().cloned().unwrap_or_default(),
1108 prev_item.content().thread_root(),
1109 prev_item.content().in_reply_to(),
1110 prev_item.content().thread_summary(),
1111 )),
1112 prev_item.internal_id.to_owned(),
1113 );
1114
1115 txn.items.replace(idx, new_item);
1116
1117 txn.commit();
1121
1122 debug!("Replaced local echo");
1123 true
1124 }
1125
1126 pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
1127 self.decryption_retry_task
1128 .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
1129 .await;
1130 }
1131
1132 pub(super) async fn set_sender_profiles_pending(&self) {
1133 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1134 }
1135
1136 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1137 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1138 }
1139
1140 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1141 self.state.write().await.items.for_each(|mut entry| {
1142 let Some(event_item) = entry.as_event() else { return };
1143 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1144 let new_item = entry.with_kind(TimelineItemKind::Event(
1145 event_item.with_sender_profile(profile_state.clone()),
1146 ));
1147 ObservableItemsEntry::replace(&mut entry, new_item);
1148 }
1149 });
1150 }
1151
1152 pub(super) async fn update_missing_sender_profiles(&self) {
1153 trace!("Updating missing sender profiles");
1154
1155 let mut state = self.state.write().await;
1156 let mut entries = state.items.entries();
1157 while let Some(mut entry) = entries.next() {
1158 let Some(event_item) = entry.as_event() else { continue };
1159 let event_id = event_item.event_id().map(debug);
1160 let transaction_id = event_item.transaction_id().map(debug);
1161
1162 if event_item.sender_profile().is_ready() {
1163 trace!(event_id, transaction_id, "Profile already set");
1164 continue;
1165 }
1166
1167 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1168 Some(profile) => {
1169 trace!(event_id, transaction_id, "Adding profile");
1170 let updated_item =
1171 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1172 let new_item = entry.with_kind(updated_item);
1173 ObservableItemsEntry::replace(&mut entry, new_item);
1174 }
1175 None => {
1176 if !event_item.sender_profile().is_unavailable() {
1177 trace!(event_id, transaction_id, "Marking profile unavailable");
1178 let updated_item =
1179 event_item.with_sender_profile(TimelineDetails::Unavailable);
1180 let new_item = entry.with_kind(updated_item);
1181 ObservableItemsEntry::replace(&mut entry, new_item);
1182 } else {
1183 debug!(event_id, transaction_id, "Profile already marked unavailable");
1184 }
1185 }
1186 }
1187 }
1188
1189 trace!("Done updating missing sender profiles");
1190 }
1191
1192 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1194 trace!("Forcing update of sender profiles: {sender_ids:?}");
1195
1196 let mut state = self.state.write().await;
1197 let mut entries = state.items.entries();
1198 while let Some(mut entry) = entries.next() {
1199 let Some(event_item) = entry.as_event() else { continue };
1200 if !sender_ids.contains(event_item.sender()) {
1201 continue;
1202 }
1203
1204 let event_id = event_item.event_id().map(debug);
1205 let transaction_id = event_item.transaction_id().map(debug);
1206
1207 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1208 Some(profile) => {
1209 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1210 {
1211 debug!(event_id, transaction_id, "Profile already up-to-date");
1212 } else {
1213 trace!(event_id, transaction_id, "Updating profile");
1214 let updated_item =
1215 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1216 let new_item = entry.with_kind(updated_item);
1217 ObservableItemsEntry::replace(&mut entry, new_item);
1218 }
1219 }
1220 None => {
1221 if !event_item.sender_profile().is_unavailable() {
1222 trace!(event_id, transaction_id, "Marking profile unavailable");
1223 let updated_item =
1224 event_item.with_sender_profile(TimelineDetails::Unavailable);
1225 let new_item = entry.with_kind(updated_item);
1226 ObservableItemsEntry::replace(&mut entry, new_item);
1227 } else {
1228 debug!(event_id, transaction_id, "Profile already marked unavailable");
1229 }
1230 }
1231 }
1232 }
1233
1234 trace!("Done forcing update of sender profiles");
1235 }
1236
/// Test-only: feeds a receipt event content to the timeline state, along
/// with the own user id.
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
    let own_user_id = self.room_data_provider.own_user_id();

    let mut state = self.state.write().await;
    state.handle_read_receipts(receipt_event_content, own_user_id);
}
1242
1243 pub(super) async fn latest_user_read_receipt(
1247 &self,
1248 user_id: &UserId,
1249 ) -> Option<(OwnedEventId, Receipt)> {
1250 let receipt_thread = self.focus.receipt_thread();
1251
1252 self.state
1253 .read()
1254 .await
1255 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1256 .await
1257 }
1258
1259 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1262 &self,
1263 user_id: &UserId,
1264 ) -> Option<OwnedEventId> {
1265 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1266 }
1267
1268 pub async fn subscribe_own_user_read_receipts_changed(
1270 &self,
1271 ) -> impl Stream<Item = ()> + use<P> {
1272 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1273 }
1274
1275 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1277 match echo.content {
1278 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1279 let content = match serialized_event.deserialize() {
1280 Ok(d) => d,
1281 Err(err) => {
1282 warn!("error deserializing local echo: {err}");
1283 return;
1284 }
1285 };
1286
1287 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1288 .await;
1289
1290 if let Some(send_error) = send_error {
1291 self.update_event_send_state(
1292 &echo.transaction_id,
1293 EventSendState::SendingFailed {
1294 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1295 send_error,
1296 ))),
1297 is_recoverable: false,
1298 },
1299 )
1300 .await;
1301 }
1302 }
1303
1304 LocalEchoContent::React { key, send_handle, applies_to } => {
1305 self.handle_local_reaction(key, send_handle, applies_to).await;
1306 }
1307 }
1308 }
1309
1310 #[instrument(skip(self, send_handle))]
1312 async fn handle_local_reaction(
1313 &self,
1314 reaction_key: String,
1315 send_handle: SendReactionHandle,
1316 applies_to: OwnedTransactionId,
1317 ) {
1318 let mut state = self.state.write().await;
1319 let mut tr = state.transaction();
1320
1321 let target = TimelineEventItemId::TransactionId(applies_to);
1322
1323 let reaction_txn_id = send_handle.transaction_id().to_owned();
1324 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1325 let aggregation = Aggregation::new(
1326 TimelineEventItemId::TransactionId(reaction_txn_id),
1327 AggregationKind::Reaction {
1328 key: reaction_key.clone(),
1329 sender: self.room_data_provider.own_user_id().to_owned(),
1330 timestamp: MilliSecondsSinceUnixEpoch::now(),
1331 reaction_status,
1332 },
1333 );
1334
1335 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1336 find_item_and_apply_aggregation(
1337 &tr.meta.aggregations,
1338 &mut tr.items,
1339 &target,
1340 aggregation,
1341 &tr.meta.room_version_rules,
1342 );
1343
1344 tr.commit();
1345 }
1346
1347 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1349 match update {
1350 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1351 self.handle_local_echo(echo).await;
1352 }
1353
1354 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1355 if !self.discard_local_echo(&transaction_id).await {
1356 warn!("couldn't find the local echo to discard");
1357 }
1358 }
1359
1360 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1361 let content = match new_content.deserialize() {
1362 Ok(d) => d,
1363 Err(err) => {
1364 warn!("error deserializing local echo (upon edit): {err}");
1365 return;
1366 }
1367 };
1368
1369 if !self.replace_local_echo(&transaction_id, content).await {
1370 warn!("couldn't find the local echo to replace");
1371 }
1372 }
1373
1374 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1375 self.update_event_send_state(
1376 &transaction_id,
1377 EventSendState::SendingFailed { error, is_recoverable },
1378 )
1379 .await;
1380 }
1381
1382 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1383 self.update_event_send_state(
1384 &transaction_id,
1385 EventSendState::NotSentYet { progress: None },
1386 )
1387 .await;
1388 }
1389
1390 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1391 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1392 .await;
1393 }
1394
1395 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1396 self.update_event_send_state(
1397 &related_to,
1398 EventSendState::NotSentYet {
1399 progress: Some(MediaUploadProgress { index, progress }),
1400 },
1401 )
1402 .await;
1403 }
1404 }
1405 }
1406
1407 pub async fn insert_timeline_start_if_missing(&self) {
1410 let mut state = self.state.write().await;
1411 let mut txn = state.transaction();
1412 txn.items.push_timeline_start_if_missing(
1413 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1414 );
1415 txn.commit();
1416 }
1417
1418 pub(super) async fn make_replied_to(
1424 &self,
1425 event: TimelineEvent,
1426 ) -> Result<Option<EmbeddedEvent>, Error> {
1427 let state = self.state.read().await;
1428 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1429 }
1430}
1431
1432impl TimelineController {
1433 pub(super) fn room(&self) -> &Room {
1434 &self.room_data_provider
1435 }
1436
1437 #[instrument(skip(self))]
1440 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1441 let state_guard = self.state.write().await;
1442 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1443 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1444 let remote_item = item
1445 .as_remote()
1446 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1447 .clone();
1448
1449 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1450 debug!("Event is not a message");
1451 return Ok(());
1452 };
1453 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1454 debug!("Event is not a reply");
1455 return Ok(());
1456 };
1457 if let TimelineDetails::Pending = &in_reply_to.event {
1458 debug!("Replied-to event is already being fetched");
1459 return Ok(());
1460 }
1461 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1462 debug!("Replied-to event has already been fetched");
1463 return Ok(());
1464 }
1465
1466 let internal_id = item.internal_id.to_owned();
1467 let item = item.clone();
1468 let event = fetch_replied_to_event(
1469 state_guard,
1470 &self.state,
1471 index,
1472 &item,
1473 internal_id,
1474 &msglike,
1475 &in_reply_to.event_id,
1476 self.room(),
1477 )
1478 .await?;
1479
1480 let mut state = self.state.write().await;
1483 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1484 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1485
1486 let TimelineItemContent::MsgLike(MsgLikeContent {
1489 kind: MsgLikeKind::Message(message),
1490 reactions,
1491 thread_root,
1492 in_reply_to,
1493 thread_summary,
1494 }) = item.content().clone()
1495 else {
1496 info!("Event is no longer a message (redacted?)");
1497 return Ok(());
1498 };
1499 let Some(in_reply_to) = in_reply_to else {
1500 warn!("Event no longer has a reply (bug?)");
1501 return Ok(());
1502 };
1503
1504 trace!("Updating in-reply-to details");
1507 let internal_id = item.internal_id.to_owned();
1508 let mut item = item.clone();
1509 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1510 kind: MsgLikeKind::Message(message),
1511 reactions,
1512 thread_root,
1513 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1514 thread_summary,
1515 }));
1516 state.items.replace(index, TimelineItem::new(item, internal_id));
1517
1518 Ok(())
1519 }
1520
1521 pub(super) fn infer_thread_for_read_receipt(
1527 &self,
1528 receipt_type: &SendReceiptType,
1529 ) -> ReceiptThread {
1530 if matches!(receipt_type, SendReceiptType::FullyRead) {
1531 ReceiptThread::Unthreaded
1532 } else {
1533 self.focus.receipt_thread()
1534 }
1535 }
1536
1537 pub(super) async fn should_send_receipt(
1541 &self,
1542 receipt_type: &SendReceiptType,
1543 receipt_thread: &ReceiptThread,
1544 event_id: &EventId,
1545 ) -> bool {
1546 let own_user_id = self.room().own_user_id();
1547 let state = self.state.read().await;
1548 let room = self.room();
1549
1550 match receipt_type {
1551 SendReceiptType::Read => {
1552 if let Some((old_pub_read, _)) = state
1553 .meta
1554 .user_receipt(
1555 own_user_id,
1556 ReceiptType::Read,
1557 receipt_thread.clone(),
1558 room,
1559 state.items.all_remote_events(),
1560 )
1561 .await
1562 {
1563 trace!(%old_pub_read, "found a previous public receipt");
1564 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1565 &old_pub_read,
1566 event_id,
1567 state.items.all_remote_events(),
1568 ) {
1569 trace!(
1570 "event referred to new receipt is {relative_pos:?} the previous receipt"
1571 );
1572 return relative_pos == RelativePosition::After;
1573 }
1574 }
1575 }
1576
1577 SendReceiptType::ReadPrivate => {
1580 if let Some((old_priv_read, _)) =
1581 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1582 {
1583 trace!(%old_priv_read, "found a previous private receipt");
1584 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1585 &old_priv_read,
1586 event_id,
1587 state.items.all_remote_events(),
1588 ) {
1589 trace!(
1590 "event referred to new receipt is {relative_pos:?} the previous receipt"
1591 );
1592 return relative_pos == RelativePosition::After;
1593 }
1594 }
1595 }
1596
1597 SendReceiptType::FullyRead => {
1598 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1599 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1600 &prev_event_id,
1601 event_id,
1602 state.items.all_remote_events(),
1603 )
1604 {
1605 return relative_pos == RelativePosition::After;
1606 }
1607 }
1608
1609 _ => {}
1610 }
1611
1612 true
1614 }
1615
1616 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1619 let state = self.state.read().await;
1620 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1621 }
1622
1623 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1624 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1625 self.retry_event_decryption_inner(session_ids).await
1626 }
1627
1628 pub(super) async fn map_pagination_status(
1636 &self,
1637 status: RoomPaginationStatus,
1638 ) -> RoomPaginationStatus {
1639 match status {
1640 RoomPaginationStatus::Idle { hit_timeline_start } => {
1641 if hit_timeline_start {
1642 let state = self.state.read().await;
1643 if state.meta.subscriber_skip_count.get() > 0 {
1647 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1648 }
1649 }
1650 }
1651 RoomPaginationStatus::Paginating => {}
1652 }
1653
1654 status
1656 }
1657}
1658
1659#[allow(clippy::too_many_arguments)]
1660async fn fetch_replied_to_event<P: RoomDataProvider>(
1661 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1662 state_lock: &RwLock<TimelineState<P>>,
1663 index: usize,
1664 item: &EventTimelineItem,
1665 internal_id: TimelineUniqueId,
1666 msglike: &MsgLikeContent,
1667 in_reply_to: &EventId,
1668 room: &Room,
1669) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1670 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1671 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1672 trace!("Found replied-to event locally");
1673 return Ok(details);
1674 }
1675
1676 trace!("Setting in-reply-to details to pending");
1679 let in_reply_to_details =
1680 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1681
1682 let event_item = item
1683 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1684
1685 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1686 state_guard.items.replace(index, new_timeline_item);
1687
1688 drop(state_guard);
1690
1691 trace!("Fetching replied-to event");
1692 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1693 Ok(timeline_event) => {
1694 let state = state_lock.read().await;
1695
1696 let replied_to_item =
1697 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1698
1699 if let Some(item) = replied_to_item {
1700 TimelineDetails::Ready(Box::new(item))
1701 } else {
1702 return Err(Error::UnsupportedEvent);
1704 }
1705 }
1706
1707 Err(e) => TimelineDetails::Error(Arc::new(e)),
1708 };
1709
1710 Ok(res)
1711}