1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25 config::RequestConfig,
26 deserialized_responses::TimelineEvent,
27 event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
28 paginators::{PaginationResult, PaginationToken, Paginator},
29 send_queue::{
30 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
31 },
32};
33#[cfg(test)]
34use ruma::events::receipt::ReceiptEventContent;
35use ruma::{
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
40 AnySyncTimelineEvent, MessageLikeEventType,
41 poll::unstable_start::UnstablePollStartEventContent,
42 reaction::ReactionEventContent,
43 receipt::{Receipt, ReceiptThread, ReceiptType},
44 relation::{Annotation, RelationType},
45 room::message::{MessageType, Relation},
46 },
47 room_version_rules::RoomVersionRules,
48 serde::Raw,
49};
50use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53pub(super) use self::{
54 metadata::{RelativePosition, TimelineMetadata},
55 observable_items::{
56 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
57 ObservableItemsTransactionEntry,
58 },
59 state::TimelineState,
60 state_transaction::TimelineStateTransaction,
61};
62use super::{
63 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
64 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
65 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
66 TimelineReadReceiptTracking, VirtualTimelineItem,
67 algorithms::{rfind_event_by_id, rfind_event_item},
68 event_item::{ReactionStatus, RemoteEventOrigin},
69 item::TimelineUniqueId,
70 subscriber::TimelineSubscriber,
71 traits::RoomDataProvider,
72};
73use crate::{
74 timeline::{
75 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
76 algorithms::rfind_event_by_item_id,
77 controller::decryption_retry_task::compute_redecryption_candidates,
78 date_dividers::DateDividerAdjuster,
79 event_item::TimelineItemHandle,
80 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
81 },
82 unable_to_decrypt_hook::UtdHookManager,
83};
84
85pub(in crate::timeline) mod aggregations;
86mod decryption_retry_task;
87mod metadata;
88mod observable_items;
89mod read_receipts;
90mod state;
91mod state_transaction;
92
93pub(super) use aggregations::*;
94pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
95use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
96use matrix_sdk_common::serde_helpers::extract_thread_root;
97
/// Internal, runtime representation of what a timeline is focused on.
///
/// `TimelineController::new` builds one of these from the `TimelineFocus`
/// it is given; each variant carries the data that mode needs afterwards.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
    /// Live focus (built from `TimelineFocus::Live`).
    Live {
        /// Value returned by `hide_threaded_events()` for this focus; when it
        /// is `true`, `receipt_thread()` reports `ReceiptThread::Main`.
        hide_threaded_events: bool,
    },

    /// Focus on a single event (built from `TimelineFocus::Event`).
    Event {
        /// Paginator used for this focus. It is created empty by
        /// `TimelineController::new` and set once by `init_focus`.
        paginator: OnceCell<AnyPaginator<P>>,
    },

    /// Focus on a thread.
    Thread {
        /// Event id of the thread root; returned by `thread_root()`.
        root_event_id: OwnedEventId,
    },

    /// Focus on pinned events.
    PinnedEvents {
        /// Loader invoked by `init_focus` and `reload_pinned_events`.
        loader: PinnedEventsLoader,
    },
}
128
/// The paginator backing an event-focused timeline: either a plain room
/// paginator or a thread-specific loader.
#[derive(Debug)]
pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
    /// Pagination that is not tied to a thread.
    Unthreaded {
        /// The underlying paginator.
        paginator: Paginator<P>,
        /// Value returned by `AnyPaginator::hide_threaded_events()`.
        hide_threaded_events: bool,
    },
    /// Pagination inside a thread; the loader knows the thread root event id.
    Threaded(ThreadedEventsLoader<P>),
}
139
140impl<P: RoomDataProvider> AnyPaginator<P> {
141 pub async fn paginate_backwards(
150 &self,
151 num_events: u16,
152 ) -> Result<PaginationResult, PaginatorError> {
153 match self {
154 Self::Unthreaded { paginator, .. } => {
155 paginator.paginate_backward(num_events.into()).await
156 }
157 Self::Threaded(threaded_paginator) => {
158 threaded_paginator.paginate_backwards(num_events.into()).await
159 }
160 }
161 }
162
163 pub async fn paginate_forwards(
171 &self,
172 num_events: u16,
173 ) -> Result<PaginationResult, PaginatorError> {
174 match self {
175 Self::Unthreaded { paginator, .. } => {
176 paginator.paginate_forward(num_events.into()).await
177 }
178 Self::Threaded(threaded_paginator) => {
179 threaded_paginator.paginate_forwards(num_events.into()).await
180 }
181 }
182 }
183
184 pub fn hide_threaded_events(&self) -> bool {
186 match self {
187 Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
188 Self::Threaded(_) => false,
189 }
190 }
191
192 pub fn thread_root(&self) -> Option<&EventId> {
195 match self {
196 Self::Unthreaded { .. } => None,
197 Self::Threaded(thread_events_loader) => {
198 Some(thread_events_loader.thread_root_event_id())
199 }
200 }
201 }
202}
203
204impl<P: RoomDataProvider> TimelineFocusKind<P> {
205 pub(super) fn receipt_thread(&self) -> ReceiptThread {
212 if let Some(thread_root) = self.thread_root() {
213 ReceiptThread::Thread(thread_root.to_owned())
214 } else if self.hide_threaded_events() {
215 ReceiptThread::Main
216 } else {
217 ReceiptThread::Unthreaded
218 }
219 }
220
221 fn hide_threaded_events(&self) -> bool {
223 match self {
224 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
225 TimelineFocusKind::Event { paginator } => {
226 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
227 }
228 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
229 }
230 }
231
232 fn is_thread(&self) -> bool {
235 self.thread_root().is_some()
236 }
237
238 fn thread_root(&self) -> Option<&EventId> {
240 match self {
241 TimelineFocusKind::Event { paginator, .. } => {
242 paginator.get().and_then(|paginator| paginator.thread_root())
243 }
244 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
245 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
246 }
247 }
248}
249
/// Controller owning the timeline state and the pieces needed to update it.
///
/// Cloning is cheap for `state` and `focus`, which are shared through `Arc`.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// The timeline state, shared behind an async read/write lock.
    state: Arc<RwLock<TimelineState<P>>>,

    /// The focus of this timeline; also handed to `TimelineState::new`.
    focus: Arc<TimelineFocusKind<P>>,

    /// Source of room data (own user id, profiles, sending, redacting, ...)
    /// used by the methods of this controller.
    pub(crate) room_data_provider: P,

    /// Settings applied when handling events.
    pub(super) settings: TimelineSettings,
}
267
/// Settings that tune how the timeline handles events.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Read receipt tracking mode; `Disabled` by default.
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Filter deciding which events are kept; `default_event_filter` by
    /// default.
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Defaults to `true`.
    // NOTE(review): presumably controls whether events that failed to
    // deserialize are still added to the timeline — confirm at the use site.
    pub(super) add_failed_to_parse: bool,

    /// Mode handed to `DateDividerAdjuster`; `Daily` by default.
    pub(super) date_divider_mode: DateDividerMode,
}
284
285#[cfg(not(tarpaulin_include))]
286impl fmt::Debug for TimelineSettings {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 f.debug_struct("TimelineSettings")
289 .field("track_read_receipts", &self.track_read_receipts)
290 .field("add_failed_to_parse", &self.add_failed_to_parse)
291 .finish_non_exhaustive()
292 }
293}
294
295impl Default for TimelineSettings {
296 fn default() -> Self {
297 Self {
298 track_read_receipts: TimelineReadReceiptTracking::Disabled,
299 event_filter: Arc::new(default_event_filter),
300 add_failed_to_parse: true,
301 date_divider_mode: DateDividerMode::Daily,
302 }
303 }
304}
305
306pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
316 match event {
317 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
318 if ev.redacts(&rules.redaction).is_some() {
319 false
322 } else {
323 ev.event_type() != MessageLikeEventType::Reaction
326 }
327 }
328
329 AnySyncTimelineEvent::MessageLike(msg) => {
330 match msg.original_content() {
331 None => {
332 msg.event_type() != MessageLikeEventType::Reaction
335 }
336
337 Some(original_content) => {
338 match original_content {
339 AnyMessageLikeEventContent::RoomMessage(content) => {
340 if content
341 .relates_to
342 .as_ref()
343 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
344 {
345 return false;
347 }
348
349 match content.msgtype {
350 MessageType::Audio(_)
351 | MessageType::Emote(_)
352 | MessageType::File(_)
353 | MessageType::Image(_)
354 | MessageType::Location(_)
355 | MessageType::Notice(_)
356 | MessageType::ServerNotice(_)
357 | MessageType::Text(_)
358 | MessageType::Video(_)
359 | MessageType::VerificationRequest(_) => true,
360 #[cfg(feature = "unstable-msc4274")]
361 MessageType::Gallery(_) => true,
362 _ => false,
363 }
364 }
365
366 AnyMessageLikeEventContent::Sticker(_)
367 | AnyMessageLikeEventContent::UnstablePollStart(
368 UnstablePollStartEventContent::New(_),
369 )
370 | AnyMessageLikeEventContent::CallInvite(_)
371 | AnyMessageLikeEventContent::RtcNotification(_)
372 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
373
374 _ => false,
375 }
376 }
377 }
378 }
379
380 AnySyncTimelineEvent::State(_) => {
381 true
383 }
384 }
385}
386
387impl<P: RoomDataProvider> TimelineController<P> {
388 pub(super) fn new(
389 room_data_provider: P,
390 focus: TimelineFocus,
391 internal_id_prefix: Option<String>,
392 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
393 is_room_encrypted: bool,
394 settings: TimelineSettings,
395 ) -> Self {
396 let focus = match focus {
397 TimelineFocus::Live { hide_threaded_events } => {
398 TimelineFocusKind::Live { hide_threaded_events }
399 }
400
401 TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
402
403 TimelineFocus::Thread { root_event_id, .. } => {
404 TimelineFocusKind::Thread { root_event_id }
405 }
406
407 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
408 TimelineFocusKind::PinnedEvents {
409 loader: PinnedEventsLoader::new(
410 Arc::new(room_data_provider.clone()),
411 max_events_to_load as usize,
412 max_concurrent_requests as usize,
413 ),
414 }
415 }
416 };
417
418 let focus = Arc::new(focus);
419 let state = Arc::new(RwLock::new(TimelineState::new(
420 focus.clone(),
421 room_data_provider.own_user_id().to_owned(),
422 room_data_provider.room_version_rules(),
423 internal_id_prefix,
424 unable_to_decrypt_hook,
425 is_room_encrypted,
426 )));
427
428 Self { state, focus, room_data_provider, settings }
429 }
430
/// Loads the initial events for the given focus and fills the timeline with
/// them.
///
/// Returns `Ok(true)` when at least one event was loaded, `Ok(false)`
/// otherwise (including when the pinned events loader returns `None`).
///
/// # Errors
///
/// Propagates errors from the event cache, from the paginator (wrapped in
/// `PaginationError::Paginator`) and from the pinned events loader (wrapped
/// in `Error::PinnedEventsError`).
///
/// # Panics
///
/// Hits `unreachable!()` if `focus` is `Event` or `PinnedEvents` while the
/// controller's own focus kind is a different variant.
pub(super) async fn init_focus(
    &self,
    focus: &TimelineFocus,
    room_event_cache: &RoomEventCache,
) -> Result<bool, Error> {
    match focus {
        TimelineFocus::Live { .. } => {
            // Seed the timeline with whatever the event cache currently holds.
            let events = room_event_cache.events().await?;

            let has_events = !events.is_empty();

            self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

            // If the cache's pagination is idle and already reached the start
            // of the timeline, make sure the timeline start item is present.
            match room_event_cache.pagination().status().get() {
                RoomPaginationStatus::Idle { hit_timeline_start } => {
                    if hit_timeline_start {
                        self.insert_timeline_start_if_missing().await;
                    }
                }
                RoomPaginationStatus::Paginating => {}
            }

            Ok(has_events)
        }

        TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
            // The controller's focus kind was built from the same focus in
            // `new`, so it is expected to be the `Event` variant here.
            let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
                unreachable!();
            };

            let event_paginator = Paginator::new(self.room_data_provider.clone());

            // Loads the target event together with `num_context_events`
            // context events through the paginator.
            let load_events_with_context = || async {
                event_paginator
                    .start_from(event_id, (*num_context_events).into())
                    .await
                    .map(|r| r.events)
                    .map_err(PaginationError::Paginator)
            };

            let events = if *num_context_events == 0 {
                // No context requested: first try to load only the event and
                // its annotation/replacement relations, with a retry limit of 3.
                let request_config = Some(RequestConfig::default().retry_limit(3));
                let relations_filter =
                    Some(vec![RelationType::Annotation, RelationType::Replacement]);

                match self
                    .room_data_provider
                    .load_event_with_relations(event_id, request_config, relations_filter)
                    .await
                {
                    Ok((event, related_events)) => {
                        let mut events = vec![event];
                        events.extend(related_events);
                        events
                    }
                    Err(err) => {
                        // Fall back to the paginator-based load.
                        error!("error when loading focussed event: {err}");
                        load_events_with_context().await?
                    }
                }
            } else {
                load_events_with_context().await?
            };

            // Thread root of the focused event, if it was loaded and belongs
            // to a thread.
            let extracted_thread_root = events
                .iter()
                .find(
                    |event| {
                        if let Some(id) = event.event_id() { id == *event_id } else { false }
                    },
                )
                .and_then(|event| extract_thread_root(event.raw()));

            // Decide whether this becomes a threaded timeline.
            let (thread_root_event_id, hide_threaded_events) = match thread_mode {
                TimelineEventFocusThreadMode::ForceThread => {
                    // Always threaded: if the event is not in a thread, it is
                    // treated as the thread root itself.
                    (extracted_thread_root.or_else(|| Some(event_id.clone())), false)
                }
                TimelineEventFocusThreadMode::Automatic { hide_threaded_events } => {
                    (extracted_thread_root, *hide_threaded_events)
                }
            };

            // The result of `set` is deliberately ignored (it fails if the
            // cell was already set).
            let _ = paginator.set(match thread_root_event_id {
                Some(root_id) => {
                    let mut tokens = event_paginator.tokens();

                    let includes_root_event = events.iter().any(|event| {
                        if let Some(id) = event.event_id() { id == root_id } else { false }
                    });

                    if includes_root_event {
                        // The root was already loaded, so the backwards
                        // token is marked as having hit the end.
                        tokens.previous = PaginationToken::HitEnd;
                    }

                    AnyPaginator::Threaded(ThreadedEventsLoader::new(
                        self.room_data_provider.clone(),
                        root_id,
                        tokens,
                    ))
                }

                None => AnyPaginator::Unthreaded {
                    paginator: event_paginator,
                    hide_threaded_events,
                },
            });

            let has_events = !events.is_empty();

            match paginator.get().expect("Paginator was not instantiated") {
                AnyPaginator::Unthreaded { .. } => {
                    self.replace_with_initial_remote_events(
                        events,
                        RemoteEventOrigin::Pagination,
                    )
                    .await;
                }

                AnyPaginator::Threaded(threaded_events_loader) => {
                    let thread_root = threaded_events_loader.thread_root_event_id();
                    // Keep only the events of that thread, plus the thread
                    // root event itself.
                    let events_in_thread = events.into_iter().filter(|event| {
                        extract_thread_root(event.raw())
                            .is_some_and(|event_thread_root| event_thread_root == thread_root)
                            || event.event_id().as_deref() == Some(thread_root)
                    });

                    self.replace_with_initial_remote_events(
                        events_in_thread,
                        RemoteEventOrigin::Pagination,
                    )
                    .await;
                }
            }

            // Note: computed before the thread filtering above.
            Ok(has_events)
        }

        TimelineFocus::Thread { root_event_id, .. } => {
            let (events, _) =
                room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
            let has_events = !events.is_empty();

            // Collect the events related to each thread event from the cache,
            // so they can be applied as aggregations afterwards.
            let mut related_events = Vector::new();
            for event_id in events.iter().filter_map(|event| event.event_id()) {
                if let Some((_original, related)) =
                    room_event_cache.find_event_with_relations(&event_id, None).await?
                {
                    related_events.extend(related);
                }
            }

            self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;

            if !related_events.is_empty() {
                self.handle_remote_aggregations(
                    vec![VectorDiff::Append { values: related_events }],
                    RemoteEventOrigin::Cache,
                )
                .await;
            }

            Ok(has_events)
        }

        TimelineFocus::PinnedEvents { .. } => {
            // Same reasoning as for the `Event` focus: the kind was derived
            // from this focus in `new`.
            let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
                unreachable!();
            };

            let Some(loaded_events) =
                loader.load_events().await.map_err(Error::PinnedEventsError)?
            else {
                // The loader returned nothing to apply.
                return Ok(false);
            };

            let has_events = !loaded_events.is_empty();

            self.replace_with_initial_remote_events(
                loaded_events,
                RemoteEventOrigin::Pagination,
            )
            .await;

            Ok(has_events)
        }
    }
}
651
652 pub async fn handle_encryption_state_changes(&self) {
657 let mut room_info = self.room_data_provider.room_info();
658
659 let mark_encrypted = || async {
661 let mut state = self.state.write().await;
662 state.meta.is_room_encrypted = true;
663 state.mark_all_events_as_encrypted();
664 };
665
666 if room_info.get().encryption_state().is_encrypted() {
667 mark_encrypted().await;
670 return;
671 }
672
673 while let Some(info) = room_info.next().await {
674 if info.encryption_state().is_encrypted() {
675 mark_encrypted().await;
676 break;
679 }
680 }
681 }
682
683 pub(crate) async fn reload_pinned_events(
684 &self,
685 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
686 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
687 loader.load_events().await
688 } else {
689 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
690 }
691 }
692
693 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
702 let state = self.state.read().await;
703
704 let (count, needs) = state
705 .meta
706 .subscriber_skip_count
707 .compute_next_when_paginating_backwards(num_events.into());
708
709 let is_live_timeline = true;
711 state.meta.subscriber_skip_count.update(count, is_live_timeline);
712
713 needs
714 }
715
716 pub(super) async fn focused_paginate_backwards(
721 &self,
722 num_events: u16,
723 ) -> Result<bool, PaginationError> {
724 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
725 TimelineFocusKind::Live { .. }
726 | TimelineFocusKind::PinnedEvents { .. }
727 | TimelineFocusKind::Thread { .. } => {
728 return Err(PaginationError::NotSupported);
729 }
730 TimelineFocusKind::Event { paginator, .. } => paginator
731 .get()
732 .expect("Paginator was not instantiated")
733 .paginate_backwards(num_events)
734 .await
735 .map_err(PaginationError::Paginator)?,
736 };
737
738 self.handle_remote_events_with_diffs(
741 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
742 RemoteEventOrigin::Pagination,
743 )
744 .await;
745
746 Ok(hit_end_of_timeline)
747 }
748
749 pub(super) async fn focused_paginate_forwards(
754 &self,
755 num_events: u16,
756 ) -> Result<bool, PaginationError> {
757 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
758 TimelineFocusKind::Live { .. }
759 | TimelineFocusKind::PinnedEvents { .. }
760 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
761
762 TimelineFocusKind::Event { paginator, .. } => paginator
763 .get()
764 .expect("Paginator was not instantiated")
765 .paginate_forwards(num_events)
766 .await
767 .map_err(PaginationError::Paginator)?,
768 };
769
770 self.handle_remote_events_with_diffs(
773 vec![VectorDiff::Append { values: events.into() }],
774 RemoteEventOrigin::Pagination,
775 )
776 .await;
777
778 Ok(hit_end_of_timeline)
779 }
780
781 pub(super) fn is_live(&self) -> bool {
783 matches!(&*self.focus, TimelineFocusKind::Live { .. })
784 }
785
786 pub(super) fn is_threaded(&self) -> bool {
788 self.focus.is_thread()
789 }
790
791 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
794 self.focus.thread_root().map(ToOwned::to_owned)
795 }
796
797 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
801 self.state.read().await.items.clone_items()
802 }
803
/// Test-only: returns the current items and the raw subscriber stream.
#[cfg(test)]
pub(super) async fn subscribe_raw(
    &self,
) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
    let state = self.state.read().await;
    let subscriber = state.items.subscribe();
    subscriber.into_values_and_stream()
}
810
811 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
812 let state = self.state.read().await;
813
814 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
815 }
816
817 pub(super) async fn subscribe_filter_map<U, F>(
818 &self,
819 f: F,
820 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
821 where
822 U: Clone,
823 F: Fn(Arc<TimelineItem>) -> Option<U>,
824 {
825 self.state.read().await.items.subscribe().filter_map(f)
826 }
827
/// Toggles the current user's reaction `key` on the item identified by
/// `item_id`.
///
/// Returns `Ok(true)` when a reaction was added, and `Ok(false)` when a
/// previous reaction was removed or when nothing could be done (the local
/// echo refused the reaction, or the item was unexpectedly not remote).
///
/// # Errors
///
/// - `Error::FailedToToggleReaction` if the item cannot be found.
/// - `Error::SendQueueError` if reacting to, or aborting, a local echo
///   fails.
/// - Whatever error `send` or `redact` of the room data provider returns.
#[instrument(skip_all)]
pub(super) async fn toggle_reaction_local(
    &self,
    item_id: &TimelineEventItemId,
    key: &str,
) -> Result<bool, Error> {
    let mut state = self.state.write().await;

    let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
        warn!("Timeline item not found, can't add reaction");
        return Err(Error::FailedToToggleReaction);
    };

    // Status of our own reaction with this key, if there is one already.
    let user_id = self.room_data_provider.own_user_id();
    let prev_status = item
        .content()
        .reactions()
        .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));

    let Some(prev_status) = prev_status else {
        // No previous reaction from us: add a new one.
        match item.handle() {
            TimelineItemHandle::Local(send_handle) => {
                if send_handle
                    .react(key.to_owned())
                    .await
                    .map_err(|err| Error::SendQueueError(err.into()))?
                    .is_some()
                {
                    trace!("adding a reaction to a local echo");
                    return Ok(true);
                }

                warn!("couldn't toggle reaction for local echo");
                return Ok(false);
            }

            TimelineItemHandle::Remote(event_id) => {
                // The reaction is sent through the room data provider; the
                // item is not modified here.
                trace!("adding a reaction to a remote echo");
                let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
                self.room_data_provider
                    .send(ReactionEventContent::from(annotation).into())
                    .await?;
                return Ok(true);
            }
        }
    };

    trace!("removing a previous reaction");
    match prev_status {
        ReactionStatus::LocalToLocal(send_reaction_handle) => {
            if let Some(handle) = send_reaction_handle {
                if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                    warn!("unexpectedly unable to abort sending of local reaction");
                }
            } else {
                warn!("no send reaction handle (this should only happen in testing contexts)");
            }
        }

        ReactionStatus::LocalToRemote(send_handle) => {
            trace!("aborting send of the previous reaction that was a local echo");
            if let Some(handle) = send_handle {
                if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                    warn!("unexpectedly unable to abort sending of local reaction");
                }
            } else {
                warn!("no send handle (this should only happen in testing contexts)");
            }
        }

        ReactionStatus::RemoteToRemote(event_id) => {
            // Remember the annotated event's id so the item can be found
            // again after the lock has been released.
            let Some(annotated_event_id) =
                item.as_remote().map(|event_item| event_item.event_id.clone())
            else {
                warn!("remote reaction to remote event, but the associated item isn't remote");
                return Ok(false);
            };

            // Remove the reaction from the item right away, before the
            // redaction request is sent.
            let mut reactions = item.content().reactions().cloned().unwrap_or_default();
            let reaction_info = reactions.remove_reaction(user_id, key);

            if reaction_info.is_some() {
                let new_item = item.with_reactions(reactions);
                state.items.replace(item_pos, new_item);
            } else {
                warn!(
                    "reaction is missing on the item, not removing it locally, \
                     but sending redaction."
                );
            }

            // Release the write lock before the redaction request.
            drop(state);

            trace!("sending redact for a previous reaction");
            if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                if let Some(reaction_info) = reaction_info {
                    debug!("sending redact failed, adding the reaction back to the list");

                    // Roll back: the lock was released, so look the item up
                    // again by event id before re-adding the reaction.
                    let mut state = self.state.write().await;
                    if let Some((item_pos, item)) =
                        rfind_event_by_id(&state.items, &annotated_event_id)
                    {
                        let mut reactions =
                            item.content().reactions().cloned().unwrap_or_default();
                        reactions
                            .entry(key.to_owned())
                            .or_default()
                            .insert(user_id.to_owned(), reaction_info);
                        let new_item = item.with_reactions(reactions);
                        state.items.replace(item_pos, new_item);
                    } else {
                        warn!(
                            "couldn't find item to re-add reaction anymore; \
                             maybe it's been redacted?"
                        );
                    }
                }

                return Err(err);
            }
        }
    }

    Ok(false)
}
969
970 pub(super) async fn handle_remote_events_with_diffs(
972 &self,
973 diffs: Vec<VectorDiff<TimelineEvent>>,
974 origin: RemoteEventOrigin,
975 ) {
976 if diffs.is_empty() {
977 return;
978 }
979
980 let mut state = self.state.write().await;
981 state
982 .handle_remote_events_with_diffs(
983 diffs,
984 origin,
985 &self.room_data_provider,
986 &self.settings,
987 )
988 .await
989 }
990
991 pub(super) async fn handle_remote_aggregations(
993 &self,
994 diffs: Vec<VectorDiff<TimelineEvent>>,
995 origin: RemoteEventOrigin,
996 ) {
997 if diffs.is_empty() {
998 return;
999 }
1000
1001 let mut state = self.state.write().await;
1002 state
1003 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
1004 .await
1005 }
1006
1007 pub(super) async fn clear(&self) {
1008 self.state.write().await.clear();
1009 }
1010
1011 pub(super) async fn replace_with_initial_remote_events<Events>(
1019 &self,
1020 events: Events,
1021 origin: RemoteEventOrigin,
1022 ) where
1023 Events: IntoIterator,
1024 <Events as IntoIterator>::Item: Into<TimelineEvent>,
1025 {
1026 let mut state = self.state.write().await;
1027
1028 let track_read_markers = &self.settings.track_read_receipts;
1029 if track_read_markers.is_enabled() {
1030 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
1031 state
1032 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
1033 .await;
1034 }
1035
1036 let mut events = events.into_iter().peekable();
1042 if !state.items.is_empty() || events.peek().is_some() {
1043 state
1044 .replace_with_remote_events(
1045 events,
1046 origin,
1047 &self.room_data_provider,
1048 &self.settings,
1049 )
1050 .await;
1051 }
1052
1053 if track_read_markers.is_enabled() {
1054 if let Some(fully_read_event_id) =
1055 self.room_data_provider.load_fully_read_marker().await
1056 {
1057 state.handle_fully_read_marker(fully_read_event_id);
1058 } else if let Some(latest_receipt_event_id) = state
1059 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1060 {
1061 debug!("no `m.fully_read` marker found, falling back to read receipt");
1063 state.handle_fully_read_marker(latest_receipt_event_id);
1064 }
1065 }
1066 }
1067
1068 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1069 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1070 }
1071
1072 pub(super) async fn handle_ephemeral_events(
1073 &self,
1074 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1075 ) {
1076 if events.is_empty() {
1078 return;
1079 }
1080 let mut state = self.state.write().await;
1081 state.handle_ephemeral_events(events, &self.room_data_provider).await;
1082 }
1083
1084 #[instrument(skip_all)]
1086 pub(super) async fn handle_local_event(
1087 &self,
1088 txn_id: OwnedTransactionId,
1089 content: AnyMessageLikeEventContent,
1090 send_handle: Option<SendHandle>,
1091 ) {
1092 let sender = self.room_data_provider.own_user_id().to_owned();
1093 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1094
1095 let date_divider_mode = self.settings.date_divider_mode.clone();
1096
1097 let mut state = self.state.write().await;
1098 state
1099 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1100 .await;
1101 }
1102
/// Updates the send state of the local echo identified by `txn_id`.
///
/// Handles three situations besides the plain update:
/// - the remote echo for the sent event is already in the timeline, in
///   which case a leftover local echo is removed instead;
/// - no matching item exists but the transaction belongs to an
///   aggregation, which is then marked as sent;
/// - no matching item or aggregation exists, which is only logged.
///
/// Changes are applied through a state transaction; paths that return
/// without calling `commit()` leave the timeline untouched.
#[instrument(skip(self))]
pub(super) async fn update_event_send_state(
    &self,
    txn_id: &TransactionId,
    send_state: EventSendState,
) {
    let mut state = self.state.write().await;
    let mut txn = state.transaction();

    // Only the `Sent` state carries an event id.
    let new_event_id: Option<&EventId> =
        as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

    // First check whether a remote item with the new event id is already
    // present in the timeline.
    if rfind_event_item(&txn.items, |it| {
        new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
    })
    .is_some()
    {
        trace!("Remote echo received before send-event response");

        let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

        // Both a remote echo and a local echo exist for the same event:
        // drop the local one.
        if let Some((idx, _)) = local_echo {
            warn!("Message echo got duplicated, removing the local one");
            txn.items.remove(idx);

            // Removing an item may require the date dividers to be adjusted.
            let mut adjuster =
                DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);
        }

        txn.commit();
        return;
    }

    // Look for the local echo, either by transaction id or by a local item
    // that already carries the new event id.
    let result = rfind_event_item(&txn.items, |it| {
        it.transaction_id() == Some(txn_id)
            || new_event_id.is_some()
                && it.event_id() == new_event_id
                && it.as_local().is_some()
    });

    let Some((idx, item)) = result else {
        // No timeline item: the transaction may belong to an aggregation.
        if let Some(new_event_id) = new_event_id {
            if txn.meta.aggregations.mark_aggregation_as_sent(
                txn_id.to_owned(),
                new_event_id.to_owned(),
                &mut txn.items,
                &txn.meta.room_version_rules,
            ) {
                trace!("Aggregation marked as sent");
                txn.commit();
                return;
            }

            trace!("Sent aggregation was not found");
        }

        warn!("Timeline item not found, can't update send state");
        return;
    };

    let Some(local_item) = item.as_local() else {
        warn!("We looked for a local item, but it transitioned to remote.");
        return;
    };

    // Only logged; the send state is still overwritten below.
    if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
        error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
    }

    // Tell the aggregations that the target of this transaction now has an
    // event id.
    if let Some(new_event_id) = new_event_id {
        txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
    }

    let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
    txn.items.replace(idx, new_item);

    txn.commit();
}
1202
1203 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1204 let mut state = self.state.write().await;
1205
1206 if let Some((idx, _)) =
1207 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1208 {
1209 let mut txn = state.transaction();
1210
1211 txn.items.remove(idx);
1212
1213 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1216 adjuster.run(&mut txn.items, &mut txn.meta);
1217
1218 txn.meta.update_read_marker(&mut txn.items);
1219
1220 txn.commit();
1221
1222 debug!("discarded local echo");
1223 return true;
1224 }
1225
1226 let mut txn = state.transaction();
1229
1230 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1232 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1233 &mut txn.items,
1234 ) {
1235 Ok(val) => val,
1236 Err(err) => {
1237 warn!("error when discarding local echo for an aggregation: {err}");
1238 true
1240 }
1241 };
1242
1243 if found_aggregation {
1244 txn.commit();
1245 }
1246
1247 found_aggregation
1248 }
1249
1250 pub(super) async fn replace_local_echo(
1251 &self,
1252 txn_id: &TransactionId,
1253 content: AnyMessageLikeEventContent,
1254 ) -> bool {
1255 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1256 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1261 return false;
1262 };
1263
1264 let mut state = self.state.write().await;
1265 let mut txn = state.transaction();
1266
1267 let Some((idx, prev_item)) =
1268 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1269 else {
1270 debug!("Can't find local echo to replace");
1271 return false;
1272 };
1273
1274 let ti_kind = {
1277 let Some(prev_local_item) = prev_item.as_local() else {
1278 warn!("We looked for a local item, but it transitioned as remote??");
1279 return false;
1280 };
1281 let progress = as_variant!(&prev_local_item.send_state,
1283 EventSendState::NotSentYet { progress } => progress.clone())
1284 .flatten();
1285 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1286 };
1287
1288 let new_item = TimelineItem::new(
1290 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1291 content.msgtype,
1292 content.mentions,
1293 prev_item.content().reactions().cloned().unwrap_or_default(),
1294 prev_item.content().thread_root(),
1295 prev_item.content().in_reply_to(),
1296 prev_item.content().thread_summary(),
1297 )),
1298 prev_item.internal_id.to_owned(),
1299 );
1300
1301 txn.items.replace(idx, new_item);
1302
1303 txn.commit();
1307
1308 debug!("Replaced local echo");
1309 true
1310 }
1311
1312 pub(super) async fn compute_redecryption_candidates(
1313 &self,
1314 ) -> (BTreeSet<String>, BTreeSet<String>) {
1315 let state = self.state.read().await;
1316 compute_redecryption_candidates(&state.items)
1317 }
1318
1319 pub(super) async fn set_sender_profiles_pending(&self) {
1320 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1321 }
1322
1323 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1324 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1325 }
1326
1327 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1328 self.state.write().await.items.for_each(|mut entry| {
1329 let Some(event_item) = entry.as_event() else { return };
1330 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1331 let new_item = entry.with_kind(TimelineItemKind::Event(
1332 event_item.with_sender_profile(profile_state.clone()),
1333 ));
1334 ObservableItemsEntry::replace(&mut entry, new_item);
1335 }
1336 });
1337 }
1338
1339 pub(super) async fn update_missing_sender_profiles(&self) {
1340 trace!("Updating missing sender profiles");
1341
1342 let mut state = self.state.write().await;
1343 let mut entries = state.items.entries();
1344 while let Some(mut entry) = entries.next() {
1345 let Some(event_item) = entry.as_event() else { continue };
1346 let event_id = event_item.event_id().map(debug);
1347 let transaction_id = event_item.transaction_id().map(debug);
1348
1349 if event_item.sender_profile().is_ready() {
1350 trace!(event_id, transaction_id, "Profile already set");
1351 continue;
1352 }
1353
1354 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1355 Some(profile) => {
1356 trace!(event_id, transaction_id, "Adding profile");
1357 let updated_item =
1358 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1359 let new_item = entry.with_kind(updated_item);
1360 ObservableItemsEntry::replace(&mut entry, new_item);
1361 }
1362 None => {
1363 if !event_item.sender_profile().is_unavailable() {
1364 trace!(event_id, transaction_id, "Marking profile unavailable");
1365 let updated_item =
1366 event_item.with_sender_profile(TimelineDetails::Unavailable);
1367 let new_item = entry.with_kind(updated_item);
1368 ObservableItemsEntry::replace(&mut entry, new_item);
1369 } else {
1370 debug!(event_id, transaction_id, "Profile already marked unavailable");
1371 }
1372 }
1373 }
1374 }
1375
1376 trace!("Done updating missing sender profiles");
1377 }
1378
1379 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1381 trace!("Forcing update of sender profiles: {sender_ids:?}");
1382
1383 let mut state = self.state.write().await;
1384 let mut entries = state.items.entries();
1385 while let Some(mut entry) = entries.next() {
1386 let Some(event_item) = entry.as_event() else { continue };
1387 if !sender_ids.contains(event_item.sender()) {
1388 continue;
1389 }
1390
1391 let event_id = event_item.event_id().map(debug);
1392 let transaction_id = event_item.transaction_id().map(debug);
1393
1394 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1395 Some(profile) => {
1396 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1397 {
1398 debug!(event_id, transaction_id, "Profile already up-to-date");
1399 } else {
1400 trace!(event_id, transaction_id, "Updating profile");
1401 let updated_item =
1402 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1403 let new_item = entry.with_kind(updated_item);
1404 ObservableItemsEntry::replace(&mut entry, new_item);
1405 }
1406 }
1407 None => {
1408 if !event_item.sender_profile().is_unavailable() {
1409 trace!(event_id, transaction_id, "Marking profile unavailable");
1410 let updated_item =
1411 event_item.with_sender_profile(TimelineDetails::Unavailable);
1412 let new_item = entry.with_kind(updated_item);
1413 ObservableItemsEntry::replace(&mut entry, new_item);
1414 } else {
1415 debug!(event_id, transaction_id, "Profile already marked unavailable");
1416 }
1417 }
1418 }
1419 }
1420
1421 trace!("Done forcing update of sender profiles");
1422 }
1423
/// Test-only helper: feeds `receipt_event_content` to the timeline state's
/// read-receipt handling, using the own user id reported by the room data
/// provider.
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
    let own_user_id = self.room_data_provider.own_user_id();
    let mut state = self.state.write().await;
    state.handle_read_receipts(receipt_event_content, own_user_id);
}
1429
1430 pub(super) async fn latest_user_read_receipt(
1434 &self,
1435 user_id: &UserId,
1436 ) -> Option<(OwnedEventId, Receipt)> {
1437 let receipt_thread = self.focus.receipt_thread();
1438
1439 self.state
1440 .read()
1441 .await
1442 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1443 .await
1444 }
1445
1446 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1449 &self,
1450 user_id: &UserId,
1451 ) -> Option<OwnedEventId> {
1452 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1453 }
1454
1455 pub async fn subscribe_own_user_read_receipts_changed(
1457 &self,
1458 ) -> impl Stream<Item = ()> + use<P> {
1459 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1460 }
1461
1462 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1464 match echo.content {
1465 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1466 let content = match serialized_event.deserialize() {
1467 Ok(d) => d,
1468 Err(err) => {
1469 warn!("error deserializing local echo: {err}");
1470 return;
1471 }
1472 };
1473
1474 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1475 .await;
1476
1477 if let Some(send_error) = send_error {
1478 self.update_event_send_state(
1479 &echo.transaction_id,
1480 EventSendState::SendingFailed {
1481 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1482 send_error,
1483 ))),
1484 is_recoverable: false,
1485 },
1486 )
1487 .await;
1488 }
1489 }
1490
1491 LocalEchoContent::React { key, send_handle, applies_to } => {
1492 self.handle_local_reaction(key, send_handle, applies_to).await;
1493 }
1494 }
1495 }
1496
1497 #[instrument(skip(self, send_handle))]
1499 async fn handle_local_reaction(
1500 &self,
1501 reaction_key: String,
1502 send_handle: SendReactionHandle,
1503 applies_to: OwnedTransactionId,
1504 ) {
1505 let mut state = self.state.write().await;
1506 let mut tr = state.transaction();
1507
1508 let target = TimelineEventItemId::TransactionId(applies_to);
1509
1510 let reaction_txn_id = send_handle.transaction_id().to_owned();
1511 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1512 let aggregation = Aggregation::new(
1513 TimelineEventItemId::TransactionId(reaction_txn_id),
1514 AggregationKind::Reaction {
1515 key: reaction_key.clone(),
1516 sender: self.room_data_provider.own_user_id().to_owned(),
1517 timestamp: MilliSecondsSinceUnixEpoch::now(),
1518 reaction_status,
1519 },
1520 );
1521
1522 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1523 find_item_and_apply_aggregation(
1524 &tr.meta.aggregations,
1525 &mut tr.items,
1526 &target,
1527 aggregation,
1528 &tr.meta.room_version_rules,
1529 );
1530
1531 tr.commit();
1532 }
1533
1534 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1536 match update {
1537 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1538 self.handle_local_echo(echo).await;
1539 }
1540
1541 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1542 if !self.discard_local_echo(&transaction_id).await {
1543 warn!("couldn't find the local echo to discard");
1544 }
1545 }
1546
1547 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1548 let content = match new_content.deserialize() {
1549 Ok(d) => d,
1550 Err(err) => {
1551 warn!("error deserializing local echo (upon edit): {err}");
1552 return;
1553 }
1554 };
1555
1556 if !self.replace_local_echo(&transaction_id, content).await {
1557 warn!("couldn't find the local echo to replace");
1558 }
1559 }
1560
1561 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1562 self.update_event_send_state(
1563 &transaction_id,
1564 EventSendState::SendingFailed { error, is_recoverable },
1565 )
1566 .await;
1567 }
1568
1569 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1570 self.update_event_send_state(
1571 &transaction_id,
1572 EventSendState::NotSentYet { progress: None },
1573 )
1574 .await;
1575 }
1576
1577 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1578 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1579 .await;
1580 }
1581
1582 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1583 self.update_event_send_state(
1584 &related_to,
1585 EventSendState::NotSentYet {
1586 progress: Some(MediaUploadProgress { index, progress }),
1587 },
1588 )
1589 .await;
1590 }
1591 }
1592 }
1593
1594 pub async fn insert_timeline_start_if_missing(&self) {
1597 let mut state = self.state.write().await;
1598 let mut txn = state.transaction();
1599 txn.items.push_timeline_start_if_missing(
1600 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1601 );
1602 txn.commit();
1603 }
1604
1605 pub(super) async fn make_replied_to(
1611 &self,
1612 event: TimelineEvent,
1613 ) -> Result<Option<EmbeddedEvent>, Error> {
1614 let state = self.state.read().await;
1615 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1616 }
1617}
1618
1619impl TimelineController {
1620 pub(super) fn room(&self) -> &Room {
1621 &self.room_data_provider
1622 }
1623
1624 #[instrument(skip(self))]
1627 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1628 let state_guard = self.state.write().await;
1629 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1630 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1631 let remote_item = item
1632 .as_remote()
1633 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1634 .clone();
1635
1636 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1637 debug!("Event is not a message");
1638 return Ok(());
1639 };
1640 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1641 debug!("Event is not a reply");
1642 return Ok(());
1643 };
1644 if let TimelineDetails::Pending = &in_reply_to.event {
1645 debug!("Replied-to event is already being fetched");
1646 return Ok(());
1647 }
1648 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1649 debug!("Replied-to event has already been fetched");
1650 return Ok(());
1651 }
1652
1653 let internal_id = item.internal_id.to_owned();
1654 let item = item.clone();
1655 let event = fetch_replied_to_event(
1656 state_guard,
1657 &self.state,
1658 index,
1659 &item,
1660 internal_id,
1661 &msglike,
1662 &in_reply_to.event_id,
1663 self.room(),
1664 )
1665 .await?;
1666
1667 let mut state = self.state.write().await;
1670 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1671 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1672
1673 let TimelineItemContent::MsgLike(MsgLikeContent {
1676 kind: MsgLikeKind::Message(message),
1677 reactions,
1678 thread_root,
1679 in_reply_to,
1680 thread_summary,
1681 }) = item.content().clone()
1682 else {
1683 info!("Event is no longer a message (redacted?)");
1684 return Ok(());
1685 };
1686 let Some(in_reply_to) = in_reply_to else {
1687 warn!("Event no longer has a reply (bug?)");
1688 return Ok(());
1689 };
1690
1691 trace!("Updating in-reply-to details");
1694 let internal_id = item.internal_id.to_owned();
1695 let mut item = item.clone();
1696 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1697 kind: MsgLikeKind::Message(message),
1698 reactions,
1699 thread_root,
1700 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1701 thread_summary,
1702 }));
1703 state.items.replace(index, TimelineItem::new(item, internal_id));
1704
1705 Ok(())
1706 }
1707
1708 pub(super) fn infer_thread_for_read_receipt(
1714 &self,
1715 receipt_type: &SendReceiptType,
1716 ) -> ReceiptThread {
1717 if matches!(receipt_type, SendReceiptType::FullyRead) {
1718 ReceiptThread::Unthreaded
1719 } else {
1720 self.focus.receipt_thread()
1721 }
1722 }
1723
1724 pub(super) async fn should_send_receipt(
1728 &self,
1729 receipt_type: &SendReceiptType,
1730 receipt_thread: &ReceiptThread,
1731 event_id: &EventId,
1732 ) -> bool {
1733 let own_user_id = self.room().own_user_id();
1734 let state = self.state.read().await;
1735 let room = self.room();
1736
1737 match receipt_type {
1738 SendReceiptType::Read => {
1739 if let Some((old_pub_read, _)) = state
1740 .meta
1741 .user_receipt(
1742 own_user_id,
1743 ReceiptType::Read,
1744 receipt_thread.clone(),
1745 room,
1746 state.items.all_remote_events(),
1747 )
1748 .await
1749 {
1750 trace!(%old_pub_read, "found a previous public receipt");
1751 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1752 &old_pub_read,
1753 event_id,
1754 state.items.all_remote_events(),
1755 ) {
1756 trace!(
1757 "event referred to new receipt is {relative_pos:?} the previous receipt"
1758 );
1759 return relative_pos == RelativePosition::After;
1760 }
1761 }
1762 }
1763
1764 SendReceiptType::ReadPrivate => {
1767 if let Some((old_priv_read, _)) =
1768 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1769 {
1770 trace!(%old_priv_read, "found a previous private receipt");
1771 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1772 &old_priv_read,
1773 event_id,
1774 state.items.all_remote_events(),
1775 ) {
1776 trace!(
1777 "event referred to new receipt is {relative_pos:?} the previous receipt"
1778 );
1779 return relative_pos == RelativePosition::After;
1780 }
1781 }
1782 }
1783
1784 SendReceiptType::FullyRead => {
1785 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1786 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1787 &prev_event_id,
1788 event_id,
1789 state.items.all_remote_events(),
1790 )
1791 {
1792 return relative_pos == RelativePosition::After;
1793 }
1794 }
1795
1796 _ => {}
1797 }
1798
1799 true
1801 }
1802
1803 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1806 let state = self.state.read().await;
1807 let filter_out_thread_events = match self.focus() {
1808 TimelineFocusKind::Thread { .. } => false,
1809 TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1810 TimelineFocusKind::Event { paginator } => {
1811 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1812 }
1813 _ => true,
1814 };
1815
1816 state
1821 .items
1822 .all_remote_events()
1823 .iter()
1824 .rev()
1825 .filter_map(|item| {
1826 if !filter_out_thread_events || item.thread_root_id.is_none() {
1827 Some(item.event_id.clone())
1828 } else {
1829 None
1830 }
1831 })
1832 .next()
1833 }
1834
1835 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1836 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1837 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1838
1839 let request = DecryptionRetryRequest {
1840 room_id: self.room().room_id().to_owned(),
1841 utd_session_ids: utds,
1842 refresh_info_session_ids: decrypted,
1843 };
1844
1845 self.room().client().event_cache().request_decryption(request);
1846 }
1847
1848 pub(super) async fn map_pagination_status(
1856 &self,
1857 status: RoomPaginationStatus,
1858 ) -> RoomPaginationStatus {
1859 match status {
1860 RoomPaginationStatus::Idle { hit_timeline_start } => {
1861 if hit_timeline_start {
1862 let state = self.state.read().await;
1863 if state.meta.subscriber_skip_count.get() > 0 {
1867 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1868 }
1869 }
1870 }
1871 RoomPaginationStatus::Paginating => {}
1872 }
1873
1874 status
1876 }
1877}
1878
1879impl<P: RoomDataProvider> TimelineController<P> {
1880 pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1882 &self.focus
1883 }
1884}
1885
1886#[allow(clippy::too_many_arguments)]
1887async fn fetch_replied_to_event<P: RoomDataProvider>(
1888 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1889 state_lock: &RwLock<TimelineState<P>>,
1890 index: usize,
1891 item: &EventTimelineItem,
1892 internal_id: TimelineUniqueId,
1893 msglike: &MsgLikeContent,
1894 in_reply_to: &EventId,
1895 room: &Room,
1896) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1897 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1898 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1899 trace!("Found replied-to event locally");
1900 return Ok(details);
1901 }
1902
1903 trace!("Setting in-reply-to details to pending");
1906 let in_reply_to_details =
1907 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1908
1909 let event_item = item
1910 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1911
1912 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1913 state_guard.items.replace(index, new_timeline_item);
1914
1915 drop(state_guard);
1917
1918 trace!("Fetching replied-to event");
1919 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1920 Ok(timeline_event) => {
1921 let state = state_lock.read().await;
1922
1923 let replied_to_item =
1924 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1925
1926 if let Some(item) = replied_to_item {
1927 TimelineDetails::Ready(Box::new(item))
1928 } else {
1929 return Err(Error::UnsupportedEvent);
1931 }
1932 }
1933
1934 Err(e) => TimelineDetails::Error(Arc::new(e)),
1935 };
1936
1937 Ok(res)
1938}