1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::{VectorDiff, VectorSubscriberStream};
20use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::Result;
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{RoomEventCache, RoomPaginationStatus},
28 paginators::{PaginationResult, PaginationToken, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40 AnySyncTimelineEvent, MessageLikeEventType,
41 poll::unstable_start::UnstablePollStartEventContent,
42 reaction::ReactionEventContent,
43 receipt::{Receipt, ReceiptThread, ReceiptType},
44 relation::Annotation,
45 room::message::{MessageType, Relation},
46 },
47 room_version_rules::RoomVersionRules,
48 serde::Raw,
49};
50use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54 metadata::{RelativePosition, TimelineMetadata},
55 observable_items::{
56 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57 ObservableItemsTransactionEntry,
58 },
59 state::TimelineState,
60 state_transaction::TimelineStateTransaction,
61};
62use super::{
63 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_item::{ReactionStatus, RemoteEventOrigin},
68 item::TimelineUniqueId,
69 subscriber::TimelineSubscriber,
70 traits::RoomDataProvider,
71};
72use crate::{
73 timeline::{
74 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75 algorithms::rfind_event_by_item_id,
76 date_dividers::DateDividerAdjuster,
77 event_item::TimelineItemHandle,
78 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79 },
80 unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
94use matrix_sdk_common::serde_helpers::extract_thread_root;
95
96#[derive(Debug)]
102pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
103 Live {
105 hide_threaded_events: bool,
107 },
108
109 Event {
112 paginator: OnceCell<AnyPaginator<P>>,
114 },
115
116 Thread {
118 root_event_id: OwnedEventId,
120 },
121
122 PinnedEvents {
123 loader: PinnedEventsLoader,
124 },
125}
126
127#[derive(Debug)]
128pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
129 Unthreaded {
130 paginator: Paginator<P>,
132 hide_threaded_events: bool,
134 },
135 Threaded(ThreadedEventsLoader<P>),
136}
137
138impl<P: RoomDataProvider> AnyPaginator<P> {
139 pub async fn paginate_backwards(
148 &self,
149 num_events: u16,
150 ) -> Result<PaginationResult, PaginatorError> {
151 match self {
152 Self::Unthreaded { paginator, .. } => {
153 paginator.paginate_backward(num_events.into()).await
154 }
155 Self::Threaded(threaded_paginator) => {
156 threaded_paginator.paginate_backwards(num_events.into()).await
157 }
158 }
159 }
160
161 pub async fn paginate_forwards(
169 &self,
170 num_events: u16,
171 ) -> Result<PaginationResult, PaginatorError> {
172 match self {
173 Self::Unthreaded { paginator, .. } => {
174 paginator.paginate_forward(num_events.into()).await
175 }
176 Self::Threaded(threaded_paginator) => {
177 threaded_paginator.paginate_forwards(num_events.into()).await
178 }
179 }
180 }
181
182 pub fn hide_threaded_events(&self) -> bool {
184 match self {
185 Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
186 Self::Threaded(_) => false,
187 }
188 }
189
190 pub fn thread_root(&self) -> Option<&EventId> {
193 match self {
194 Self::Unthreaded { .. } => None,
195 Self::Threaded(thread_events_loader) => {
196 Some(thread_events_loader.thread_root_event_id())
197 }
198 }
199 }
200}
201
202impl<P: RoomDataProvider> TimelineFocusKind<P> {
203 pub(super) fn receipt_thread(&self) -> ReceiptThread {
210 if let Some(thread_root) = self.thread_root() {
211 ReceiptThread::Thread(thread_root.to_owned())
212 } else if self.hide_threaded_events() {
213 ReceiptThread::Main
214 } else {
215 ReceiptThread::Unthreaded
216 }
217 }
218
219 fn hide_threaded_events(&self) -> bool {
221 match self {
222 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
223 TimelineFocusKind::Event { paginator } => {
224 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
225 }
226 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
227 }
228 }
229
230 fn is_thread(&self) -> bool {
233 self.thread_root().is_some()
234 }
235
236 fn thread_root(&self) -> Option<&EventId> {
238 match self {
239 TimelineFocusKind::Event { paginator, .. } => {
240 paginator.get().and_then(|paginator| paginator.thread_root())
241 }
242 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
243 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
244 }
245 }
246}
247
248#[derive(Clone, Debug)]
249pub(super) struct TimelineController<P: RoomDataProvider = Room> {
250 state: Arc<RwLock<TimelineState<P>>>,
252
253 focus: Arc<TimelineFocusKind<P>>,
255
256 pub(crate) room_data_provider: P,
261
262 pub(super) settings: TimelineSettings,
264
265 decryption_retry_task: DecryptionRetryTask<P, P>,
268}
269
270#[derive(Clone)]
271pub(super) struct TimelineSettings {
272 pub(super) track_read_receipts: bool,
274
275 pub(super) event_filter: Arc<TimelineEventFilterFn>,
278
279 pub(super) add_failed_to_parse: bool,
281
282 pub(super) date_divider_mode: DateDividerMode,
284}
285
286#[cfg(not(tarpaulin_include))]
287impl fmt::Debug for TimelineSettings {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.debug_struct("TimelineSettings")
290 .field("track_read_receipts", &self.track_read_receipts)
291 .field("add_failed_to_parse", &self.add_failed_to_parse)
292 .finish_non_exhaustive()
293 }
294}
295
296impl Default for TimelineSettings {
297 fn default() -> Self {
298 Self {
299 track_read_receipts: false,
300 event_filter: Arc::new(default_event_filter),
301 add_failed_to_parse: true,
302 date_divider_mode: DateDividerMode::Daily,
303 }
304 }
305}
306
307pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
317 match event {
318 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
319 if ev.redacts(&rules.redaction).is_some() {
320 false
323 } else {
324 ev.event_type() != MessageLikeEventType::Reaction
327 }
328 }
329
330 AnySyncTimelineEvent::MessageLike(msg) => {
331 match msg.original_content() {
332 None => {
333 msg.event_type() != MessageLikeEventType::Reaction
336 }
337
338 Some(original_content) => {
339 match original_content {
340 AnyMessageLikeEventContent::RoomMessage(content) => {
341 if content
342 .relates_to
343 .as_ref()
344 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
345 {
346 return false;
348 }
349
350 match content.msgtype {
351 MessageType::Audio(_)
352 | MessageType::Emote(_)
353 | MessageType::File(_)
354 | MessageType::Image(_)
355 | MessageType::Location(_)
356 | MessageType::Notice(_)
357 | MessageType::ServerNotice(_)
358 | MessageType::Text(_)
359 | MessageType::Video(_)
360 | MessageType::VerificationRequest(_) => true,
361 #[cfg(feature = "unstable-msc4274")]
362 MessageType::Gallery(_) => true,
363 _ => false,
364 }
365 }
366
367 AnyMessageLikeEventContent::Sticker(_)
368 | AnyMessageLikeEventContent::UnstablePollStart(
369 UnstablePollStartEventContent::New(_),
370 )
371 | AnyMessageLikeEventContent::CallInvite(_)
372 | AnyMessageLikeEventContent::RtcNotification(_)
373 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
374
375 _ => false,
376 }
377 }
378 }
379 }
380
381 AnySyncTimelineEvent::State(_) => {
382 true
384 }
385 }
386}
387
388impl<P: RoomDataProvider> TimelineController<P> {
389 pub(super) fn new(
390 room_data_provider: P,
391 focus: TimelineFocus,
392 internal_id_prefix: Option<String>,
393 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
394 is_room_encrypted: bool,
395 settings: TimelineSettings,
396 ) -> Self {
397 let focus = match focus {
398 TimelineFocus::Live { hide_threaded_events } => {
399 TimelineFocusKind::Live { hide_threaded_events }
400 }
401
402 TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
403
404 TimelineFocus::Thread { root_event_id, .. } => {
405 TimelineFocusKind::Thread { root_event_id }
406 }
407
408 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
409 TimelineFocusKind::PinnedEvents {
410 loader: PinnedEventsLoader::new(
411 Arc::new(room_data_provider.clone()),
412 max_events_to_load as usize,
413 max_concurrent_requests as usize,
414 ),
415 }
416 }
417 };
418
419 let focus = Arc::new(focus);
420 let state = Arc::new(RwLock::new(TimelineState::new(
421 focus.clone(),
422 room_data_provider.own_user_id().to_owned(),
423 room_data_provider.room_version_rules(),
424 internal_id_prefix,
425 unable_to_decrypt_hook,
426 is_room_encrypted,
427 )));
428
429 let decryption_retry_task =
430 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
431
432 Self { state, focus, room_data_provider, settings, decryption_retry_task }
433 }
434
435 pub(super) async fn init_focus(
442 &self,
443 focus: &TimelineFocus,
444 room_event_cache: &RoomEventCache,
445 ) -> Result<bool, Error> {
446 match focus {
447 TimelineFocus::Live { .. } => {
448 let events = room_event_cache.events().await;
450
451 let has_events = !events.is_empty();
452
453 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
454
455 match room_event_cache.pagination().status().get() {
456 RoomPaginationStatus::Idle { hit_timeline_start } => {
457 if hit_timeline_start {
458 self.insert_timeline_start_if_missing().await;
461 }
462 }
463 RoomPaginationStatus::Paginating => {}
464 }
465
466 Ok(has_events)
467 }
468
469 TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
470 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
471 unreachable!();
473 };
474
475 let event_paginator = Paginator::new(self.room_data_provider.clone());
476
477 let start_from_result = event_paginator
480 .start_from(event_id, (*num_context_events).into())
481 .await
482 .map_err(PaginationError::Paginator)?;
483
484 let thread_root_event_id = start_from_result
486 .events
487 .iter()
488 .find(
489 |event| {
490 if let Some(id) = event.event_id() { id == *event_id } else { false }
491 },
492 )
493 .and_then(|event| extract_thread_root(event.raw()));
494
495 let _ = paginator.set(match thread_root_event_id {
496 Some(root_id) => {
497 let mut tokens = event_paginator.tokens();
498
499 let includes_root_event = start_from_result.events.iter().any(|event| {
503 if let Some(id) = event.event_id() { id == root_id } else { false }
504 });
505
506 if includes_root_event {
507 tokens.previous = PaginationToken::HitEnd;
510 }
511
512 AnyPaginator::Threaded(ThreadedEventsLoader::new(
513 self.room_data_provider.clone(),
514 root_id,
515 tokens,
516 ))
517 }
518
519 None => AnyPaginator::Unthreaded {
520 paginator: event_paginator,
521 hide_threaded_events: *hide_threaded_events,
522 },
523 });
524
525 let has_events = !start_from_result.events.is_empty();
526 let events = start_from_result.events;
527
528 match paginator.get().expect("Paginator was not instantiated") {
529 AnyPaginator::Unthreaded { .. } => {
530 self.replace_with_initial_remote_events(
531 events,
532 RemoteEventOrigin::Pagination,
533 )
534 .await;
535 }
536
537 AnyPaginator::Threaded(threaded_events_loader) => {
538 let thread_root = threaded_events_loader.thread_root_event_id();
541 let events_in_thread = events.into_iter().filter(|event| {
542 extract_thread_root(event.raw())
543 .is_some_and(|event_thread_root| event_thread_root == thread_root)
544 || event.event_id().as_deref() == Some(thread_root)
545 });
546
547 self.replace_with_initial_remote_events(
548 events_in_thread,
549 RemoteEventOrigin::Pagination,
550 )
551 .await;
552 }
553 }
554
555 Ok(has_events)
556 }
557
558 TimelineFocus::Thread { root_event_id, .. } => {
559 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await;
560 let has_events = !events.is_empty();
561
562 let mut related_events = Vector::new();
566 for event_id in events.iter().filter_map(|event| event.event_id()) {
567 if let Some((_original, related)) =
568 room_event_cache.find_event_with_relations(&event_id, None).await
569 {
570 related_events.extend(related);
571 }
572 }
573
574 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
575
576 if !related_events.is_empty() {
578 self.handle_remote_aggregations(
579 vec![VectorDiff::Append { values: related_events }],
580 RemoteEventOrigin::Cache,
581 )
582 .await;
583 }
584
585 Ok(has_events)
586 }
587
588 TimelineFocus::PinnedEvents { .. } => {
589 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
590 unreachable!();
592 };
593
594 let Some(loaded_events) =
595 loader.load_events().await.map_err(Error::PinnedEventsError)?
596 else {
597 return Ok(false);
599 };
600
601 let has_events = !loaded_events.is_empty();
602
603 self.replace_with_initial_remote_events(
604 loaded_events,
605 RemoteEventOrigin::Pagination,
606 )
607 .await;
608
609 Ok(has_events)
610 }
611 }
612 }
613
614 pub async fn handle_encryption_state_changes(&self) {
619 let mut room_info = self.room_data_provider.room_info();
620
621 let mark_encrypted = || async {
623 let mut state = self.state.write().await;
624 state.meta.is_room_encrypted = true;
625 state.mark_all_events_as_encrypted();
626 };
627
628 if room_info.get().encryption_state().is_encrypted() {
629 mark_encrypted().await;
632 return;
633 }
634
635 while let Some(info) = room_info.next().await {
636 if info.encryption_state().is_encrypted() {
637 mark_encrypted().await;
638 break;
641 }
642 }
643 }
644
645 pub(crate) async fn reload_pinned_events(
646 &self,
647 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
648 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
649 loader.load_events().await
650 } else {
651 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
652 }
653 }
654
655 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
664 let state = self.state.read().await;
665
666 let (count, needs) = state
667 .meta
668 .subscriber_skip_count
669 .compute_next_when_paginating_backwards(num_events.into());
670
671 let is_live_timeline = true;
673 state.meta.subscriber_skip_count.update(count, is_live_timeline);
674
675 needs
676 }
677
678 pub(super) async fn focused_paginate_backwards(
683 &self,
684 num_events: u16,
685 ) -> Result<bool, PaginationError> {
686 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
687 TimelineFocusKind::Live { .. }
688 | TimelineFocusKind::PinnedEvents { .. }
689 | TimelineFocusKind::Thread { .. } => {
690 return Err(PaginationError::NotSupported);
691 }
692 TimelineFocusKind::Event { paginator, .. } => paginator
693 .get()
694 .expect("Paginator was not instantiated")
695 .paginate_backwards(num_events)
696 .await
697 .map_err(PaginationError::Paginator)?,
698 };
699
700 self.handle_remote_events_with_diffs(
703 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
704 RemoteEventOrigin::Pagination,
705 )
706 .await;
707
708 Ok(hit_end_of_timeline)
709 }
710
711 pub(super) async fn focused_paginate_forwards(
716 &self,
717 num_events: u16,
718 ) -> Result<bool, PaginationError> {
719 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
720 TimelineFocusKind::Live { .. }
721 | TimelineFocusKind::PinnedEvents { .. }
722 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
723
724 TimelineFocusKind::Event { paginator, .. } => paginator
725 .get()
726 .expect("Paginator was not instantiated")
727 .paginate_forwards(num_events)
728 .await
729 .map_err(PaginationError::Paginator)?,
730 };
731
732 self.handle_remote_events_with_diffs(
735 vec![VectorDiff::Append { values: events.into() }],
736 RemoteEventOrigin::Pagination,
737 )
738 .await;
739
740 Ok(hit_end_of_timeline)
741 }
742
743 pub(super) fn is_live(&self) -> bool {
745 matches!(&*self.focus, TimelineFocusKind::Live { .. })
746 }
747
748 pub(super) fn is_threaded(&self) -> bool {
750 self.focus.is_thread()
751 }
752
753 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
756 self.focus.thread_root().map(ToOwned::to_owned)
757 }
758
759 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
763 self.state.read().await.items.clone_items()
764 }
765
766 #[cfg(test)]
767 pub(super) async fn subscribe_raw(
768 &self,
769 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
770 self.state.read().await.items.subscribe().into_values_and_stream()
771 }
772
773 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
774 let state = self.state.read().await;
775
776 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
777 }
778
779 pub(super) async fn subscribe_filter_map<U, F>(
780 &self,
781 f: F,
782 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
783 where
784 U: Clone,
785 F: Fn(Arc<TimelineItem>) -> Option<U>,
786 {
787 self.state.read().await.items.subscribe().filter_map(f)
788 }
789
790 #[instrument(skip_all)]
794 pub(super) async fn toggle_reaction_local(
795 &self,
796 item_id: &TimelineEventItemId,
797 key: &str,
798 ) -> Result<bool, Error> {
799 let mut state = self.state.write().await;
800
801 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
802 warn!("Timeline item not found, can't add reaction");
803 return Err(Error::FailedToToggleReaction);
804 };
805
806 let user_id = self.room_data_provider.own_user_id();
807 let prev_status = item
808 .content()
809 .reactions()
810 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
811
812 let Some(prev_status) = prev_status else {
813 match item.handle() {
815 TimelineItemHandle::Local(send_handle) => {
816 if send_handle
817 .react(key.to_owned())
818 .await
819 .map_err(|err| Error::SendQueueError(err.into()))?
820 .is_some()
821 {
822 trace!("adding a reaction to a local echo");
823 return Ok(true);
824 }
825
826 warn!("couldn't toggle reaction for local echo");
827 return Ok(false);
828 }
829
830 TimelineItemHandle::Remote(event_id) => {
831 trace!("adding a reaction to a remote echo");
835 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
836 self.room_data_provider
837 .send(ReactionEventContent::from(annotation).into())
838 .await?;
839 return Ok(true);
840 }
841 }
842 };
843
844 trace!("removing a previous reaction");
845 match prev_status {
846 ReactionStatus::LocalToLocal(send_reaction_handle) => {
847 if let Some(handle) = send_reaction_handle {
848 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
849 warn!("unexpectedly unable to abort sending of local reaction");
852 }
853 } else {
854 warn!("no send reaction handle (this should only happen in testing contexts)");
855 }
856 }
857
858 ReactionStatus::LocalToRemote(send_handle) => {
859 trace!("aborting send of the previous reaction that was a local echo");
862 if let Some(handle) = send_handle {
863 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
864 warn!("unexpectedly unable to abort sending of local reaction");
867 }
868 } else {
869 warn!("no send handle (this should only happen in testing contexts)");
870 }
871 }
872
873 ReactionStatus::RemoteToRemote(event_id) => {
874 let Some(annotated_event_id) =
876 item.as_remote().map(|event_item| event_item.event_id.clone())
877 else {
878 warn!("remote reaction to remote event, but the associated item isn't remote");
879 return Ok(false);
880 };
881
882 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
883 let reaction_info = reactions.remove_reaction(user_id, key);
884
885 if reaction_info.is_some() {
886 let new_item = item.with_reactions(reactions);
887 state.items.replace(item_pos, new_item);
888 } else {
889 warn!(
890 "reaction is missing on the item, not removing it locally, \
891 but sending redaction."
892 );
893 }
894
895 drop(state);
897
898 trace!("sending redact for a previous reaction");
899 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
900 if let Some(reaction_info) = reaction_info {
901 debug!("sending redact failed, adding the reaction back to the list");
902
903 let mut state = self.state.write().await;
904 if let Some((item_pos, item)) =
905 rfind_event_by_id(&state.items, &annotated_event_id)
906 {
907 let mut reactions =
909 item.content().reactions().cloned().unwrap_or_default();
910 reactions
911 .entry(key.to_owned())
912 .or_default()
913 .insert(user_id.to_owned(), reaction_info);
914 let new_item = item.with_reactions(reactions);
915 state.items.replace(item_pos, new_item);
916 } else {
917 warn!(
918 "couldn't find item to re-add reaction anymore; \
919 maybe it's been redacted?"
920 );
921 }
922 }
923
924 return Err(err);
925 }
926 }
927 }
928
929 Ok(false)
930 }
931
932 pub(super) async fn handle_remote_events_with_diffs(
934 &self,
935 diffs: Vec<VectorDiff<TimelineEvent>>,
936 origin: RemoteEventOrigin,
937 ) {
938 if diffs.is_empty() {
939 return;
940 }
941
942 let mut state = self.state.write().await;
943 state
944 .handle_remote_events_with_diffs(
945 diffs,
946 origin,
947 &self.room_data_provider,
948 &self.settings,
949 )
950 .await
951 }
952
953 pub(super) async fn handle_remote_aggregations(
955 &self,
956 diffs: Vec<VectorDiff<TimelineEvent>>,
957 origin: RemoteEventOrigin,
958 ) {
959 if diffs.is_empty() {
960 return;
961 }
962
963 let mut state = self.state.write().await;
964 state
965 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
966 .await
967 }
968
969 pub(super) async fn clear(&self) {
970 self.state.write().await.clear();
971 }
972
973 pub(super) async fn replace_with_initial_remote_events<Events>(
981 &self,
982 events: Events,
983 origin: RemoteEventOrigin,
984 ) where
985 Events: IntoIterator,
986 <Events as IntoIterator>::Item: Into<TimelineEvent>,
987 {
988 let mut state = self.state.write().await;
989
990 let track_read_markers = self.settings.track_read_receipts;
991 if track_read_markers {
992 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
993 state
994 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
995 .await;
996 }
997
998 let mut events = events.into_iter().peekable();
1004 if !state.items.is_empty() || events.peek().is_some() {
1005 state
1006 .replace_with_remote_events(
1007 events,
1008 origin,
1009 &self.room_data_provider,
1010 &self.settings,
1011 )
1012 .await;
1013 }
1014
1015 if track_read_markers {
1016 if let Some(fully_read_event_id) =
1017 self.room_data_provider.load_fully_read_marker().await
1018 {
1019 state.handle_fully_read_marker(fully_read_event_id);
1020 } else if let Some(latest_receipt_event_id) = state
1021 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1022 {
1023 debug!("no `m.fully_read` marker found, falling back to read receipt");
1025 state.handle_fully_read_marker(latest_receipt_event_id);
1026 }
1027 }
1028 }
1029
1030 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1031 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1032 }
1033
1034 pub(super) async fn handle_ephemeral_events(
1035 &self,
1036 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1037 ) {
1038 let mut state = self.state.write().await;
1039 state.handle_ephemeral_events(events, &self.room_data_provider).await;
1040 }
1041
1042 #[instrument(skip_all)]
1044 pub(super) async fn handle_local_event(
1045 &self,
1046 txn_id: OwnedTransactionId,
1047 content: AnyMessageLikeEventContent,
1048 send_handle: Option<SendHandle>,
1049 ) {
1050 let sender = self.room_data_provider.own_user_id().to_owned();
1051 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1052
1053 let date_divider_mode = self.settings.date_divider_mode.clone();
1054
1055 let mut state = self.state.write().await;
1056 state
1057 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1058 .await;
1059 }
1060
1061 #[instrument(skip(self))]
1066 pub(super) async fn update_event_send_state(
1067 &self,
1068 txn_id: &TransactionId,
1069 send_state: EventSendState,
1070 ) {
1071 let mut state = self.state.write().await;
1072 let mut txn = state.transaction();
1073
1074 let new_event_id: Option<&EventId> =
1075 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
1076
1077 if rfind_event_item(&txn.items, |it| {
1080 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
1081 })
1082 .is_some()
1083 {
1084 trace!("Remote echo received before send-event response");
1086
1087 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
1088
1089 if let Some((idx, _)) = local_echo {
1093 warn!("Message echo got duplicated, removing the local one");
1094 txn.items.remove(idx);
1095
1096 let mut adjuster =
1098 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1099 adjuster.run(&mut txn.items, &mut txn.meta);
1100 }
1101
1102 txn.commit();
1103 return;
1104 }
1105
1106 let result = rfind_event_item(&txn.items, |it| {
1108 it.transaction_id() == Some(txn_id)
1109 || new_event_id.is_some()
1110 && it.event_id() == new_event_id
1111 && it.as_local().is_some()
1112 });
1113
1114 let Some((idx, item)) = result else {
1115 if let Some(new_event_id) = new_event_id {
1120 if txn.meta.aggregations.mark_aggregation_as_sent(
1121 txn_id.to_owned(),
1122 new_event_id.to_owned(),
1123 &mut txn.items,
1124 &txn.meta.room_version_rules,
1125 ) {
1126 trace!("Aggregation marked as sent");
1127 txn.commit();
1128 return;
1129 }
1130
1131 trace!("Sent aggregation was not found");
1132 }
1133
1134 warn!("Timeline item not found, can't update send state");
1135 return;
1136 };
1137
1138 let Some(local_item) = item.as_local() else {
1139 warn!("We looked for a local item, but it transitioned to remote.");
1140 return;
1141 };
1142
1143 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1146 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1147 }
1148
1149 if let Some(new_event_id) = new_event_id {
1152 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1153 }
1154
1155 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1156 txn.items.replace(idx, new_item);
1157
1158 txn.commit();
1159 }
1160
1161 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1162 let mut state = self.state.write().await;
1163
1164 if let Some((idx, _)) =
1165 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1166 {
1167 let mut txn = state.transaction();
1168
1169 txn.items.remove(idx);
1170
1171 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1174 adjuster.run(&mut txn.items, &mut txn.meta);
1175
1176 txn.meta.update_read_marker(&mut txn.items);
1177
1178 txn.commit();
1179
1180 debug!("discarded local echo");
1181 return true;
1182 }
1183
1184 let mut txn = state.transaction();
1187
1188 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1190 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1191 &mut txn.items,
1192 ) {
1193 Ok(val) => val,
1194 Err(err) => {
1195 warn!("error when discarding local echo for an aggregation: {err}");
1196 true
1198 }
1199 };
1200
1201 if found_aggregation {
1202 txn.commit();
1203 }
1204
1205 found_aggregation
1206 }
1207
1208 pub(super) async fn replace_local_echo(
1209 &self,
1210 txn_id: &TransactionId,
1211 content: AnyMessageLikeEventContent,
1212 ) -> bool {
1213 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1214 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1219 return false;
1220 };
1221
1222 let mut state = self.state.write().await;
1223 let mut txn = state.transaction();
1224
1225 let Some((idx, prev_item)) =
1226 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1227 else {
1228 debug!("Can't find local echo to replace");
1229 return false;
1230 };
1231
1232 let ti_kind = {
1235 let Some(prev_local_item) = prev_item.as_local() else {
1236 warn!("We looked for a local item, but it transitioned as remote??");
1237 return false;
1238 };
1239 let progress = as_variant!(&prev_local_item.send_state,
1241 EventSendState::NotSentYet { progress } => progress.clone())
1242 .flatten();
1243 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1244 };
1245
1246 let new_item = TimelineItem::new(
1248 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1249 content.msgtype,
1250 content.mentions,
1251 prev_item.content().reactions().cloned().unwrap_or_default(),
1252 prev_item.content().thread_root(),
1253 prev_item.content().in_reply_to(),
1254 prev_item.content().thread_summary(),
1255 )),
1256 prev_item.internal_id.to_owned(),
1257 );
1258
1259 txn.items.replace(idx, new_item);
1260
1261 txn.commit();
1265
1266 debug!("Replaced local echo");
1267 true
1268 }
1269
1270 pub(crate) async fn retry_event_decryption_inner(&self, session_ids: Option<BTreeSet<String>>) {
1271 self.decryption_retry_task
1272 .decrypt(self.room_data_provider.clone(), session_ids, self.settings.clone())
1273 .await;
1274 }
1275
1276 pub(super) async fn set_sender_profiles_pending(&self) {
1277 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1278 }
1279
1280 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1281 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1282 }
1283
1284 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1285 self.state.write().await.items.for_each(|mut entry| {
1286 let Some(event_item) = entry.as_event() else { return };
1287 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1288 let new_item = entry.with_kind(TimelineItemKind::Event(
1289 event_item.with_sender_profile(profile_state.clone()),
1290 ));
1291 ObservableItemsEntry::replace(&mut entry, new_item);
1292 }
1293 });
1294 }
1295
1296 pub(super) async fn update_missing_sender_profiles(&self) {
1297 trace!("Updating missing sender profiles");
1298
1299 let mut state = self.state.write().await;
1300 let mut entries = state.items.entries();
1301 while let Some(mut entry) = entries.next() {
1302 let Some(event_item) = entry.as_event() else { continue };
1303 let event_id = event_item.event_id().map(debug);
1304 let transaction_id = event_item.transaction_id().map(debug);
1305
1306 if event_item.sender_profile().is_ready() {
1307 trace!(event_id, transaction_id, "Profile already set");
1308 continue;
1309 }
1310
1311 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1312 Some(profile) => {
1313 trace!(event_id, transaction_id, "Adding profile");
1314 let updated_item =
1315 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1316 let new_item = entry.with_kind(updated_item);
1317 ObservableItemsEntry::replace(&mut entry, new_item);
1318 }
1319 None => {
1320 if !event_item.sender_profile().is_unavailable() {
1321 trace!(event_id, transaction_id, "Marking profile unavailable");
1322 let updated_item =
1323 event_item.with_sender_profile(TimelineDetails::Unavailable);
1324 let new_item = entry.with_kind(updated_item);
1325 ObservableItemsEntry::replace(&mut entry, new_item);
1326 } else {
1327 debug!(event_id, transaction_id, "Profile already marked unavailable");
1328 }
1329 }
1330 }
1331 }
1332
1333 trace!("Done updating missing sender profiles");
1334 }
1335
1336 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1338 trace!("Forcing update of sender profiles: {sender_ids:?}");
1339
1340 let mut state = self.state.write().await;
1341 let mut entries = state.items.entries();
1342 while let Some(mut entry) = entries.next() {
1343 let Some(event_item) = entry.as_event() else { continue };
1344 if !sender_ids.contains(event_item.sender()) {
1345 continue;
1346 }
1347
1348 let event_id = event_item.event_id().map(debug);
1349 let transaction_id = event_item.transaction_id().map(debug);
1350
1351 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1352 Some(profile) => {
1353 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1354 {
1355 debug!(event_id, transaction_id, "Profile already up-to-date");
1356 } else {
1357 trace!(event_id, transaction_id, "Updating profile");
1358 let updated_item =
1359 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1360 let new_item = entry.with_kind(updated_item);
1361 ObservableItemsEntry::replace(&mut entry, new_item);
1362 }
1363 }
1364 None => {
1365 if !event_item.sender_profile().is_unavailable() {
1366 trace!(event_id, transaction_id, "Marking profile unavailable");
1367 let updated_item =
1368 event_item.with_sender_profile(TimelineDetails::Unavailable);
1369 let new_item = entry.with_kind(updated_item);
1370 ObservableItemsEntry::replace(&mut entry, new_item);
1371 } else {
1372 debug!(event_id, transaction_id, "Profile already marked unavailable");
1373 }
1374 }
1375 }
1376 }
1377
1378 trace!("Done forcing update of sender profiles");
1379 }
1380
1381 #[cfg(test)]
1382 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1383 let own_user_id = self.room_data_provider.own_user_id();
1384 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1385 }
1386
1387 pub(super) async fn latest_user_read_receipt(
1391 &self,
1392 user_id: &UserId,
1393 ) -> Option<(OwnedEventId, Receipt)> {
1394 let receipt_thread = self.focus.receipt_thread();
1395
1396 self.state
1397 .read()
1398 .await
1399 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1400 .await
1401 }
1402
1403 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1406 &self,
1407 user_id: &UserId,
1408 ) -> Option<OwnedEventId> {
1409 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1410 }
1411
1412 pub async fn subscribe_own_user_read_receipts_changed(
1414 &self,
1415 ) -> impl Stream<Item = ()> + use<P> {
1416 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1417 }
1418
1419 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1421 match echo.content {
1422 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1423 let content = match serialized_event.deserialize() {
1424 Ok(d) => d,
1425 Err(err) => {
1426 warn!("error deserializing local echo: {err}");
1427 return;
1428 }
1429 };
1430
1431 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1432 .await;
1433
1434 if let Some(send_error) = send_error {
1435 self.update_event_send_state(
1436 &echo.transaction_id,
1437 EventSendState::SendingFailed {
1438 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1439 send_error,
1440 ))),
1441 is_recoverable: false,
1442 },
1443 )
1444 .await;
1445 }
1446 }
1447
1448 LocalEchoContent::React { key, send_handle, applies_to } => {
1449 self.handle_local_reaction(key, send_handle, applies_to).await;
1450 }
1451 }
1452 }
1453
1454 #[instrument(skip(self, send_handle))]
1456 async fn handle_local_reaction(
1457 &self,
1458 reaction_key: String,
1459 send_handle: SendReactionHandle,
1460 applies_to: OwnedTransactionId,
1461 ) {
1462 let mut state = self.state.write().await;
1463 let mut tr = state.transaction();
1464
1465 let target = TimelineEventItemId::TransactionId(applies_to);
1466
1467 let reaction_txn_id = send_handle.transaction_id().to_owned();
1468 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1469 let aggregation = Aggregation::new(
1470 TimelineEventItemId::TransactionId(reaction_txn_id),
1471 AggregationKind::Reaction {
1472 key: reaction_key.clone(),
1473 sender: self.room_data_provider.own_user_id().to_owned(),
1474 timestamp: MilliSecondsSinceUnixEpoch::now(),
1475 reaction_status,
1476 },
1477 );
1478
1479 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1480 find_item_and_apply_aggregation(
1481 &tr.meta.aggregations,
1482 &mut tr.items,
1483 &target,
1484 aggregation,
1485 &tr.meta.room_version_rules,
1486 );
1487
1488 tr.commit();
1489 }
1490
1491 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1493 match update {
1494 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1495 self.handle_local_echo(echo).await;
1496 }
1497
1498 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1499 if !self.discard_local_echo(&transaction_id).await {
1500 warn!("couldn't find the local echo to discard");
1501 }
1502 }
1503
1504 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1505 let content = match new_content.deserialize() {
1506 Ok(d) => d,
1507 Err(err) => {
1508 warn!("error deserializing local echo (upon edit): {err}");
1509 return;
1510 }
1511 };
1512
1513 if !self.replace_local_echo(&transaction_id, content).await {
1514 warn!("couldn't find the local echo to replace");
1515 }
1516 }
1517
1518 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1519 self.update_event_send_state(
1520 &transaction_id,
1521 EventSendState::SendingFailed { error, is_recoverable },
1522 )
1523 .await;
1524 }
1525
1526 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1527 self.update_event_send_state(
1528 &transaction_id,
1529 EventSendState::NotSentYet { progress: None },
1530 )
1531 .await;
1532 }
1533
1534 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1535 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1536 .await;
1537 }
1538
1539 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1540 self.update_event_send_state(
1541 &related_to,
1542 EventSendState::NotSentYet {
1543 progress: Some(MediaUploadProgress { index, progress }),
1544 },
1545 )
1546 .await;
1547 }
1548 }
1549 }
1550
1551 pub async fn insert_timeline_start_if_missing(&self) {
1554 let mut state = self.state.write().await;
1555 let mut txn = state.transaction();
1556 txn.items.push_timeline_start_if_missing(
1557 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1558 );
1559 txn.commit();
1560 }
1561
1562 pub(super) async fn make_replied_to(
1568 &self,
1569 event: TimelineEvent,
1570 ) -> Result<Option<EmbeddedEvent>, Error> {
1571 let state = self.state.read().await;
1572 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1573 }
1574}
1575
1576impl TimelineController {
1577 pub(super) fn room(&self) -> &Room {
1578 &self.room_data_provider
1579 }
1580
1581 #[instrument(skip(self))]
1584 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1585 let state_guard = self.state.write().await;
1586 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1587 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1588 let remote_item = item
1589 .as_remote()
1590 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1591 .clone();
1592
1593 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1594 debug!("Event is not a message");
1595 return Ok(());
1596 };
1597 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1598 debug!("Event is not a reply");
1599 return Ok(());
1600 };
1601 if let TimelineDetails::Pending = &in_reply_to.event {
1602 debug!("Replied-to event is already being fetched");
1603 return Ok(());
1604 }
1605 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1606 debug!("Replied-to event has already been fetched");
1607 return Ok(());
1608 }
1609
1610 let internal_id = item.internal_id.to_owned();
1611 let item = item.clone();
1612 let event = fetch_replied_to_event(
1613 state_guard,
1614 &self.state,
1615 index,
1616 &item,
1617 internal_id,
1618 &msglike,
1619 &in_reply_to.event_id,
1620 self.room(),
1621 )
1622 .await?;
1623
1624 let mut state = self.state.write().await;
1627 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1628 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1629
1630 let TimelineItemContent::MsgLike(MsgLikeContent {
1633 kind: MsgLikeKind::Message(message),
1634 reactions,
1635 thread_root,
1636 in_reply_to,
1637 thread_summary,
1638 }) = item.content().clone()
1639 else {
1640 info!("Event is no longer a message (redacted?)");
1641 return Ok(());
1642 };
1643 let Some(in_reply_to) = in_reply_to else {
1644 warn!("Event no longer has a reply (bug?)");
1645 return Ok(());
1646 };
1647
1648 trace!("Updating in-reply-to details");
1651 let internal_id = item.internal_id.to_owned();
1652 let mut item = item.clone();
1653 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1654 kind: MsgLikeKind::Message(message),
1655 reactions,
1656 thread_root,
1657 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1658 thread_summary,
1659 }));
1660 state.items.replace(index, TimelineItem::new(item, internal_id));
1661
1662 Ok(())
1663 }
1664
1665 pub(super) fn infer_thread_for_read_receipt(
1671 &self,
1672 receipt_type: &SendReceiptType,
1673 ) -> ReceiptThread {
1674 if matches!(receipt_type, SendReceiptType::FullyRead) {
1675 ReceiptThread::Unthreaded
1676 } else {
1677 self.focus.receipt_thread()
1678 }
1679 }
1680
1681 pub(super) async fn should_send_receipt(
1685 &self,
1686 receipt_type: &SendReceiptType,
1687 receipt_thread: &ReceiptThread,
1688 event_id: &EventId,
1689 ) -> bool {
1690 let own_user_id = self.room().own_user_id();
1691 let state = self.state.read().await;
1692 let room = self.room();
1693
1694 match receipt_type {
1695 SendReceiptType::Read => {
1696 if let Some((old_pub_read, _)) = state
1697 .meta
1698 .user_receipt(
1699 own_user_id,
1700 ReceiptType::Read,
1701 receipt_thread.clone(),
1702 room,
1703 state.items.all_remote_events(),
1704 )
1705 .await
1706 {
1707 trace!(%old_pub_read, "found a previous public receipt");
1708 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1709 &old_pub_read,
1710 event_id,
1711 state.items.all_remote_events(),
1712 ) {
1713 trace!(
1714 "event referred to new receipt is {relative_pos:?} the previous receipt"
1715 );
1716 return relative_pos == RelativePosition::After;
1717 }
1718 }
1719 }
1720
1721 SendReceiptType::ReadPrivate => {
1724 if let Some((old_priv_read, _)) =
1725 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1726 {
1727 trace!(%old_priv_read, "found a previous private receipt");
1728 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1729 &old_priv_read,
1730 event_id,
1731 state.items.all_remote_events(),
1732 ) {
1733 trace!(
1734 "event referred to new receipt is {relative_pos:?} the previous receipt"
1735 );
1736 return relative_pos == RelativePosition::After;
1737 }
1738 }
1739 }
1740
1741 SendReceiptType::FullyRead => {
1742 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1743 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1744 &prev_event_id,
1745 event_id,
1746 state.items.all_remote_events(),
1747 )
1748 {
1749 return relative_pos == RelativePosition::After;
1750 }
1751 }
1752
1753 _ => {}
1754 }
1755
1756 true
1758 }
1759
1760 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1763 let state = self.state.read().await;
1764 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1765 }
1766
1767 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1768 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1769 self.retry_event_decryption_inner(session_ids).await
1770 }
1771
1772 pub(super) async fn map_pagination_status(
1780 &self,
1781 status: RoomPaginationStatus,
1782 ) -> RoomPaginationStatus {
1783 match status {
1784 RoomPaginationStatus::Idle { hit_timeline_start } => {
1785 if hit_timeline_start {
1786 let state = self.state.read().await;
1787 if state.meta.subscriber_skip_count.get() > 0 {
1791 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1792 }
1793 }
1794 }
1795 RoomPaginationStatus::Paginating => {}
1796 }
1797
1798 status
1800 }
1801}
1802
1803impl<P: RoomDataProvider> TimelineController<P> {
1804 pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1806 &self.focus
1807 }
1808}
1809
1810#[allow(clippy::too_many_arguments)]
1811async fn fetch_replied_to_event<P: RoomDataProvider>(
1812 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1813 state_lock: &RwLock<TimelineState<P>>,
1814 index: usize,
1815 item: &EventTimelineItem,
1816 internal_id: TimelineUniqueId,
1817 msglike: &MsgLikeContent,
1818 in_reply_to: &EventId,
1819 room: &Room,
1820) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1821 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1822 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1823 trace!("Found replied-to event locally");
1824 return Ok(details);
1825 }
1826
1827 trace!("Setting in-reply-to details to pending");
1830 let in_reply_to_details =
1831 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1832
1833 let event_item = item
1834 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1835
1836 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1837 state_guard.items.replace(index, new_timeline_item);
1838
1839 drop(state_guard);
1841
1842 trace!("Fetching replied-to event");
1843 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1844 Ok(timeline_event) => {
1845 let state = state_lock.read().await;
1846
1847 let replied_to_item =
1848 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1849
1850 if let Some(item) = replied_to_item {
1851 TimelineDetails::Ready(Box::new(item))
1852 } else {
1853 return Err(Error::UnsupportedEvent);
1855 }
1856 }
1857
1858 Err(e) => TimelineDetails::Error(Arc::new(e)),
1859 };
1860
1861 Ok(res)
1862}