1use std::{
16 collections::BTreeSet,
17 fmt,
18 sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26#[cfg(test)]
27use matrix_sdk::Result;
28use matrix_sdk::{
29 deserialized_responses::TimelineEvent,
30 event_cache::{DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache},
31 send_queue::{
32 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33 },
34};
35#[cfg(test)]
36use ruma::events::receipt::ReceiptEventContent;
37use ruma::{
38 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
39 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
40 events::{
41 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
42 AnySyncTimelineEvent, MessageLikeEventType,
43 poll::unstable_start::UnstablePollStartEventContent,
44 reaction::ReactionEventContent,
45 receipt::{Receipt, ReceiptThread, ReceiptType},
46 relation::Annotation,
47 room::message::{MessageType, Relation},
48 },
49 room_version_rules::RoomVersionRules,
50 serde::Raw,
51};
52use tokio::sync::{RwLock, RwLockWriteGuard};
53use tracing::{debug, error, field::debug, info, instrument, trace, warn};
54
55pub(super) use self::{
56 metadata::{RelativePosition, TimelineMetadata},
57 observable_items::{
58 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
59 ObservableItemsTransactionEntry,
60 },
61 state::TimelineState,
62 state_transaction::TimelineStateTransaction,
63};
64use super::{
65 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
66 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
67 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
68 TimelineReadReceiptTracking, VirtualTimelineItem,
69 algorithms::{rfind_event_by_id, rfind_event_item},
70 event_item::{ReactionStatus, RemoteEventOrigin},
71 item::TimelineUniqueId,
72 subscriber::TimelineSubscriber,
73 traits::RoomDataProvider,
74};
75use crate::{
76 timeline::{
77 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
78 algorithms::rfind_event_by_item_id,
79 controller::decryption_retry_task::compute_redecryption_candidates,
80 date_dividers::DateDividerAdjuster, event_item::TimelineItemHandle,
81 },
82 unable_to_decrypt_hook::UtdHookManager,
83};
84
85pub(in crate::timeline) mod aggregations;
86mod decryption_retry_task;
87mod metadata;
88mod observable_items;
89mod read_receipts;
90mod state;
91mod state_transaction;
92
93pub(super) use aggregations::*;
94pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
95
/// The focus a timeline controller has been created with, reduced to the
/// data the controller keeps around.
///
/// It is built from a `TimelineFocus` in `TimelineController::new`.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind {
    /// Focus built from `TimelineFocus::Live`.
    Live {
        /// Whether events that belong to a thread are hidden from this
        /// timeline.
        hide_threaded_events: bool,
    },

    /// Focus built from `TimelineFocus::Event`: the timeline is centered on
    /// a single target event.
    Event {
        /// The event the timeline is focused on.
        focused_event_id: OwnedEventId,

        /// The root of the thread associated with the focused event, if any.
        ///
        /// Starts unset and is written at most once, by `init_focus`, when
        /// the event-focused cache reports a thread root.
        thread_root: OnceLock<OwnedEventId>,

        /// The threading mode requested by the caller for this focus.
        thread_mode: TimelineEventFocusThreadMode,
    },

    /// Focus built from `TimelineFocus::Thread`: the timeline shows a single
    /// thread.
    Thread {
        /// The root event of the thread.
        root_event_id: OwnedEventId,
    },

    /// Focus built from `TimelineFocus::PinnedEvents`.
    PinnedEvents,
}
135
136impl TimelineFocusKind {
137 pub(super) fn receipt_thread(&self) -> ReceiptThread {
144 if let Some(thread_root) = self.thread_root() {
145 ReceiptThread::Thread(thread_root.to_owned())
146 } else if self.hide_threaded_events() {
147 ReceiptThread::Main
148 } else {
149 ReceiptThread::Unthreaded
150 }
151 }
152
153 fn hide_threaded_events(&self) -> bool {
155 match self {
156 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
157 TimelineFocusKind::Event { thread_mode, .. } => {
158 matches!(
159 thread_mode,
160 TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
161 )
162 }
163 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
164 }
165 }
166
167 fn is_thread(&self) -> bool {
170 self.thread_root().is_some()
171 }
172
173 fn thread_root(&self) -> Option<&EventId> {
175 match self {
176 TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
177 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
178 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
179 }
180 }
181}
182
/// Controller that owns the state of a timeline and exposes the operations
/// that read or mutate it.
///
/// Cloning is cheap for the shared parts: the state and the focus are both
/// reference-counted.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// The timeline state, shared behind an async read/write lock.
    state: Arc<RwLock<TimelineState<P>>>,

    /// The focus this controller was created with; the same `Arc` is also
    /// handed to the state in `new`.
    focus: Arc<TimelineFocusKind>,

    /// Provider used to query room-related data (own user id, profiles,
    /// receipts, ...) and to send or redact events.
    pub(crate) room_data_provider: P,

    /// The settings this timeline has been configured with.
    pub(super) settings: TimelineSettings,
}
200
/// Configuration of a timeline.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Read-receipt tracking mode. When it is enabled, the initial user
    /// receipts and the fully-read marker are loaded while replacing the
    /// initial events.
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// The filter applied to events; `default_event_filter` is used by
    /// default.
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Defaults to `true`.
    // NOTE(review): presumably controls whether events that fail to parse
    // still get a timeline item — confirm against the event handler.
    pub(super) add_failed_to_parse: bool,

    /// The mode handed to the date divider adjuster; defaults to
    /// `DateDividerMode::Daily`.
    pub(super) date_divider_mode: DateDividerMode,
}
217
218#[cfg(not(tarpaulin_include))]
219impl fmt::Debug for TimelineSettings {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("TimelineSettings")
222 .field("track_read_receipts", &self.track_read_receipts)
223 .field("add_failed_to_parse", &self.add_failed_to_parse)
224 .finish_non_exhaustive()
225 }
226}
227
228impl Default for TimelineSettings {
229 fn default() -> Self {
230 Self {
231 track_read_receipts: TimelineReadReceiptTracking::Disabled,
232 event_filter: Arc::new(default_event_filter),
233 add_failed_to_parse: true,
234 date_divider_mode: DateDividerMode::Daily,
235 }
236 }
237}
238
239pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
249 match event {
250 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
251 if ev.redacts(&rules.redaction).is_some() {
252 false
255 } else {
256 ev.event_type() != MessageLikeEventType::Reaction
259 }
260 }
261
262 AnySyncTimelineEvent::MessageLike(msg) => {
263 match msg.original_content() {
264 None => {
265 msg.event_type() != MessageLikeEventType::Reaction
268 }
269
270 Some(original_content) => {
271 match original_content {
272 AnyMessageLikeEventContent::RoomMessage(content) => {
273 if content
274 .relates_to
275 .as_ref()
276 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
277 {
278 return false;
280 }
281
282 match content.msgtype {
283 MessageType::Audio(_)
284 | MessageType::Emote(_)
285 | MessageType::File(_)
286 | MessageType::Image(_)
287 | MessageType::Location(_)
288 | MessageType::Notice(_)
289 | MessageType::ServerNotice(_)
290 | MessageType::Text(_)
291 | MessageType::Video(_)
292 | MessageType::VerificationRequest(_) => true,
293 #[cfg(feature = "unstable-msc4274")]
294 MessageType::Gallery(_) => true,
295 _ => false,
296 }
297 }
298
299 AnyMessageLikeEventContent::Sticker(_)
300 | AnyMessageLikeEventContent::UnstablePollStart(
301 UnstablePollStartEventContent::New(_),
302 )
303 | AnyMessageLikeEventContent::CallInvite(_)
304 | AnyMessageLikeEventContent::RtcNotification(_)
305 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
306
307 AnyMessageLikeEventContent::Beacon(_) => false,
311
312 _ => false,
313 }
314 }
315 }
316 }
317
318 AnySyncTimelineEvent::State(_) => {
319 true
321 }
322 }
323}
324
325impl<P: RoomDataProvider> TimelineController<P> {
326 pub(super) fn new(
327 room_data_provider: P,
328 focus: TimelineFocus,
329 internal_id_prefix: Option<String>,
330 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
331 is_room_encrypted: bool,
332 settings: TimelineSettings,
333 ) -> Self {
334 let focus = match focus {
335 TimelineFocus::Live { hide_threaded_events } => {
336 TimelineFocusKind::Live { hide_threaded_events }
337 }
338
339 TimelineFocus::Event { target, thread_mode, .. } => {
340 TimelineFocusKind::Event {
341 focused_event_id: target,
342 thread_root: OnceLock::new(),
344 thread_mode,
345 }
346 }
347
348 TimelineFocus::Thread { root_event_id, .. } => {
349 TimelineFocusKind::Thread { root_event_id }
350 }
351
352 TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
353 };
354
355 let focus = Arc::new(focus);
356 let state = Arc::new(RwLock::new(TimelineState::new(
357 focus.clone(),
358 room_data_provider.own_user_id().to_owned(),
359 room_data_provider.room_version_rules(),
360 internal_id_prefix,
361 unable_to_decrypt_hook,
362 is_room_encrypted,
363 )));
364
365 Self { state, focus, room_data_provider, settings }
366 }
367
368 pub(super) async fn init_focus(
375 &self,
376 focus: &TimelineFocus,
377 room_event_cache: &RoomEventCache,
378 ) -> Result<bool, Error> {
379 match focus {
380 TimelineFocus::Live { .. } => {
381 let events = room_event_cache.events().await?;
383
384 let has_events = !events.is_empty();
385
386 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
387
388 match room_event_cache.pagination().status().get() {
389 PaginationStatus::Idle { hit_timeline_start } => {
390 if hit_timeline_start {
391 self.insert_timeline_start_if_missing().await;
394 }
395 }
396 PaginationStatus::Paginating => {}
397 }
398
399 Ok(has_events)
400 }
401
402 TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
403 let event_cache_thread_mode = match thread_mode {
405 TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
406 TimelineEventFocusThreadMode::Automatic { .. } => {
407 EventFocusThreadMode::Automatic
408 }
409 };
410
411 let cache = room_event_cache
412 .get_or_create_event_focused_cache(
413 event_id.clone(),
414 *num_context_events,
415 event_cache_thread_mode,
416 )
417 .await
418 .map_err(PaginationError::EventCache)?;
419
420 let (events, _receiver) = cache.subscribe().await;
421
422 let has_events = !events.is_empty();
423
424 match &*self.focus {
427 TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
428 if let Some(thread_root) = cache.thread_root().await {
429 focus_thread_root.get_or_init(|| thread_root);
430 }
431 }
432 TimelineFocusKind::Live { .. }
433 | TimelineFocusKind::Thread { .. }
434 | TimelineFocusKind::PinnedEvents => {
435 panic!("unexpected focus for an event-focused timeline")
436 }
437 }
438
439 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
440 .await;
441
442 Ok(has_events)
443 }
444
445 TimelineFocus::Thread { root_event_id, .. } => {
446 self.init_with_thread_root(root_event_id, room_event_cache).await
447 }
448
449 TimelineFocus::PinnedEvents => {
450 let (initial_events, _update_receiver) =
451 room_event_cache.subscribe_to_pinned_events().await?;
452
453 let has_events = !initial_events.is_empty();
454
455 self.replace_with_initial_remote_events(
456 initial_events,
457 RemoteEventOrigin::Pagination,
458 )
459 .await;
460
461 Ok(has_events)
462 }
463 }
464 }
465
466 pub(super) async fn init_with_thread_root(
471 &self,
472 root_event_id: &OwnedEventId,
473 room_event_cache: &RoomEventCache,
474 ) -> Result<bool, Error> {
475 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
476 let has_events = !events.is_empty();
477
478 let mut related_events = Vector::new();
482 for event_id in events.iter().filter_map(|event| event.event_id()) {
483 if let Some((_original, related)) =
484 room_event_cache.find_event_with_relations(&event_id, None).await?
485 {
486 related_events.extend(related);
487 }
488 }
489
490 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
491
492 if !related_events.is_empty() {
494 self.handle_remote_aggregations(
495 vec![VectorDiff::Append { values: related_events }],
496 RemoteEventOrigin::Cache,
497 )
498 .await;
499 }
500
501 Ok(has_events)
502 }
503
504 pub async fn handle_encryption_state_changes(&self) {
509 let mut room_info = self.room_data_provider.room_info();
510
511 let mark_encrypted = || async {
513 let mut state = self.state.write().await;
514 state.meta.is_room_encrypted = true;
515 state.mark_all_events_as_encrypted();
516 };
517
518 if room_info.get().encryption_state().is_encrypted() {
519 mark_encrypted().await;
522 return;
523 }
524
525 while let Some(info) = room_info.next().await {
526 if info.encryption_state().is_encrypted() {
527 mark_encrypted().await;
528 break;
531 }
532 }
533 }
534
535 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
544 let state = self.state.read().await;
545
546 let (count, needs) = state
547 .meta
548 .subscriber_skip_count
549 .compute_next_when_paginating_backwards(num_events.into());
550
551 let is_live_timeline = true;
553 state.meta.subscriber_skip_count.update(count, is_live_timeline);
554
555 needs
556 }
557
558 pub(super) fn is_live(&self) -> bool {
560 matches!(&*self.focus, TimelineFocusKind::Live { .. })
561 }
562
563 pub(super) fn is_threaded(&self) -> bool {
565 self.focus.is_thread()
566 }
567
568 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
571 self.focus.thread_root().map(ToOwned::to_owned)
572 }
573
574 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
578 self.state.read().await.items.clone_items()
579 }
580
/// Test-only: subscribes directly to the observable items.
///
/// Returns the current items along with the stream of subsequent updates.
/// Unlike `subscribe`, the subscriber skip count is not involved here.
#[cfg(test)]
pub(super) async fn subscribe_raw(
    &self,
) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
    self.state.read().await.items.subscribe().into_values_and_stream()
}
587
588 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
589 let state = self.state.read().await;
590
591 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
592 }
593
/// Subscribes to the timeline items through a filter-map function.
///
/// `f` is applied to each item; items for which it returns `None` are left
/// out. Returns the filtered and mapped current items along with the
/// matching stream of updates.
pub(super) async fn subscribe_filter_map<U, F>(
    &self,
    f: F,
) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
where
    U: Clone,
    F: Fn(Arc<TimelineItem>) -> Option<U>,
{
    self.state.read().await.items.subscribe().filter_map(f)
}
604
/// Toggles the own user's reaction `key` on the item identified by
/// `item_id`.
///
/// Returns `Ok(true)` if a reaction has been added, and `Ok(false)` if a
/// previous reaction has been removed or if the toggle could not be carried
/// out.
///
/// # Errors
///
/// - `Error::FailedToToggleReaction` if the item isn't in the timeline.
/// - `Error::SendQueueError` if the send queue fails to add or abort the
///   reaction.
/// - Any error returned by the room data provider when sending the reaction
///   or its redaction.
#[instrument(skip_all)]
pub(super) async fn toggle_reaction_local(
    &self,
    item_id: &TimelineEventItemId,
    key: &str,
) -> Result<bool, Error> {
    let mut state = self.state.write().await;

    let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
        warn!("Timeline item not found, can't add reaction");
        return Err(Error::FailedToToggleReaction);
    };

    // Look up the status of the own user's reaction for this key, if the
    // item already carries one.
    let user_id = self.room_data_provider.own_user_id();
    let prev_status = item
        .content()
        .reactions()
        .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

    let Some(prev_status) = prev_status else {
        // No previous reaction from the own user: add a new one.
        match item.handle() {
            TimelineItemHandle::Local(send_handle) => {
                // The target is a local echo: go through its send handle.
                if send_handle
                    .react(key.to_owned())
                    .await
                    .map_err(|err| Error::SendQueueError(err.into()))?
                    .is_some()
                {
                    trace!("adding a reaction to a local echo");
                    return Ok(true);
                }

                warn!("couldn't toggle reaction for local echo");
                return Ok(false);
            }

            TimelineItemHandle::Remote(event_id) => {
                // The target is a remote event: send a reaction event
                // through the room data provider. The item isn't modified
                // here.
                trace!("adding a reaction to a remote echo");
                let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                self.room_data_provider
                    .send(ReactionEventContent::from(annotation).into())
                    .await?;
                return Ok(true);
            }
        }
    };

    // A previous reaction exists: remove it.
    trace!("removing a previous reaction");
    match prev_status {
        ReactionStatus::LocalToLocal(send_reaction_handle) => {
            // The reaction hasn't been sent yet: abort it.
            if let Some(handle) = send_reaction_handle {
                if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                    warn!("unexpectedly unable to abort sending of local reaction");
                }
            } else {
                warn!("no send reaction handle (this should only happen in testing contexts)");
            }
        }

        ReactionStatus::LocalToRemote(send_handle) => {
            // The reaction hasn't been sent yet: abort it. The item isn't
            // modified here.
            trace!("aborting send of the previous reaction that was a local echo");
            if let Some(handle) = send_handle {
                if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                    warn!("unexpectedly unable to abort sending of local reaction");
                }
            } else {
                warn!("no send handle (this should only happen in testing contexts)");
            }
        }

        ReactionStatus::RemoteToRemote(event_id) => {
            // The reaction is a remote event: it has to be redacted.
            let Some(annotated_event_id) =
                item.as_remote().map(|event_item| event_item.event_id.clone())
            else {
                warn!("remote reaction to remote event, but the associated item isn't remote");
                return Ok(false);
            };

            // Remove the reaction from the item right away, keeping the
            // removed data so that it can be restored if the redaction
            // fails.
            let mut reactions = item.content().reactions().cloned().unwrap_or_default();
            let reaction_info = reactions.remove_reaction(user_id, key);

            if reaction_info.is_some() {
                let new_item = item.with_reactions(reactions);
                state.items.replace(item_pos, new_item);
            } else {
                warn!(
                    "reaction is missing on the item, not removing it locally, \
                     but sending redaction."
                );
            }

            // Release the write lock before awaiting the redaction; it is
            // re-acquired below if the reaction has to be restored.
            drop(state);

            trace!("sending redact for a previous reaction");
            if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                if let Some(reaction_info) = reaction_info {
                    debug!("sending redact failed, adding the reaction back to the list");

                    let mut state = self.state.write().await;
                    // The lock was released in the meantime, so the item has
                    // to be looked up again, by event id this time.
                    if let Some((item_pos, item)) =
                        rfind_event_by_id(&state.items, &annotated_event_id)
                    {
                        let mut reactions =
                            item.content().reactions().cloned().unwrap_or_default();
                        reactions
                            .entry(key.to_owned())
                            .or_default()
                            .insert(user_id.to_owned(), reaction_info);
                        let new_item = item.with_reactions(reactions);
                        state.items.replace(item_pos, new_item);
                    } else {
                        warn!(
                            "couldn't find item to re-add reaction anymore; \
                             maybe it's been redacted?"
                        );
                    }
                }

                return Err(err);
            }
        }
    }

    Ok(false)
}
746
747 pub(super) async fn handle_remote_events_with_diffs(
749 &self,
750 diffs: Vec<VectorDiff<TimelineEvent>>,
751 origin: RemoteEventOrigin,
752 ) {
753 if diffs.is_empty() {
754 return;
755 }
756
757 let mut state = self.state.write().await;
758 state
759 .handle_remote_events_with_diffs(
760 diffs,
761 origin,
762 &self.room_data_provider,
763 &self.settings,
764 )
765 .await
766 }
767
768 pub(super) async fn handle_remote_aggregations(
770 &self,
771 diffs: Vec<VectorDiff<TimelineEvent>>,
772 origin: RemoteEventOrigin,
773 ) {
774 if diffs.is_empty() {
775 return;
776 }
777
778 let mut state = self.state.write().await;
779 state
780 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
781 .await
782 }
783
784 pub(super) async fn clear(&self) {
785 self.state.write().await.clear();
786 }
787
788 pub(super) async fn replace_with_initial_remote_events<Events>(
796 &self,
797 events: Events,
798 origin: RemoteEventOrigin,
799 ) where
800 Events: IntoIterator,
801 <Events as IntoIterator>::Item: Into<TimelineEvent>,
802 {
803 let mut state = self.state.write().await;
804
805 let track_read_markers = &self.settings.track_read_receipts;
806 if track_read_markers.is_enabled() {
807 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
808 state
809 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
810 .await;
811 }
812
813 let mut events = events.into_iter().peekable();
819 if !state.items.is_empty() || events.peek().is_some() {
820 state
821 .replace_with_remote_events(
822 events,
823 origin,
824 &self.room_data_provider,
825 &self.settings,
826 )
827 .await;
828 }
829
830 if track_read_markers.is_enabled() {
831 if let Some(fully_read_event_id) =
832 self.room_data_provider.load_fully_read_marker().await
833 {
834 state.handle_fully_read_marker(fully_read_event_id);
835 } else if let Some(latest_receipt_event_id) = state
836 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
837 {
838 debug!("no `m.fully_read` marker found, falling back to read receipt");
840 state.handle_fully_read_marker(latest_receipt_event_id);
841 }
842 }
843 }
844
845 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
846 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
847 }
848
849 pub(super) async fn handle_ephemeral_events(
850 &self,
851 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
852 ) {
853 if events.is_empty() {
855 return;
856 }
857 let mut state = self.state.write().await;
858 state.handle_ephemeral_events(events, &self.room_data_provider).await;
859 }
860
861 #[instrument(skip_all)]
863 pub(super) async fn handle_local_event(
864 &self,
865 txn_id: OwnedTransactionId,
866 content: AnyMessageLikeEventContent,
867 send_handle: Option<SendHandle>,
868 ) {
869 let sender = self.room_data_provider.own_user_id().to_owned();
870 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
871
872 let date_divider_mode = self.settings.date_divider_mode.clone();
873
874 let mut state = self.state.write().await;
875 state
876 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
877 .await;
878 }
879
/// Updates the send state of the local echo identified by `txn_id`.
///
/// All changes go through one state transaction, which is committed only
/// on the paths that modify something:
///
/// - if `send_state` is `Sent` and a remote item with that event id is
///   already in the timeline, the local echo (if still present) is removed
///   as a duplicate;
/// - otherwise, if a matching local item is found, its send state is
///   replaced;
/// - otherwise, if `send_state` is `Sent`, a matching pending aggregation
///   is marked as sent.
///
/// When nothing matches, a warning is logged and the transaction is dropped
/// without being committed.
#[instrument(skip(self))]
pub(super) async fn update_event_send_state(
    &self,
    txn_id: &TransactionId,
    send_state: EventSendState,
) {
    let mut state = self.state.write().await;
    let mut txn = state.transaction();

    // The event id is only known when the new state is `Sent`.
    let new_event_id: Option<&EventId> =
        as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

    // First check whether a remote item with the new event id already
    // exists.
    if rfind_event_item(&txn.items, |it| {
        new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
    })
    .is_some()
    {
        trace!("Remote echo received before send-event response");

        let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

        // The remote echo and the local echo are both present: the local
        // one is a duplicate, drop it.
        if let Some((idx, _)) = local_echo {
            warn!("Message echo got duplicated, removing the local one");
            txn.items.remove(idx);

            // An item has been removed: re-run the date divider adjuster.
            let mut adjuster =
                DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);
        }

        txn.commit();
        return;
    }

    // Look for the item by transaction id, or for a local item that already
    // has the new event id. Note that `&&` binds tighter than `||` here.
    let result = rfind_event_item(&txn.items, |it| {
        it.transaction_id() == Some(txn_id)
            || new_event_id.is_some()
                && it.event_id() == new_event_id
                && it.as_local().is_some()
    });

    let Some((idx, item)) = result else {
        // No standalone item matches. If the event has just been sent, it
        // may be a pending aggregation instead.
        if let Some(new_event_id) = new_event_id {
            if txn.meta.aggregations.mark_aggregation_as_sent(
                txn_id.to_owned(),
                new_event_id.to_owned(),
                &mut txn.items,
                &txn.meta.room_version_rules,
            ) {
                trace!("Aggregation marked as sent");
                txn.commit();
                return;
            }

            trace!("Sent aggregation was not found");
        }

        warn!("Timeline item not found, can't update send state");
        return;
    };

    let Some(local_item) = item.as_local() else {
        warn!("We looked for a local item, but it transitioned to remote.");
        return;
    };

    // The item was already marked as sent: log an error, but still apply
    // the new send state below.
    if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
        error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
    }

    // The event has just been sent: let the aggregations know about the
    // event id now associated with this transaction id.
    if let Some(new_event_id) = new_event_id {
        txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
    }

    let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
    txn.items.replace(idx, new_item);

    txn.commit();
}
979
980 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
981 let mut state = self.state.write().await;
982
983 if let Some((idx, _)) =
984 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
985 {
986 let mut txn = state.transaction();
987
988 txn.items.remove(idx);
989
990 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
993 adjuster.run(&mut txn.items, &mut txn.meta);
994
995 txn.meta.update_read_marker(&mut txn.items);
996
997 txn.commit();
998
999 debug!("discarded local echo");
1000 return true;
1001 }
1002
1003 let mut txn = state.transaction();
1006
1007 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1009 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1010 &mut txn.items,
1011 ) {
1012 Ok(val) => val,
1013 Err(err) => {
1014 warn!("error when discarding local echo for an aggregation: {err}");
1015 true
1017 }
1018 };
1019
1020 if found_aggregation {
1021 txn.commit();
1022 }
1023
1024 found_aggregation
1025 }
1026
1027 pub(super) async fn replace_local_echo(
1028 &self,
1029 txn_id: &TransactionId,
1030 content: AnyMessageLikeEventContent,
1031 ) -> bool {
1032 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1033 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1038 return false;
1039 };
1040
1041 let mut state = self.state.write().await;
1042 let mut txn = state.transaction();
1043
1044 let Some((idx, prev_item)) =
1045 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1046 else {
1047 debug!("Can't find local echo to replace");
1048 return false;
1049 };
1050
1051 let ti_kind = {
1054 let Some(prev_local_item) = prev_item.as_local() else {
1055 warn!("We looked for a local item, but it transitioned as remote??");
1056 return false;
1057 };
1058 let progress = as_variant!(&prev_local_item.send_state,
1060 EventSendState::NotSentYet { progress } => progress.clone())
1061 .flatten();
1062 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1063 };
1064
1065 let new_item = TimelineItem::new(
1067 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1068 content.msgtype,
1069 content.mentions,
1070 prev_item.content().reactions().cloned().unwrap_or_default(),
1071 prev_item.content().thread_root(),
1072 prev_item.content().in_reply_to(),
1073 prev_item.content().thread_summary(),
1074 )),
1075 prev_item.internal_id.to_owned(),
1076 );
1077
1078 txn.items.replace(idx, new_item);
1079
1080 txn.commit();
1084
1085 debug!("Replaced local echo");
1086 true
1087 }
1088
1089 pub(super) async fn compute_redecryption_candidates(
1090 &self,
1091 ) -> (BTreeSet<String>, BTreeSet<String>) {
1092 let state = self.state.read().await;
1093 compute_redecryption_candidates(&state.items)
1094 }
1095
1096 pub(super) async fn set_sender_profiles_pending(&self) {
1097 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1098 }
1099
1100 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1101 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1102 }
1103
1104 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1105 self.state.write().await.items.for_each(|mut entry| {
1106 let Some(event_item) = entry.as_event() else { return };
1107 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1108 let new_item = entry.with_kind(TimelineItemKind::Event(
1109 event_item.with_sender_profile(profile_state.clone()),
1110 ));
1111 ObservableItemsEntry::replace(&mut entry, new_item);
1112 }
1113 });
1114 }
1115
1116 pub(super) async fn update_missing_sender_profiles(&self) {
1117 trace!("Updating missing sender profiles");
1118
1119 let mut state = self.state.write().await;
1120 let mut entries = state.items.entries();
1121 while let Some(mut entry) = entries.next() {
1122 let Some(event_item) = entry.as_event() else { continue };
1123 let event_id = event_item.event_id().map(debug);
1124 let transaction_id = event_item.transaction_id().map(debug);
1125
1126 if event_item.sender_profile().is_ready() {
1127 trace!(event_id, transaction_id, "Profile already set");
1128 continue;
1129 }
1130
1131 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1132 Some(profile) => {
1133 trace!(event_id, transaction_id, "Adding profile");
1134 let updated_item =
1135 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1136 let new_item = entry.with_kind(updated_item);
1137 ObservableItemsEntry::replace(&mut entry, new_item);
1138 }
1139 None => {
1140 if !event_item.sender_profile().is_unavailable() {
1141 trace!(event_id, transaction_id, "Marking profile unavailable");
1142 let updated_item =
1143 event_item.with_sender_profile(TimelineDetails::Unavailable);
1144 let new_item = entry.with_kind(updated_item);
1145 ObservableItemsEntry::replace(&mut entry, new_item);
1146 } else {
1147 debug!(event_id, transaction_id, "Profile already marked unavailable");
1148 }
1149 }
1150 }
1151 }
1152
1153 trace!("Done updating missing sender profiles");
1154 }
1155
1156 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1158 trace!("Forcing update of sender profiles: {sender_ids:?}");
1159
1160 let mut state = self.state.write().await;
1161 let mut entries = state.items.entries();
1162 while let Some(mut entry) = entries.next() {
1163 let Some(event_item) = entry.as_event() else { continue };
1164 if !sender_ids.contains(event_item.sender()) {
1165 continue;
1166 }
1167
1168 let event_id = event_item.event_id().map(debug);
1169 let transaction_id = event_item.transaction_id().map(debug);
1170
1171 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1172 Some(profile) => {
1173 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1174 {
1175 debug!(event_id, transaction_id, "Profile already up-to-date");
1176 } else {
1177 trace!(event_id, transaction_id, "Updating profile");
1178 let updated_item =
1179 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1180 let new_item = entry.with_kind(updated_item);
1181 ObservableItemsEntry::replace(&mut entry, new_item);
1182 }
1183 }
1184 None => {
1185 if !event_item.sender_profile().is_unavailable() {
1186 trace!(event_id, transaction_id, "Marking profile unavailable");
1187 let updated_item =
1188 event_item.with_sender_profile(TimelineDetails::Unavailable);
1189 let new_item = entry.with_kind(updated_item);
1190 ObservableItemsEntry::replace(&mut entry, new_item);
1191 } else {
1192 debug!(event_id, transaction_id, "Profile already marked unavailable");
1193 }
1194 }
1195 }
1196 }
1197
1198 trace!("Done forcing update of sender profiles");
1199 }
1200
/// Test-only: hands a receipt event content to the timeline state, along
/// with the own user id.
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
    let mut state = self.state.write().await;
    state.handle_read_receipts(receipt_event_content, self.room_data_provider.own_user_id());
}
1206
1207 pub(super) async fn latest_user_read_receipt(
1211 &self,
1212 user_id: &UserId,
1213 ) -> Option<(OwnedEventId, Receipt)> {
1214 let receipt_thread = self.focus.receipt_thread();
1215
1216 self.state
1217 .read()
1218 .await
1219 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1220 .await
1221 }
1222
1223 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1226 &self,
1227 user_id: &UserId,
1228 ) -> Option<OwnedEventId> {
1229 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1230 }
1231
1232 pub async fn subscribe_own_user_read_receipts_changed(
1234 &self,
1235 ) -> impl Stream<Item = ()> + use<P> {
1236 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1237 }
1238
1239 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1241 match echo.content {
1242 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1243 let content = match serialized_event.deserialize() {
1244 Ok(d) => d,
1245 Err(err) => {
1246 warn!("error deserializing local echo: {err}");
1247 return;
1248 }
1249 };
1250
1251 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1252 .await;
1253
1254 if let Some(send_error) = send_error {
1255 self.update_event_send_state(
1256 &echo.transaction_id,
1257 EventSendState::SendingFailed {
1258 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1259 send_error,
1260 ))),
1261 is_recoverable: false,
1262 },
1263 )
1264 .await;
1265 }
1266 }
1267
1268 LocalEchoContent::React { key, send_handle, applies_to } => {
1269 self.handle_local_reaction(key, send_handle, applies_to).await;
1270 }
1271 }
1272 }
1273
1274 #[instrument(skip(self, send_handle))]
1276 async fn handle_local_reaction(
1277 &self,
1278 reaction_key: String,
1279 send_handle: SendReactionHandle,
1280 applies_to: OwnedTransactionId,
1281 ) {
1282 let mut state = self.state.write().await;
1283 let mut tr = state.transaction();
1284
1285 let target = TimelineEventItemId::TransactionId(applies_to);
1286
1287 let reaction_txn_id = send_handle.transaction_id().to_owned();
1288 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1289 let aggregation = Aggregation::new(
1290 TimelineEventItemId::TransactionId(reaction_txn_id),
1291 AggregationKind::Reaction {
1292 key: reaction_key.clone(),
1293 sender: self.room_data_provider.own_user_id().to_owned(),
1294 timestamp: MilliSecondsSinceUnixEpoch::now(),
1295 reaction_status,
1296 },
1297 );
1298
1299 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1300 find_item_and_apply_aggregation(
1301 &tr.meta.aggregations,
1302 &mut tr.items,
1303 &target,
1304 aggregation,
1305 &tr.meta.room_version_rules,
1306 );
1307
1308 tr.commit();
1309 }
1310
1311 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1313 match update {
1314 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1315 self.handle_local_echo(echo).await;
1316 }
1317
1318 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1319 if !self.discard_local_echo(&transaction_id).await {
1320 warn!("couldn't find the local echo to discard");
1321 }
1322 }
1323
1324 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1325 let content = match new_content.deserialize() {
1326 Ok(d) => d,
1327 Err(err) => {
1328 warn!("error deserializing local echo (upon edit): {err}");
1329 return;
1330 }
1331 };
1332
1333 if !self.replace_local_echo(&transaction_id, content).await {
1334 warn!("couldn't find the local echo to replace");
1335 }
1336 }
1337
1338 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1339 self.update_event_send_state(
1340 &transaction_id,
1341 EventSendState::SendingFailed { error, is_recoverable },
1342 )
1343 .await;
1344 }
1345
1346 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1347 self.update_event_send_state(
1348 &transaction_id,
1349 EventSendState::NotSentYet { progress: None },
1350 )
1351 .await;
1352 }
1353
1354 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1355 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1356 .await;
1357 }
1358
1359 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1360 self.update_event_send_state(
1361 &related_to,
1362 EventSendState::NotSentYet {
1363 progress: Some(MediaUploadProgress { index, progress }),
1364 },
1365 )
1366 .await;
1367 }
1368 }
1369 }
1370
1371 pub async fn insert_timeline_start_if_missing(&self) {
1374 let mut state = self.state.write().await;
1375 let mut txn = state.transaction();
1376 txn.items.push_timeline_start_if_missing(
1377 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1378 );
1379 txn.commit();
1380 }
1381
1382 pub(super) async fn make_replied_to(
1388 &self,
1389 event: TimelineEvent,
1390 ) -> Result<Option<EmbeddedEvent>, Error> {
1391 let state = self.state.read().await;
1392 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1393 }
1394}
1395
1396impl TimelineController {
1397 pub(super) fn room(&self) -> &Room {
1398 &self.room_data_provider
1399 }
1400
1401 #[instrument(skip(self))]
1404 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1405 let state_guard = self.state.write().await;
1406 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1407 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1408 let remote_item = item
1409 .as_remote()
1410 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1411 .clone();
1412
1413 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1414 debug!("Event is not a message");
1415 return Ok(());
1416 };
1417 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1418 debug!("Event is not a reply");
1419 return Ok(());
1420 };
1421 if let TimelineDetails::Pending = &in_reply_to.event {
1422 debug!("Replied-to event is already being fetched");
1423 return Ok(());
1424 }
1425 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1426 debug!("Replied-to event has already been fetched");
1427 return Ok(());
1428 }
1429
1430 let internal_id = item.internal_id.to_owned();
1431 let item = item.clone();
1432 let event = fetch_replied_to_event(
1433 state_guard,
1434 &self.state,
1435 index,
1436 &item,
1437 internal_id,
1438 &msglike,
1439 &in_reply_to.event_id,
1440 self.room(),
1441 )
1442 .await?;
1443
1444 let mut state = self.state.write().await;
1447 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1448 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1449
1450 let TimelineItemContent::MsgLike(MsgLikeContent {
1453 kind: MsgLikeKind::Message(message),
1454 reactions,
1455 thread_root,
1456 in_reply_to,
1457 thread_summary,
1458 }) = item.content().clone()
1459 else {
1460 info!("Event is no longer a message (redacted?)");
1461 return Ok(());
1462 };
1463 let Some(in_reply_to) = in_reply_to else {
1464 warn!("Event no longer has a reply (bug?)");
1465 return Ok(());
1466 };
1467
1468 trace!("Updating in-reply-to details");
1471 let internal_id = item.internal_id.to_owned();
1472 let mut item = item.clone();
1473 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1474 kind: MsgLikeKind::Message(message),
1475 reactions,
1476 thread_root,
1477 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1478 thread_summary,
1479 }));
1480 state.items.replace(index, TimelineItem::new(item, internal_id));
1481
1482 Ok(())
1483 }
1484
1485 pub(super) fn infer_thread_for_read_receipt(
1491 &self,
1492 receipt_type: &SendReceiptType,
1493 ) -> ReceiptThread {
1494 if matches!(receipt_type, SendReceiptType::FullyRead) {
1495 ReceiptThread::Unthreaded
1496 } else {
1497 self.focus.receipt_thread()
1498 }
1499 }
1500
1501 pub(super) async fn should_send_receipt(
1505 &self,
1506 receipt_type: &SendReceiptType,
1507 receipt_thread: &ReceiptThread,
1508 event_id: &EventId,
1509 ) -> bool {
1510 let own_user_id = self.room().own_user_id();
1511 let state = self.state.read().await;
1512 let room = self.room();
1513
1514 match receipt_type {
1515 SendReceiptType::Read => {
1516 if let Some((old_pub_read, _)) = state
1517 .meta
1518 .user_receipt(
1519 own_user_id,
1520 ReceiptType::Read,
1521 receipt_thread.clone(),
1522 room,
1523 state.items.all_remote_events(),
1524 )
1525 .await
1526 {
1527 trace!(%old_pub_read, "found a previous public receipt");
1528 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1529 &old_pub_read,
1530 event_id,
1531 state.items.all_remote_events(),
1532 ) {
1533 trace!(
1534 "event referred to new receipt is {relative_pos:?} the previous receipt"
1535 );
1536 return relative_pos == RelativePosition::After;
1537 }
1538 }
1539 }
1540
1541 SendReceiptType::ReadPrivate => {
1544 if let Some((old_priv_read, _)) =
1545 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1546 {
1547 trace!(%old_priv_read, "found a previous private receipt");
1548 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1549 &old_priv_read,
1550 event_id,
1551 state.items.all_remote_events(),
1552 ) {
1553 trace!(
1554 "event referred to new receipt is {relative_pos:?} the previous receipt"
1555 );
1556 return relative_pos == RelativePosition::After;
1557 }
1558 }
1559 }
1560
1561 SendReceiptType::FullyRead => {
1562 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1563 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1564 &prev_event_id,
1565 event_id,
1566 state.items.all_remote_events(),
1567 )
1568 {
1569 return relative_pos == RelativePosition::After;
1570 }
1571 }
1572
1573 _ => {}
1574 }
1575
1576 true
1578 }
1579
1580 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1583 let state = self.state.read().await;
1584 let filter_out_thread_events = match self.focus() {
1585 TimelineFocusKind::Thread { .. } => false,
1586 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1587 TimelineFocusKind::Event { .. } => {
1588 false
1590 }
1591 TimelineFocusKind::PinnedEvents => true,
1592 };
1593
1594 state
1595 .items
1596 .all_remote_events()
1597 .iter()
1598 .rev()
1599 .filter_map(|event_meta| {
1600 if !filter_out_thread_events {
1601 Some(event_meta.event_id.clone())
1603 } else if event_meta.thread_root_id.is_none() {
1604 if let Some(TimelineEventItemId::EventId(target_event_id)) =
1610 state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1611 event_meta.event_id.clone(),
1612 ))
1613 && let Some(target_meta) =
1614 state.items.all_remote_events().get_by_event_id(target_event_id)
1615 && target_meta.thread_root_id.is_some()
1616 {
1617 None
1619 } else {
1620 Some(event_meta.event_id.clone())
1623 }
1624 } else {
1625 None
1628 }
1629 })
1630 .next()
1631 }
1632
1633 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1634 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1635 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1636
1637 let request = DecryptionRetryRequest {
1638 room_id: self.room().room_id().to_owned(),
1639 utd_session_ids: utds,
1640 refresh_info_session_ids: decrypted,
1641 };
1642
1643 self.room().client().event_cache().request_decryption(request);
1644 }
1645
1646 pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1654 match status {
1655 PaginationStatus::Idle { hit_timeline_start } => {
1656 if hit_timeline_start {
1657 let state = self.state.read().await;
1658 if state.meta.subscriber_skip_count.get() > 0 {
1662 return PaginationStatus::Idle { hit_timeline_start: false };
1663 }
1664 }
1665 }
1666 PaginationStatus::Paginating => {}
1667 }
1668
1669 status
1671 }
1672}
1673
1674impl<P: RoomDataProvider> TimelineController<P> {
1675 pub(super) fn focus(&self) -> &TimelineFocusKind {
1677 &self.focus
1678 }
1679}
1680
1681#[allow(clippy::too_many_arguments)]
1682async fn fetch_replied_to_event<P: RoomDataProvider>(
1683 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1684 state_lock: &RwLock<TimelineState<P>>,
1685 index: usize,
1686 item: &EventTimelineItem,
1687 internal_id: TimelineUniqueId,
1688 msglike: &MsgLikeContent,
1689 in_reply_to: &EventId,
1690 room: &Room,
1691) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1692 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1693 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1694 trace!("Found replied-to event locally");
1695 return Ok(details);
1696 }
1697
1698 trace!("Setting in-reply-to details to pending");
1701 let in_reply_to_details =
1702 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1703
1704 let event_item = item
1705 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1706
1707 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1708 state_guard.items.replace(index, new_timeline_item);
1709
1710 drop(state_guard);
1712
1713 trace!("Fetching replied-to event");
1714 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1715 Ok(timeline_event) => {
1716 let state = state_lock.read().await;
1717
1718 let replied_to_item =
1719 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1720
1721 if let Some(item) = replied_to_item {
1722 TimelineDetails::Ready(Box::new(item))
1723 } else {
1724 return Err(Error::UnsupportedEvent);
1726 }
1727 }
1728
1729 Err(e) => TimelineDetails::Error(Arc::new(e)),
1730 };
1731
1732 Ok(res)
1733}