1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25 config::RequestConfig,
26 deserialized_responses::TimelineEvent,
27 event_cache::{DecryptionRetryRequest, PaginationStatus, RoomEventCache},
28 paginators::{PaginationResult, PaginationToken, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40 AnySyncTimelineEvent, MessageLikeEventType,
41 poll::unstable_start::UnstablePollStartEventContent,
42 reaction::ReactionEventContent,
43 receipt::{Receipt, ReceiptThread, ReceiptType},
44 relation::{Annotation, RelationType},
45 room::message::{MessageType, Relation},
46 },
47 room_version_rules::RoomVersionRules,
48 serde::Raw,
49};
50use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54 metadata::{RelativePosition, TimelineMetadata},
55 observable_items::{
56 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57 ObservableItemsTransactionEntry,
58 },
59 state::TimelineState,
60 state_transaction::TimelineStateTransaction,
61};
62use super::{
63 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
66 TimelineReadReceiptTracking, VirtualTimelineItem,
67 algorithms::{rfind_event_by_id, rfind_event_item},
68 event_item::{ReactionStatus, RemoteEventOrigin},
69 item::TimelineUniqueId,
70 subscriber::TimelineSubscriber,
71 traits::RoomDataProvider,
72};
73use crate::{
74 timeline::{
75 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
76 algorithms::rfind_event_by_item_id,
77 controller::decryption_retry_task::compute_redecryption_candidates,
78 date_dividers::DateDividerAdjuster, event_item::TimelineItemHandle,
79 },
80 unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
94use matrix_sdk_common::serde_helpers::extract_thread_root;
95
96#[derive(Debug)]
102pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
103 Live {
105 hide_threaded_events: bool,
107 },
108
109 Event {
112 paginator: OnceCell<AnyPaginator<P>>,
114 },
115
116 Thread {
118 root_event_id: OwnedEventId,
120 },
121
122 PinnedEvents,
123}
124
125#[derive(Debug)]
126pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
127 Unthreaded {
128 paginator: Paginator<P>,
130 hide_threaded_events: bool,
132 },
133 Threaded(ThreadedEventsLoader<P>),
134}
135
136impl<P: RoomDataProvider> AnyPaginator<P> {
137 pub async fn paginate_backwards(
146 &self,
147 num_events: u16,
148 ) -> Result<PaginationResult, PaginatorError> {
149 match self {
150 Self::Unthreaded { paginator, .. } => {
151 paginator.paginate_backward(num_events.into()).await
152 }
153 Self::Threaded(threaded_paginator) => {
154 threaded_paginator.paginate_backwards(num_events.into()).await
155 }
156 }
157 }
158
159 pub async fn paginate_forwards(
167 &self,
168 num_events: u16,
169 ) -> Result<PaginationResult, PaginatorError> {
170 match self {
171 Self::Unthreaded { paginator, .. } => {
172 paginator.paginate_forward(num_events.into()).await
173 }
174 Self::Threaded(threaded_paginator) => {
175 threaded_paginator.paginate_forwards(num_events.into()).await
176 }
177 }
178 }
179
180 pub fn hide_threaded_events(&self) -> bool {
182 match self {
183 Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
184 Self::Threaded(_) => false,
185 }
186 }
187
188 pub fn thread_root(&self) -> Option<&EventId> {
191 match self {
192 Self::Unthreaded { .. } => None,
193 Self::Threaded(thread_events_loader) => {
194 Some(thread_events_loader.thread_root_event_id())
195 }
196 }
197 }
198}
199
200impl<P: RoomDataProvider> TimelineFocusKind<P> {
201 pub(super) fn receipt_thread(&self) -> ReceiptThread {
208 if let Some(thread_root) = self.thread_root() {
209 ReceiptThread::Thread(thread_root.to_owned())
210 } else if self.hide_threaded_events() {
211 ReceiptThread::Main
212 } else {
213 ReceiptThread::Unthreaded
214 }
215 }
216
217 fn hide_threaded_events(&self) -> bool {
219 match self {
220 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
221 TimelineFocusKind::Event { paginator } => {
222 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
223 }
224 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
225 }
226 }
227
228 fn is_thread(&self) -> bool {
231 self.thread_root().is_some()
232 }
233
234 fn thread_root(&self) -> Option<&EventId> {
236 match self {
237 TimelineFocusKind::Event { paginator, .. } => {
238 paginator.get().and_then(|paginator| paginator.thread_root())
239 }
240 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
241 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
242 }
243 }
244}
245
246#[derive(Clone, Debug)]
247pub(super) struct TimelineController<P: RoomDataProvider = Room> {
248 state: Arc<RwLock<TimelineState<P>>>,
250
251 focus: Arc<TimelineFocusKind<P>>,
253
254 pub(crate) room_data_provider: P,
259
260 pub(super) settings: TimelineSettings,
262}
263
264#[derive(Clone)]
265pub(super) struct TimelineSettings {
266 pub(super) track_read_receipts: TimelineReadReceiptTracking,
269
270 pub(super) event_filter: Arc<TimelineEventFilterFn>,
273
274 pub(super) add_failed_to_parse: bool,
276
277 pub(super) date_divider_mode: DateDividerMode,
279}
280
281#[cfg(not(tarpaulin_include))]
282impl fmt::Debug for TimelineSettings {
283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284 f.debug_struct("TimelineSettings")
285 .field("track_read_receipts", &self.track_read_receipts)
286 .field("add_failed_to_parse", &self.add_failed_to_parse)
287 .finish_non_exhaustive()
288 }
289}
290
291impl Default for TimelineSettings {
292 fn default() -> Self {
293 Self {
294 track_read_receipts: TimelineReadReceiptTracking::Disabled,
295 event_filter: Arc::new(default_event_filter),
296 add_failed_to_parse: true,
297 date_divider_mode: DateDividerMode::Daily,
298 }
299 }
300}
301
302pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
312 match event {
313 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
314 if ev.redacts(&rules.redaction).is_some() {
315 false
318 } else {
319 ev.event_type() != MessageLikeEventType::Reaction
322 }
323 }
324
325 AnySyncTimelineEvent::MessageLike(msg) => {
326 match msg.original_content() {
327 None => {
328 msg.event_type() != MessageLikeEventType::Reaction
331 }
332
333 Some(original_content) => {
334 match original_content {
335 AnyMessageLikeEventContent::RoomMessage(content) => {
336 if content
337 .relates_to
338 .as_ref()
339 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
340 {
341 return false;
343 }
344
345 match content.msgtype {
346 MessageType::Audio(_)
347 | MessageType::Emote(_)
348 | MessageType::File(_)
349 | MessageType::Image(_)
350 | MessageType::Location(_)
351 | MessageType::Notice(_)
352 | MessageType::ServerNotice(_)
353 | MessageType::Text(_)
354 | MessageType::Video(_)
355 | MessageType::VerificationRequest(_) => true,
356 #[cfg(feature = "unstable-msc4274")]
357 MessageType::Gallery(_) => true,
358 _ => false,
359 }
360 }
361
362 AnyMessageLikeEventContent::Sticker(_)
363 | AnyMessageLikeEventContent::UnstablePollStart(
364 UnstablePollStartEventContent::New(_),
365 )
366 | AnyMessageLikeEventContent::CallInvite(_)
367 | AnyMessageLikeEventContent::RtcNotification(_)
368 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
369
370 _ => false,
371 }
372 }
373 }
374 }
375
376 AnySyncTimelineEvent::State(_) => {
377 true
379 }
380 }
381}
382
383impl<P: RoomDataProvider> TimelineController<P> {
384 pub(super) fn new(
385 room_data_provider: P,
386 focus: TimelineFocus,
387 internal_id_prefix: Option<String>,
388 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
389 is_room_encrypted: bool,
390 settings: TimelineSettings,
391 ) -> Self {
392 let focus = match focus {
393 TimelineFocus::Live { hide_threaded_events } => {
394 TimelineFocusKind::Live { hide_threaded_events }
395 }
396
397 TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
398
399 TimelineFocus::Thread { root_event_id, .. } => {
400 TimelineFocusKind::Thread { root_event_id }
401 }
402
403 TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
404 };
405
406 let focus = Arc::new(focus);
407 let state = Arc::new(RwLock::new(TimelineState::new(
408 focus.clone(),
409 room_data_provider.own_user_id().to_owned(),
410 room_data_provider.room_version_rules(),
411 internal_id_prefix,
412 unable_to_decrypt_hook,
413 is_room_encrypted,
414 )));
415
416 Self { state, focus, room_data_provider, settings }
417 }
418
419 pub(super) async fn init_focus(
426 &self,
427 focus: &TimelineFocus,
428 room_event_cache: &RoomEventCache,
429 ) -> Result<bool, Error> {
430 match focus {
431 TimelineFocus::Live { .. } => {
432 let events = room_event_cache.events().await?;
434
435 let has_events = !events.is_empty();
436
437 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
438
439 match room_event_cache.pagination().status().get() {
440 PaginationStatus::Idle { hit_timeline_start } => {
441 if hit_timeline_start {
442 self.insert_timeline_start_if_missing().await;
445 }
446 }
447 PaginationStatus::Paginating => {}
448 }
449
450 Ok(has_events)
451 }
452
453 TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
454 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
455 unreachable!();
457 };
458
459 let event_paginator = Paginator::new(self.room_data_provider.clone());
460
461 let load_events_with_context = || async {
462 event_paginator
464 .start_from(event_id, (*num_context_events).into())
465 .await
466 .map(|r| r.events)
467 .map_err(PaginationError::Paginator)
468 };
469
470 let events = if *num_context_events == 0 {
471 let request_config = Some(RequestConfig::default().retry_limit(3));
474 let relations_filter =
475 Some(vec![RelationType::Annotation, RelationType::Replacement]);
476
477 match self
479 .room_data_provider
480 .load_event_with_relations(event_id, request_config, relations_filter)
481 .await
482 {
483 Ok((event, related_events)) => {
484 let mut events = vec![event];
485 events.extend(related_events);
486 events
487 }
488 Err(err) => {
489 error!("error when loading focussed event: {err}");
490 load_events_with_context().await?
492 }
493 }
494 } else {
495 load_events_with_context().await?
497 };
498
499 let extracted_thread_root = events
501 .iter()
502 .find(
503 |event| {
504 if let Some(id) = event.event_id() { id == *event_id } else { false }
505 },
506 )
507 .and_then(|event| extract_thread_root(event.raw()));
508
509 let (thread_root_event_id, hide_threaded_events) = match thread_mode {
511 TimelineEventFocusThreadMode::ForceThread => {
512 (extracted_thread_root.or_else(|| Some(event_id.clone())), false)
515 }
516 TimelineEventFocusThreadMode::Automatic { hide_threaded_events } => {
517 (extracted_thread_root, *hide_threaded_events)
518 }
519 };
520
521 let _ = paginator.set(match thread_root_event_id {
522 Some(root_id) => {
523 let mut tokens = event_paginator.tokens();
524
525 let includes_root_event = events.iter().any(|event| {
529 if let Some(id) = event.event_id() { id == root_id } else { false }
530 });
531
532 if includes_root_event {
533 tokens.previous = PaginationToken::HitEnd;
536 }
537
538 AnyPaginator::Threaded(ThreadedEventsLoader::new(
539 self.room_data_provider.clone(),
540 root_id,
541 tokens,
542 ))
543 }
544
545 None => AnyPaginator::Unthreaded {
546 paginator: event_paginator,
547 hide_threaded_events,
548 },
549 });
550
551 let has_events = !events.is_empty();
552
553 match paginator.get().expect("Paginator was not instantiated") {
554 AnyPaginator::Unthreaded { .. } => {
555 self.replace_with_initial_remote_events(
556 events,
557 RemoteEventOrigin::Pagination,
558 )
559 .await;
560 }
561
562 AnyPaginator::Threaded(threaded_events_loader) => {
563 let thread_root = threaded_events_loader.thread_root_event_id();
566 let events_in_thread = events.into_iter().filter(|event| {
567 extract_thread_root(event.raw())
568 .is_some_and(|event_thread_root| event_thread_root == thread_root)
569 || event.event_id().as_deref() == Some(thread_root)
570 });
571
572 self.replace_with_initial_remote_events(
573 events_in_thread,
574 RemoteEventOrigin::Pagination,
575 )
576 .await;
577 }
578 }
579
580 Ok(has_events)
581 }
582
583 TimelineFocus::Thread { root_event_id, .. } => {
584 self.init_with_thread_root(root_event_id, room_event_cache).await
585 }
586
587 TimelineFocus::PinnedEvents => {
588 let (initial_events, _update_receiver) =
589 room_event_cache.subscribe_to_pinned_events().await?;
590
591 let has_events = !initial_events.is_empty();
592
593 self.replace_with_initial_remote_events(
594 initial_events,
595 RemoteEventOrigin::Pagination,
596 )
597 .await;
598
599 Ok(has_events)
600 }
601 }
602 }
603
604 pub(super) async fn init_with_thread_root(
609 &self,
610 root_event_id: &OwnedEventId,
611 room_event_cache: &RoomEventCache,
612 ) -> Result<bool, Error> {
613 let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
614 let has_events = !events.is_empty();
615
616 let mut related_events = Vector::new();
620 for event_id in events.iter().filter_map(|event| event.event_id()) {
621 if let Some((_original, related)) =
622 room_event_cache.find_event_with_relations(&event_id, None).await?
623 {
624 related_events.extend(related);
625 }
626 }
627
628 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
629
630 if !related_events.is_empty() {
632 self.handle_remote_aggregations(
633 vec![VectorDiff::Append { values: related_events }],
634 RemoteEventOrigin::Cache,
635 )
636 .await;
637 }
638
639 Ok(has_events)
640 }
641
642 pub async fn handle_encryption_state_changes(&self) {
647 let mut room_info = self.room_data_provider.room_info();
648
649 let mark_encrypted = || async {
651 let mut state = self.state.write().await;
652 state.meta.is_room_encrypted = true;
653 state.mark_all_events_as_encrypted();
654 };
655
656 if room_info.get().encryption_state().is_encrypted() {
657 mark_encrypted().await;
660 return;
661 }
662
663 while let Some(info) = room_info.next().await {
664 if info.encryption_state().is_encrypted() {
665 mark_encrypted().await;
666 break;
669 }
670 }
671 }
672
673 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
682 let state = self.state.read().await;
683
684 let (count, needs) = state
685 .meta
686 .subscriber_skip_count
687 .compute_next_when_paginating_backwards(num_events.into());
688
689 let is_live_timeline = true;
691 state.meta.subscriber_skip_count.update(count, is_live_timeline);
692
693 needs
694 }
695
696 pub(super) async fn focused_paginate_backwards(
701 &self,
702 num_events: u16,
703 ) -> Result<bool, PaginationError> {
704 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
705 TimelineFocusKind::Live { .. }
706 | TimelineFocusKind::PinnedEvents
707 | TimelineFocusKind::Thread { .. } => {
708 return Err(PaginationError::NotSupported);
709 }
710 TimelineFocusKind::Event { paginator, .. } => paginator
711 .get()
712 .expect("Paginator was not instantiated")
713 .paginate_backwards(num_events)
714 .await
715 .map_err(PaginationError::Paginator)?,
716 };
717
718 self.handle_remote_events_with_diffs(
721 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
722 RemoteEventOrigin::Pagination,
723 )
724 .await;
725
726 Ok(hit_end_of_timeline)
727 }
728
729 pub(super) async fn focused_paginate_forwards(
734 &self,
735 num_events: u16,
736 ) -> Result<bool, PaginationError> {
737 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
738 TimelineFocusKind::Live { .. }
739 | TimelineFocusKind::PinnedEvents
740 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
741
742 TimelineFocusKind::Event { paginator, .. } => paginator
743 .get()
744 .expect("Paginator was not instantiated")
745 .paginate_forwards(num_events)
746 .await
747 .map_err(PaginationError::Paginator)?,
748 };
749
750 self.handle_remote_events_with_diffs(
753 vec![VectorDiff::Append { values: events.into() }],
754 RemoteEventOrigin::Pagination,
755 )
756 .await;
757
758 Ok(hit_end_of_timeline)
759 }
760
761 pub(super) fn is_live(&self) -> bool {
763 matches!(&*self.focus, TimelineFocusKind::Live { .. })
764 }
765
766 pub(super) fn is_threaded(&self) -> bool {
768 self.focus.is_thread()
769 }
770
771 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
774 self.focus.thread_root().map(ToOwned::to_owned)
775 }
776
777 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
781 self.state.read().await.items.clone_items()
782 }
783
784 #[cfg(test)]
785 pub(super) async fn subscribe_raw(
786 &self,
787 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
788 self.state.read().await.items.subscribe().into_values_and_stream()
789 }
790
791 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
792 let state = self.state.read().await;
793
794 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
795 }
796
797 pub(super) async fn subscribe_filter_map<U, F>(
798 &self,
799 f: F,
800 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
801 where
802 U: Clone,
803 F: Fn(Arc<TimelineItem>) -> Option<U>,
804 {
805 self.state.read().await.items.subscribe().filter_map(f)
806 }
807
808 #[instrument(skip_all)]
812 pub(super) async fn toggle_reaction_local(
813 &self,
814 item_id: &TimelineEventItemId,
815 key: &str,
816 ) -> Result<bool, Error> {
817 let mut state = self.state.write().await;
818
819 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
820 warn!("Timeline item not found, can't add reaction");
821 return Err(Error::FailedToToggleReaction);
822 };
823
824 let user_id = self.room_data_provider.own_user_id();
825 let prev_status = item
826 .content()
827 .reactions()
828 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
829
830 let Some(prev_status) = prev_status else {
831 match item.handle() {
833 TimelineItemHandle::Local(send_handle) => {
834 if send_handle
835 .react(key.to_owned())
836 .await
837 .map_err(|err| Error::SendQueueError(err.into()))?
838 .is_some()
839 {
840 trace!("adding a reaction to a local echo");
841 return Ok(true);
842 }
843
844 warn!("couldn't toggle reaction for local echo");
845 return Ok(false);
846 }
847
848 TimelineItemHandle::Remote(event_id) => {
849 trace!("adding a reaction to a remote echo");
853 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
854 self.room_data_provider
855 .send(ReactionEventContent::from(annotation).into())
856 .await?;
857 return Ok(true);
858 }
859 }
860 };
861
862 trace!("removing a previous reaction");
863 match prev_status {
864 ReactionStatus::LocalToLocal(send_reaction_handle) => {
865 if let Some(handle) = send_reaction_handle {
866 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
867 warn!("unexpectedly unable to abort sending of local reaction");
870 }
871 } else {
872 warn!("no send reaction handle (this should only happen in testing contexts)");
873 }
874 }
875
876 ReactionStatus::LocalToRemote(send_handle) => {
877 trace!("aborting send of the previous reaction that was a local echo");
880 if let Some(handle) = send_handle {
881 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
882 warn!("unexpectedly unable to abort sending of local reaction");
885 }
886 } else {
887 warn!("no send handle (this should only happen in testing contexts)");
888 }
889 }
890
891 ReactionStatus::RemoteToRemote(event_id) => {
892 let Some(annotated_event_id) =
894 item.as_remote().map(|event_item| event_item.event_id.clone())
895 else {
896 warn!("remote reaction to remote event, but the associated item isn't remote");
897 return Ok(false);
898 };
899
900 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
901 let reaction_info = reactions.remove_reaction(user_id, key);
902
903 if reaction_info.is_some() {
904 let new_item = item.with_reactions(reactions);
905 state.items.replace(item_pos, new_item);
906 } else {
907 warn!(
908 "reaction is missing on the item, not removing it locally, \
909 but sending redaction."
910 );
911 }
912
913 drop(state);
915
916 trace!("sending redact for a previous reaction");
917 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
918 if let Some(reaction_info) = reaction_info {
919 debug!("sending redact failed, adding the reaction back to the list");
920
921 let mut state = self.state.write().await;
922 if let Some((item_pos, item)) =
923 rfind_event_by_id(&state.items, &annotated_event_id)
924 {
925 let mut reactions =
927 item.content().reactions().cloned().unwrap_or_default();
928 reactions
929 .entry(key.to_owned())
930 .or_default()
931 .insert(user_id.to_owned(), reaction_info);
932 let new_item = item.with_reactions(reactions);
933 state.items.replace(item_pos, new_item);
934 } else {
935 warn!(
936 "couldn't find item to re-add reaction anymore; \
937 maybe it's been redacted?"
938 );
939 }
940 }
941
942 return Err(err);
943 }
944 }
945 }
946
947 Ok(false)
948 }
949
950 pub(super) async fn handle_remote_events_with_diffs(
952 &self,
953 diffs: Vec<VectorDiff<TimelineEvent>>,
954 origin: RemoteEventOrigin,
955 ) {
956 if diffs.is_empty() {
957 return;
958 }
959
960 let mut state = self.state.write().await;
961 state
962 .handle_remote_events_with_diffs(
963 diffs,
964 origin,
965 &self.room_data_provider,
966 &self.settings,
967 )
968 .await
969 }
970
971 pub(super) async fn handle_remote_aggregations(
973 &self,
974 diffs: Vec<VectorDiff<TimelineEvent>>,
975 origin: RemoteEventOrigin,
976 ) {
977 if diffs.is_empty() {
978 return;
979 }
980
981 let mut state = self.state.write().await;
982 state
983 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
984 .await
985 }
986
987 pub(super) async fn clear(&self) {
988 self.state.write().await.clear();
989 }
990
991 pub(super) async fn replace_with_initial_remote_events<Events>(
999 &self,
1000 events: Events,
1001 origin: RemoteEventOrigin,
1002 ) where
1003 Events: IntoIterator,
1004 <Events as IntoIterator>::Item: Into<TimelineEvent>,
1005 {
1006 let mut state = self.state.write().await;
1007
1008 let track_read_markers = &self.settings.track_read_receipts;
1009 if track_read_markers.is_enabled() {
1010 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
1011 state
1012 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
1013 .await;
1014 }
1015
1016 let mut events = events.into_iter().peekable();
1022 if !state.items.is_empty() || events.peek().is_some() {
1023 state
1024 .replace_with_remote_events(
1025 events,
1026 origin,
1027 &self.room_data_provider,
1028 &self.settings,
1029 )
1030 .await;
1031 }
1032
1033 if track_read_markers.is_enabled() {
1034 if let Some(fully_read_event_id) =
1035 self.room_data_provider.load_fully_read_marker().await
1036 {
1037 state.handle_fully_read_marker(fully_read_event_id);
1038 } else if let Some(latest_receipt_event_id) = state
1039 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1040 {
1041 debug!("no `m.fully_read` marker found, falling back to read receipt");
1043 state.handle_fully_read_marker(latest_receipt_event_id);
1044 }
1045 }
1046 }
1047
1048 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1049 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1050 }
1051
1052 pub(super) async fn handle_ephemeral_events(
1053 &self,
1054 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1055 ) {
1056 if events.is_empty() {
1058 return;
1059 }
1060 let mut state = self.state.write().await;
1061 state.handle_ephemeral_events(events, &self.room_data_provider).await;
1062 }
1063
1064 #[instrument(skip_all)]
1066 pub(super) async fn handle_local_event(
1067 &self,
1068 txn_id: OwnedTransactionId,
1069 content: AnyMessageLikeEventContent,
1070 send_handle: Option<SendHandle>,
1071 ) {
1072 let sender = self.room_data_provider.own_user_id().to_owned();
1073 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1074
1075 let date_divider_mode = self.settings.date_divider_mode.clone();
1076
1077 let mut state = self.state.write().await;
1078 state
1079 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1080 .await;
1081 }
1082
1083 #[instrument(skip(self))]
1088 pub(super) async fn update_event_send_state(
1089 &self,
1090 txn_id: &TransactionId,
1091 send_state: EventSendState,
1092 ) {
1093 let mut state = self.state.write().await;
1094 let mut txn = state.transaction();
1095
1096 let new_event_id: Option<&EventId> =
1097 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
1098
1099 if rfind_event_item(&txn.items, |it| {
1102 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
1103 })
1104 .is_some()
1105 {
1106 trace!("Remote echo received before send-event response");
1108
1109 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
1110
1111 if let Some((idx, _)) = local_echo {
1115 warn!("Message echo got duplicated, removing the local one");
1116 txn.items.remove(idx);
1117
1118 let mut adjuster =
1120 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1121 adjuster.run(&mut txn.items, &mut txn.meta);
1122 }
1123
1124 txn.commit();
1125 return;
1126 }
1127
1128 let result = rfind_event_item(&txn.items, |it| {
1130 it.transaction_id() == Some(txn_id)
1131 || new_event_id.is_some()
1132 && it.event_id() == new_event_id
1133 && it.as_local().is_some()
1134 });
1135
1136 let Some((idx, item)) = result else {
1137 if let Some(new_event_id) = new_event_id {
1142 if txn.meta.aggregations.mark_aggregation_as_sent(
1143 txn_id.to_owned(),
1144 new_event_id.to_owned(),
1145 &mut txn.items,
1146 &txn.meta.room_version_rules,
1147 ) {
1148 trace!("Aggregation marked as sent");
1149 txn.commit();
1150 return;
1151 }
1152
1153 trace!("Sent aggregation was not found");
1154 }
1155
1156 warn!("Timeline item not found, can't update send state");
1157 return;
1158 };
1159
1160 let Some(local_item) = item.as_local() else {
1161 warn!("We looked for a local item, but it transitioned to remote.");
1162 return;
1163 };
1164
1165 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1168 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1169 }
1170
1171 if let Some(new_event_id) = new_event_id {
1174 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1175 }
1176
1177 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1178 txn.items.replace(idx, new_item);
1179
1180 txn.commit();
1181 }
1182
1183 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1184 let mut state = self.state.write().await;
1185
1186 if let Some((idx, _)) =
1187 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1188 {
1189 let mut txn = state.transaction();
1190
1191 txn.items.remove(idx);
1192
1193 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1196 adjuster.run(&mut txn.items, &mut txn.meta);
1197
1198 txn.meta.update_read_marker(&mut txn.items);
1199
1200 txn.commit();
1201
1202 debug!("discarded local echo");
1203 return true;
1204 }
1205
1206 let mut txn = state.transaction();
1209
1210 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1212 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1213 &mut txn.items,
1214 ) {
1215 Ok(val) => val,
1216 Err(err) => {
1217 warn!("error when discarding local echo for an aggregation: {err}");
1218 true
1220 }
1221 };
1222
1223 if found_aggregation {
1224 txn.commit();
1225 }
1226
1227 found_aggregation
1228 }
1229
1230 pub(super) async fn replace_local_echo(
1231 &self,
1232 txn_id: &TransactionId,
1233 content: AnyMessageLikeEventContent,
1234 ) -> bool {
1235 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1236 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1241 return false;
1242 };
1243
1244 let mut state = self.state.write().await;
1245 let mut txn = state.transaction();
1246
1247 let Some((idx, prev_item)) =
1248 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1249 else {
1250 debug!("Can't find local echo to replace");
1251 return false;
1252 };
1253
1254 let ti_kind = {
1257 let Some(prev_local_item) = prev_item.as_local() else {
1258 warn!("We looked for a local item, but it transitioned as remote??");
1259 return false;
1260 };
1261 let progress = as_variant!(&prev_local_item.send_state,
1263 EventSendState::NotSentYet { progress } => progress.clone())
1264 .flatten();
1265 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1266 };
1267
1268 let new_item = TimelineItem::new(
1270 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1271 content.msgtype,
1272 content.mentions,
1273 prev_item.content().reactions().cloned().unwrap_or_default(),
1274 prev_item.content().thread_root(),
1275 prev_item.content().in_reply_to(),
1276 prev_item.content().thread_summary(),
1277 )),
1278 prev_item.internal_id.to_owned(),
1279 );
1280
1281 txn.items.replace(idx, new_item);
1282
1283 txn.commit();
1287
1288 debug!("Replaced local echo");
1289 true
1290 }
1291
1292 pub(super) async fn compute_redecryption_candidates(
1293 &self,
1294 ) -> (BTreeSet<String>, BTreeSet<String>) {
1295 let state = self.state.read().await;
1296 compute_redecryption_candidates(&state.items)
1297 }
1298
1299 pub(super) async fn set_sender_profiles_pending(&self) {
1300 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1301 }
1302
1303 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1304 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1305 }
1306
1307 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1308 self.state.write().await.items.for_each(|mut entry| {
1309 let Some(event_item) = entry.as_event() else { return };
1310 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1311 let new_item = entry.with_kind(TimelineItemKind::Event(
1312 event_item.with_sender_profile(profile_state.clone()),
1313 ));
1314 ObservableItemsEntry::replace(&mut entry, new_item);
1315 }
1316 });
1317 }
1318
1319 pub(super) async fn update_missing_sender_profiles(&self) {
1320 trace!("Updating missing sender profiles");
1321
1322 let mut state = self.state.write().await;
1323 let mut entries = state.items.entries();
1324 while let Some(mut entry) = entries.next() {
1325 let Some(event_item) = entry.as_event() else { continue };
1326 let event_id = event_item.event_id().map(debug);
1327 let transaction_id = event_item.transaction_id().map(debug);
1328
1329 if event_item.sender_profile().is_ready() {
1330 trace!(event_id, transaction_id, "Profile already set");
1331 continue;
1332 }
1333
1334 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1335 Some(profile) => {
1336 trace!(event_id, transaction_id, "Adding profile");
1337 let updated_item =
1338 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1339 let new_item = entry.with_kind(updated_item);
1340 ObservableItemsEntry::replace(&mut entry, new_item);
1341 }
1342 None => {
1343 if !event_item.sender_profile().is_unavailable() {
1344 trace!(event_id, transaction_id, "Marking profile unavailable");
1345 let updated_item =
1346 event_item.with_sender_profile(TimelineDetails::Unavailable);
1347 let new_item = entry.with_kind(updated_item);
1348 ObservableItemsEntry::replace(&mut entry, new_item);
1349 } else {
1350 debug!(event_id, transaction_id, "Profile already marked unavailable");
1351 }
1352 }
1353 }
1354 }
1355
1356 trace!("Done updating missing sender profiles");
1357 }
1358
1359 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1361 trace!("Forcing update of sender profiles: {sender_ids:?}");
1362
1363 let mut state = self.state.write().await;
1364 let mut entries = state.items.entries();
1365 while let Some(mut entry) = entries.next() {
1366 let Some(event_item) = entry.as_event() else { continue };
1367 if !sender_ids.contains(event_item.sender()) {
1368 continue;
1369 }
1370
1371 let event_id = event_item.event_id().map(debug);
1372 let transaction_id = event_item.transaction_id().map(debug);
1373
1374 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1375 Some(profile) => {
1376 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1377 {
1378 debug!(event_id, transaction_id, "Profile already up-to-date");
1379 } else {
1380 trace!(event_id, transaction_id, "Updating profile");
1381 let updated_item =
1382 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1383 let new_item = entry.with_kind(updated_item);
1384 ObservableItemsEntry::replace(&mut entry, new_item);
1385 }
1386 }
1387 None => {
1388 if !event_item.sender_profile().is_unavailable() {
1389 trace!(event_id, transaction_id, "Marking profile unavailable");
1390 let updated_item =
1391 event_item.with_sender_profile(TimelineDetails::Unavailable);
1392 let new_item = entry.with_kind(updated_item);
1393 ObservableItemsEntry::replace(&mut entry, new_item);
1394 } else {
1395 debug!(event_id, transaction_id, "Profile already marked unavailable");
1396 }
1397 }
1398 }
1399 }
1400
1401 trace!("Done forcing update of sender profiles");
1402 }
1403
1404 #[cfg(test)]
1405 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1406 let own_user_id = self.room_data_provider.own_user_id();
1407 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1408 }
1409
1410 pub(super) async fn latest_user_read_receipt(
1414 &self,
1415 user_id: &UserId,
1416 ) -> Option<(OwnedEventId, Receipt)> {
1417 let receipt_thread = self.focus.receipt_thread();
1418
1419 self.state
1420 .read()
1421 .await
1422 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1423 .await
1424 }
1425
1426 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1429 &self,
1430 user_id: &UserId,
1431 ) -> Option<OwnedEventId> {
1432 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1433 }
1434
1435 pub async fn subscribe_own_user_read_receipts_changed(
1437 &self,
1438 ) -> impl Stream<Item = ()> + use<P> {
1439 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1440 }
1441
1442 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1444 match echo.content {
1445 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1446 let content = match serialized_event.deserialize() {
1447 Ok(d) => d,
1448 Err(err) => {
1449 warn!("error deserializing local echo: {err}");
1450 return;
1451 }
1452 };
1453
1454 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1455 .await;
1456
1457 if let Some(send_error) = send_error {
1458 self.update_event_send_state(
1459 &echo.transaction_id,
1460 EventSendState::SendingFailed {
1461 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1462 send_error,
1463 ))),
1464 is_recoverable: false,
1465 },
1466 )
1467 .await;
1468 }
1469 }
1470
1471 LocalEchoContent::React { key, send_handle, applies_to } => {
1472 self.handle_local_reaction(key, send_handle, applies_to).await;
1473 }
1474 }
1475 }
1476
1477 #[instrument(skip(self, send_handle))]
1479 async fn handle_local_reaction(
1480 &self,
1481 reaction_key: String,
1482 send_handle: SendReactionHandle,
1483 applies_to: OwnedTransactionId,
1484 ) {
1485 let mut state = self.state.write().await;
1486 let mut tr = state.transaction();
1487
1488 let target = TimelineEventItemId::TransactionId(applies_to);
1489
1490 let reaction_txn_id = send_handle.transaction_id().to_owned();
1491 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1492 let aggregation = Aggregation::new(
1493 TimelineEventItemId::TransactionId(reaction_txn_id),
1494 AggregationKind::Reaction {
1495 key: reaction_key.clone(),
1496 sender: self.room_data_provider.own_user_id().to_owned(),
1497 timestamp: MilliSecondsSinceUnixEpoch::now(),
1498 reaction_status,
1499 },
1500 );
1501
1502 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1503 find_item_and_apply_aggregation(
1504 &tr.meta.aggregations,
1505 &mut tr.items,
1506 &target,
1507 aggregation,
1508 &tr.meta.room_version_rules,
1509 );
1510
1511 tr.commit();
1512 }
1513
1514 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1516 match update {
1517 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1518 self.handle_local_echo(echo).await;
1519 }
1520
1521 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1522 if !self.discard_local_echo(&transaction_id).await {
1523 warn!("couldn't find the local echo to discard");
1524 }
1525 }
1526
1527 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1528 let content = match new_content.deserialize() {
1529 Ok(d) => d,
1530 Err(err) => {
1531 warn!("error deserializing local echo (upon edit): {err}");
1532 return;
1533 }
1534 };
1535
1536 if !self.replace_local_echo(&transaction_id, content).await {
1537 warn!("couldn't find the local echo to replace");
1538 }
1539 }
1540
1541 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1542 self.update_event_send_state(
1543 &transaction_id,
1544 EventSendState::SendingFailed { error, is_recoverable },
1545 )
1546 .await;
1547 }
1548
1549 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1550 self.update_event_send_state(
1551 &transaction_id,
1552 EventSendState::NotSentYet { progress: None },
1553 )
1554 .await;
1555 }
1556
1557 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1558 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1559 .await;
1560 }
1561
1562 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1563 self.update_event_send_state(
1564 &related_to,
1565 EventSendState::NotSentYet {
1566 progress: Some(MediaUploadProgress { index, progress }),
1567 },
1568 )
1569 .await;
1570 }
1571 }
1572 }
1573
1574 pub async fn insert_timeline_start_if_missing(&self) {
1577 let mut state = self.state.write().await;
1578 let mut txn = state.transaction();
1579 txn.items.push_timeline_start_if_missing(
1580 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1581 );
1582 txn.commit();
1583 }
1584
1585 pub(super) async fn make_replied_to(
1591 &self,
1592 event: TimelineEvent,
1593 ) -> Result<Option<EmbeddedEvent>, Error> {
1594 let state = self.state.read().await;
1595 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1596 }
1597}
1598
1599impl TimelineController {
1600 pub(super) fn room(&self) -> &Room {
1601 &self.room_data_provider
1602 }
1603
1604 #[instrument(skip(self))]
1607 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1608 let state_guard = self.state.write().await;
1609 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1610 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1611 let remote_item = item
1612 .as_remote()
1613 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1614 .clone();
1615
1616 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1617 debug!("Event is not a message");
1618 return Ok(());
1619 };
1620 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1621 debug!("Event is not a reply");
1622 return Ok(());
1623 };
1624 if let TimelineDetails::Pending = &in_reply_to.event {
1625 debug!("Replied-to event is already being fetched");
1626 return Ok(());
1627 }
1628 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1629 debug!("Replied-to event has already been fetched");
1630 return Ok(());
1631 }
1632
1633 let internal_id = item.internal_id.to_owned();
1634 let item = item.clone();
1635 let event = fetch_replied_to_event(
1636 state_guard,
1637 &self.state,
1638 index,
1639 &item,
1640 internal_id,
1641 &msglike,
1642 &in_reply_to.event_id,
1643 self.room(),
1644 )
1645 .await?;
1646
1647 let mut state = self.state.write().await;
1650 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1651 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1652
1653 let TimelineItemContent::MsgLike(MsgLikeContent {
1656 kind: MsgLikeKind::Message(message),
1657 reactions,
1658 thread_root,
1659 in_reply_to,
1660 thread_summary,
1661 }) = item.content().clone()
1662 else {
1663 info!("Event is no longer a message (redacted?)");
1664 return Ok(());
1665 };
1666 let Some(in_reply_to) = in_reply_to else {
1667 warn!("Event no longer has a reply (bug?)");
1668 return Ok(());
1669 };
1670
1671 trace!("Updating in-reply-to details");
1674 let internal_id = item.internal_id.to_owned();
1675 let mut item = item.clone();
1676 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1677 kind: MsgLikeKind::Message(message),
1678 reactions,
1679 thread_root,
1680 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1681 thread_summary,
1682 }));
1683 state.items.replace(index, TimelineItem::new(item, internal_id));
1684
1685 Ok(())
1686 }
1687
1688 pub(super) fn infer_thread_for_read_receipt(
1694 &self,
1695 receipt_type: &SendReceiptType,
1696 ) -> ReceiptThread {
1697 if matches!(receipt_type, SendReceiptType::FullyRead) {
1698 ReceiptThread::Unthreaded
1699 } else {
1700 self.focus.receipt_thread()
1701 }
1702 }
1703
1704 pub(super) async fn should_send_receipt(
1708 &self,
1709 receipt_type: &SendReceiptType,
1710 receipt_thread: &ReceiptThread,
1711 event_id: &EventId,
1712 ) -> bool {
1713 let own_user_id = self.room().own_user_id();
1714 let state = self.state.read().await;
1715 let room = self.room();
1716
1717 match receipt_type {
1718 SendReceiptType::Read => {
1719 if let Some((old_pub_read, _)) = state
1720 .meta
1721 .user_receipt(
1722 own_user_id,
1723 ReceiptType::Read,
1724 receipt_thread.clone(),
1725 room,
1726 state.items.all_remote_events(),
1727 )
1728 .await
1729 {
1730 trace!(%old_pub_read, "found a previous public receipt");
1731 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1732 &old_pub_read,
1733 event_id,
1734 state.items.all_remote_events(),
1735 ) {
1736 trace!(
1737 "event referred to new receipt is {relative_pos:?} the previous receipt"
1738 );
1739 return relative_pos == RelativePosition::After;
1740 }
1741 }
1742 }
1743
1744 SendReceiptType::ReadPrivate => {
1747 if let Some((old_priv_read, _)) =
1748 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1749 {
1750 trace!(%old_priv_read, "found a previous private receipt");
1751 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1752 &old_priv_read,
1753 event_id,
1754 state.items.all_remote_events(),
1755 ) {
1756 trace!(
1757 "event referred to new receipt is {relative_pos:?} the previous receipt"
1758 );
1759 return relative_pos == RelativePosition::After;
1760 }
1761 }
1762 }
1763
1764 SendReceiptType::FullyRead => {
1765 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1766 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1767 &prev_event_id,
1768 event_id,
1769 state.items.all_remote_events(),
1770 )
1771 {
1772 return relative_pos == RelativePosition::After;
1773 }
1774 }
1775
1776 _ => {}
1777 }
1778
1779 true
1781 }
1782
1783 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1786 let state = self.state.read().await;
1787 let filter_out_thread_events = match self.focus() {
1788 TimelineFocusKind::Thread { .. } => false,
1789 TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1790 TimelineFocusKind::Event { paginator } => {
1791 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1792 }
1793 _ => true,
1794 };
1795
1796 state
1801 .items
1802 .all_remote_events()
1803 .iter()
1804 .rev()
1805 .filter_map(|item| {
1806 if !filter_out_thread_events || item.thread_root_id.is_none() {
1807 Some(item.event_id.clone())
1808 } else {
1809 None
1810 }
1811 })
1812 .next()
1813 }
1814
1815 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1816 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1817 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1818
1819 let request = DecryptionRetryRequest {
1820 room_id: self.room().room_id().to_owned(),
1821 utd_session_ids: utds,
1822 refresh_info_session_ids: decrypted,
1823 };
1824
1825 self.room().client().event_cache().request_decryption(request);
1826 }
1827
1828 pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1836 match status {
1837 PaginationStatus::Idle { hit_timeline_start } => {
1838 if hit_timeline_start {
1839 let state = self.state.read().await;
1840 if state.meta.subscriber_skip_count.get() > 0 {
1844 return PaginationStatus::Idle { hit_timeline_start: false };
1845 }
1846 }
1847 }
1848 PaginationStatus::Paginating => {}
1849 }
1850
1851 status
1853 }
1854}
1855
1856impl<P: RoomDataProvider> TimelineController<P> {
1857 pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1859 &self.focus
1860 }
1861}
1862
1863#[allow(clippy::too_many_arguments)]
1864async fn fetch_replied_to_event<P: RoomDataProvider>(
1865 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1866 state_lock: &RwLock<TimelineState<P>>,
1867 index: usize,
1868 item: &EventTimelineItem,
1869 internal_id: TimelineUniqueId,
1870 msglike: &MsgLikeContent,
1871 in_reply_to: &EventId,
1872 room: &Room,
1873) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1874 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1875 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1876 trace!("Found replied-to event locally");
1877 return Ok(details);
1878 }
1879
1880 trace!("Setting in-reply-to details to pending");
1883 let in_reply_to_details =
1884 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1885
1886 let event_item = item
1887 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1888
1889 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1890 state_guard.items.replace(index, new_timeline_item);
1891
1892 drop(state_guard);
1894
1895 trace!("Fetching replied-to event");
1896 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1897 Ok(timeline_event) => {
1898 let state = state_lock.read().await;
1899
1900 let replied_to_item =
1901 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1902
1903 if let Some(item) = replied_to_item {
1904 TimelineDetails::Ready(Box::new(item))
1905 } else {
1906 return Err(Error::UnsupportedEvent);
1908 }
1909 }
1910
1911 Err(e) => TimelineDetails::Error(Arc::new(e)),
1912 };
1913
1914 Ok(res)
1915}