1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25 deserialized_responses::TimelineEvent,
26 event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
27 paginators::{PaginationResult, PaginationToken, Paginator},
28 send_queue::{
29 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
30 },
31};
32#[cfg(test)]
33use ruma::events::receipt::ReceiptEventContent;
34use ruma::{
35 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
36 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
37 events::{
38 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
39 AnySyncTimelineEvent, MessageLikeEventType,
40 poll::unstable_start::UnstablePollStartEventContent,
41 reaction::ReactionEventContent,
42 receipt::{Receipt, ReceiptThread, ReceiptType},
43 relation::Annotation,
44 room::message::{MessageType, Relation},
45 },
46 room_version_rules::RoomVersionRules,
47 serde::Raw,
48};
49use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
50use tracing::{debug, error, field::debug, info, instrument, trace, warn};
51
52pub(super) use self::{
53 metadata::{RelativePosition, TimelineMetadata},
54 observable_items::{
55 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
56 ObservableItemsTransactionEntry,
57 },
58 state::TimelineState,
59 state_transaction::TimelineStateTransaction,
60};
61use super::{
62 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
63 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
64 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
65 algorithms::{rfind_event_by_id, rfind_event_item},
66 event_item::{ReactionStatus, RemoteEventOrigin},
67 item::TimelineUniqueId,
68 subscriber::TimelineSubscriber,
69 traits::RoomDataProvider,
70};
71use crate::{
72 timeline::{
73 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
74 algorithms::rfind_event_by_item_id,
75 controller::decryption_retry_task::compute_redecryption_candidates,
76 date_dividers::DateDividerAdjuster,
77 event_item::TimelineItemHandle,
78 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
79 },
80 unable_to_decrypt_hook::UtdHookManager,
81};
82
83pub(in crate::timeline) mod aggregations;
84mod decryption_retry_task;
85mod metadata;
86mod observable_items;
87mod read_receipts;
88mod state;
89mod state_transaction;
90
91pub(super) use aggregations::*;
92pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
93use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
94use matrix_sdk_common::serde_helpers::extract_thread_root;
95
96#[derive(Debug)]
102pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
103 Live {
105 hide_threaded_events: bool,
107 },
108
109 Event {
112 paginator: OnceCell<AnyPaginator<P>>,
114 },
115
116 Thread {
118 root_event_id: OwnedEventId,
120 },
121
122 PinnedEvents {
123 loader: PinnedEventsLoader,
124 },
125}
126
127#[derive(Debug)]
128pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
129 Unthreaded {
130 paginator: Paginator<P>,
132 hide_threaded_events: bool,
134 },
135 Threaded(ThreadedEventsLoader<P>),
136}
137
138impl<P: RoomDataProvider> AnyPaginator<P> {
139 pub async fn paginate_backwards(
148 &self,
149 num_events: u16,
150 ) -> Result<PaginationResult, PaginatorError> {
151 match self {
152 Self::Unthreaded { paginator, .. } => {
153 paginator.paginate_backward(num_events.into()).await
154 }
155 Self::Threaded(threaded_paginator) => {
156 threaded_paginator.paginate_backwards(num_events.into()).await
157 }
158 }
159 }
160
161 pub async fn paginate_forwards(
169 &self,
170 num_events: u16,
171 ) -> Result<PaginationResult, PaginatorError> {
172 match self {
173 Self::Unthreaded { paginator, .. } => {
174 paginator.paginate_forward(num_events.into()).await
175 }
176 Self::Threaded(threaded_paginator) => {
177 threaded_paginator.paginate_forwards(num_events.into()).await
178 }
179 }
180 }
181
182 pub fn hide_threaded_events(&self) -> bool {
184 match self {
185 Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
186 Self::Threaded(_) => false,
187 }
188 }
189
190 pub fn thread_root(&self) -> Option<&EventId> {
193 match self {
194 Self::Unthreaded { .. } => None,
195 Self::Threaded(thread_events_loader) => {
196 Some(thread_events_loader.thread_root_event_id())
197 }
198 }
199 }
200}
201
202impl<P: RoomDataProvider> TimelineFocusKind<P> {
203 pub(super) fn receipt_thread(&self) -> ReceiptThread {
210 if let Some(thread_root) = self.thread_root() {
211 ReceiptThread::Thread(thread_root.to_owned())
212 } else if self.hide_threaded_events() {
213 ReceiptThread::Main
214 } else {
215 ReceiptThread::Unthreaded
216 }
217 }
218
219 fn hide_threaded_events(&self) -> bool {
221 match self {
222 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
223 TimelineFocusKind::Event { paginator } => {
224 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
225 }
226 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
227 }
228 }
229
230 fn is_thread(&self) -> bool {
233 self.thread_root().is_some()
234 }
235
236 fn thread_root(&self) -> Option<&EventId> {
238 match self {
239 TimelineFocusKind::Event { paginator, .. } => {
240 paginator.get().and_then(|paginator| paginator.thread_root())
241 }
242 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
243 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
244 }
245 }
246}
247
248#[derive(Clone, Debug)]
249pub(super) struct TimelineController<P: RoomDataProvider = Room> {
250 state: Arc<RwLock<TimelineState<P>>>,
252
253 focus: Arc<TimelineFocusKind<P>>,
255
256 pub(crate) room_data_provider: P,
261
262 pub(super) settings: TimelineSettings,
264}
265
266#[derive(Clone)]
267pub(super) struct TimelineSettings {
268 pub(super) track_read_receipts: bool,
270
271 pub(super) event_filter: Arc<TimelineEventFilterFn>,
274
275 pub(super) add_failed_to_parse: bool,
277
278 pub(super) date_divider_mode: DateDividerMode,
280}
281
282#[cfg(not(tarpaulin_include))]
283impl fmt::Debug for TimelineSettings {
284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285 f.debug_struct("TimelineSettings")
286 .field("track_read_receipts", &self.track_read_receipts)
287 .field("add_failed_to_parse", &self.add_failed_to_parse)
288 .finish_non_exhaustive()
289 }
290}
291
292impl Default for TimelineSettings {
293 fn default() -> Self {
294 Self {
295 track_read_receipts: false,
296 event_filter: Arc::new(default_event_filter),
297 add_failed_to_parse: true,
298 date_divider_mode: DateDividerMode::Daily,
299 }
300 }
301}
302
303pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
313 match event {
314 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
315 if ev.redacts(&rules.redaction).is_some() {
316 false
319 } else {
320 ev.event_type() != MessageLikeEventType::Reaction
323 }
324 }
325
326 AnySyncTimelineEvent::MessageLike(msg) => {
327 match msg.original_content() {
328 None => {
329 msg.event_type() != MessageLikeEventType::Reaction
332 }
333
334 Some(original_content) => {
335 match original_content {
336 AnyMessageLikeEventContent::RoomMessage(content) => {
337 if content
338 .relates_to
339 .as_ref()
340 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
341 {
342 return false;
344 }
345
346 match content.msgtype {
347 MessageType::Audio(_)
348 | MessageType::Emote(_)
349 | MessageType::File(_)
350 | MessageType::Image(_)
351 | MessageType::Location(_)
352 | MessageType::Notice(_)
353 | MessageType::ServerNotice(_)
354 | MessageType::Text(_)
355 | MessageType::Video(_)
356 | MessageType::VerificationRequest(_) => true,
357 #[cfg(feature = "unstable-msc4274")]
358 MessageType::Gallery(_) => true,
359 _ => false,
360 }
361 }
362
363 AnyMessageLikeEventContent::Sticker(_)
364 | AnyMessageLikeEventContent::UnstablePollStart(
365 UnstablePollStartEventContent::New(_),
366 )
367 | AnyMessageLikeEventContent::CallInvite(_)
368 | AnyMessageLikeEventContent::RtcNotification(_)
369 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
370
371 _ => false,
372 }
373 }
374 }
375 }
376
377 AnySyncTimelineEvent::State(_) => {
378 true
380 }
381 }
382}
383
384impl<P: RoomDataProvider> TimelineController<P> {
385 pub(super) fn new(
386 room_data_provider: P,
387 focus: TimelineFocus,
388 internal_id_prefix: Option<String>,
389 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
390 is_room_encrypted: bool,
391 settings: TimelineSettings,
392 ) -> Self {
393 let focus = match focus {
394 TimelineFocus::Live { hide_threaded_events } => {
395 TimelineFocusKind::Live { hide_threaded_events }
396 }
397
398 TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
399
400 TimelineFocus::Thread { root_event_id, .. } => {
401 TimelineFocusKind::Thread { root_event_id }
402 }
403
404 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
405 TimelineFocusKind::PinnedEvents {
406 loader: PinnedEventsLoader::new(
407 Arc::new(room_data_provider.clone()),
408 max_events_to_load as usize,
409 max_concurrent_requests as usize,
410 ),
411 }
412 }
413 };
414
415 let focus = Arc::new(focus);
416 let state = Arc::new(RwLock::new(TimelineState::new(
417 focus.clone(),
418 room_data_provider.own_user_id().to_owned(),
419 room_data_provider.room_version_rules(),
420 internal_id_prefix,
421 unable_to_decrypt_hook,
422 is_room_encrypted,
423 )));
424
425 Self { state, focus, room_data_provider, settings }
426 }
427
428 pub(super) async fn init_focus(
435 &self,
436 focus: &TimelineFocus,
437 room_event_cache: &RoomEventCache,
438 ) -> Result<bool, Error> {
439 match focus {
440 TimelineFocus::Live { .. } => {
441 let events = room_event_cache.events().await?;
443
444 let has_events = !events.is_empty();
445
446 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
447
448 match room_event_cache.pagination().status().get() {
449 RoomPaginationStatus::Idle { hit_timeline_start } => {
450 if hit_timeline_start {
451 self.insert_timeline_start_if_missing().await;
454 }
455 }
456 RoomPaginationStatus::Paginating => {}
457 }
458
459 Ok(has_events)
460 }
461
462 TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
463 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
464 unreachable!();
466 };
467
468 let event_paginator = Paginator::new(self.room_data_provider.clone());
469
470 let start_from_result = event_paginator
473 .start_from(event_id, (*num_context_events).into())
474 .await
475 .map_err(PaginationError::Paginator)?;
476
477 let thread_root_event_id = start_from_result
479 .events
480 .iter()
481 .find(
482 |event| {
483 if let Some(id) = event.event_id() { id == *event_id } else { false }
484 },
485 )
486 .and_then(|event| extract_thread_root(event.raw()));
487
488 let _ = paginator.set(match thread_root_event_id {
489 Some(root_id) => {
490 let mut tokens = event_paginator.tokens();
491
492 let includes_root_event = start_from_result.events.iter().any(|event| {
496 if let Some(id) = event.event_id() { id == root_id } else { false }
497 });
498
499 if includes_root_event {
500 tokens.previous = PaginationToken::HitEnd;
503 }
504
505 AnyPaginator::Threaded(ThreadedEventsLoader::new(
506 self.room_data_provider.clone(),
507 root_id,
508 tokens,
509 ))
510 }
511
512 None => AnyPaginator::Unthreaded {
513 paginator: event_paginator,
514 hide_threaded_events: *hide_threaded_events,
515 },
516 });
517
518 let has_events = !start_from_result.events.is_empty();
519 let events = start_from_result.events;
520
521 match paginator.get().expect("Paginator was not instantiated") {
522 AnyPaginator::Unthreaded { .. } => {
523 self.replace_with_initial_remote_events(
524 events,
525 RemoteEventOrigin::Pagination,
526 )
527 .await;
528 }
529
530 AnyPaginator::Threaded(threaded_events_loader) => {
531 let thread_root = threaded_events_loader.thread_root_event_id();
534 let events_in_thread = events.into_iter().filter(|event| {
535 extract_thread_root(event.raw())
536 .is_some_and(|event_thread_root| event_thread_root == thread_root)
537 || event.event_id().as_deref() == Some(thread_root)
538 });
539
540 self.replace_with_initial_remote_events(
541 events_in_thread,
542 RemoteEventOrigin::Pagination,
543 )
544 .await;
545 }
546 }
547
548 Ok(has_events)
549 }
550
551 TimelineFocus::Thread { root_event_id, .. } => {
552 let (events, _) =
553 room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
554 let has_events = !events.is_empty();
555
556 let mut related_events = Vector::new();
560 for event_id in events.iter().filter_map(|event| event.event_id()) {
561 if let Some((_original, related)) =
562 room_event_cache.find_event_with_relations(&event_id, None).await?
563 {
564 related_events.extend(related);
565 }
566 }
567
568 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
569
570 if !related_events.is_empty() {
572 self.handle_remote_aggregations(
573 vec![VectorDiff::Append { values: related_events }],
574 RemoteEventOrigin::Cache,
575 )
576 .await;
577 }
578
579 Ok(has_events)
580 }
581
582 TimelineFocus::PinnedEvents { .. } => {
583 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
584 unreachable!();
586 };
587
588 let Some(loaded_events) =
589 loader.load_events().await.map_err(Error::PinnedEventsError)?
590 else {
591 return Ok(false);
593 };
594
595 let has_events = !loaded_events.is_empty();
596
597 self.replace_with_initial_remote_events(
598 loaded_events,
599 RemoteEventOrigin::Pagination,
600 )
601 .await;
602
603 Ok(has_events)
604 }
605 }
606 }
607
608 pub async fn handle_encryption_state_changes(&self) {
613 let mut room_info = self.room_data_provider.room_info();
614
615 let mark_encrypted = || async {
617 let mut state = self.state.write().await;
618 state.meta.is_room_encrypted = true;
619 state.mark_all_events_as_encrypted();
620 };
621
622 if room_info.get().encryption_state().is_encrypted() {
623 mark_encrypted().await;
626 return;
627 }
628
629 while let Some(info) = room_info.next().await {
630 if info.encryption_state().is_encrypted() {
631 mark_encrypted().await;
632 break;
635 }
636 }
637 }
638
639 pub(crate) async fn reload_pinned_events(
640 &self,
641 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
642 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
643 loader.load_events().await
644 } else {
645 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
646 }
647 }
648
649 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
658 let state = self.state.read().await;
659
660 let (count, needs) = state
661 .meta
662 .subscriber_skip_count
663 .compute_next_when_paginating_backwards(num_events.into());
664
665 let is_live_timeline = true;
667 state.meta.subscriber_skip_count.update(count, is_live_timeline);
668
669 needs
670 }
671
672 pub(super) async fn focused_paginate_backwards(
677 &self,
678 num_events: u16,
679 ) -> Result<bool, PaginationError> {
680 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
681 TimelineFocusKind::Live { .. }
682 | TimelineFocusKind::PinnedEvents { .. }
683 | TimelineFocusKind::Thread { .. } => {
684 return Err(PaginationError::NotSupported);
685 }
686 TimelineFocusKind::Event { paginator, .. } => paginator
687 .get()
688 .expect("Paginator was not instantiated")
689 .paginate_backwards(num_events)
690 .await
691 .map_err(PaginationError::Paginator)?,
692 };
693
694 self.handle_remote_events_with_diffs(
697 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
698 RemoteEventOrigin::Pagination,
699 )
700 .await;
701
702 Ok(hit_end_of_timeline)
703 }
704
705 pub(super) async fn focused_paginate_forwards(
710 &self,
711 num_events: u16,
712 ) -> Result<bool, PaginationError> {
713 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
714 TimelineFocusKind::Live { .. }
715 | TimelineFocusKind::PinnedEvents { .. }
716 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
717
718 TimelineFocusKind::Event { paginator, .. } => paginator
719 .get()
720 .expect("Paginator was not instantiated")
721 .paginate_forwards(num_events)
722 .await
723 .map_err(PaginationError::Paginator)?,
724 };
725
726 self.handle_remote_events_with_diffs(
729 vec![VectorDiff::Append { values: events.into() }],
730 RemoteEventOrigin::Pagination,
731 )
732 .await;
733
734 Ok(hit_end_of_timeline)
735 }
736
737 pub(super) fn is_live(&self) -> bool {
739 matches!(&*self.focus, TimelineFocusKind::Live { .. })
740 }
741
742 pub(super) fn is_threaded(&self) -> bool {
744 self.focus.is_thread()
745 }
746
747 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
750 self.focus.thread_root().map(ToOwned::to_owned)
751 }
752
753 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
757 self.state.read().await.items.clone_items()
758 }
759
/// Test-only: returns the current items along with a stream of raw updates
/// to the item list, without going through a [`TimelineSubscriber`].
#[cfg(test)]
pub(super) async fn subscribe_raw(
    &self,
) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
    let state = self.state.read().await;
    state.items.subscribe().into_values_and_stream()
}
766
767 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
768 let state = self.state.read().await;
769
770 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
771 }
772
773 pub(super) async fn subscribe_filter_map<U, F>(
774 &self,
775 f: F,
776 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
777 where
778 U: Clone,
779 F: Fn(Arc<TimelineItem>) -> Option<U>,
780 {
781 self.state.read().await.items.subscribe().filter_map(f)
782 }
783
784 #[instrument(skip_all)]
788 pub(super) async fn toggle_reaction_local(
789 &self,
790 item_id: &TimelineEventItemId,
791 key: &str,
792 ) -> Result<bool, Error> {
793 let mut state = self.state.write().await;
794
795 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
796 warn!("Timeline item not found, can't add reaction");
797 return Err(Error::FailedToToggleReaction);
798 };
799
800 let user_id = self.room_data_provider.own_user_id();
801 let prev_status = item
802 .content()
803 .reactions()
804 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
805
806 let Some(prev_status) = prev_status else {
807 match item.handle() {
809 TimelineItemHandle::Local(send_handle) => {
810 if send_handle
811 .react(key.to_owned())
812 .await
813 .map_err(|err| Error::SendQueueError(err.into()))?
814 .is_some()
815 {
816 trace!("adding a reaction to a local echo");
817 return Ok(true);
818 }
819
820 warn!("couldn't toggle reaction for local echo");
821 return Ok(false);
822 }
823
824 TimelineItemHandle::Remote(event_id) => {
825 trace!("adding a reaction to a remote echo");
829 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
830 self.room_data_provider
831 .send(ReactionEventContent::from(annotation).into())
832 .await?;
833 return Ok(true);
834 }
835 }
836 };
837
838 trace!("removing a previous reaction");
839 match prev_status {
840 ReactionStatus::LocalToLocal(send_reaction_handle) => {
841 if let Some(handle) = send_reaction_handle {
842 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
843 warn!("unexpectedly unable to abort sending of local reaction");
846 }
847 } else {
848 warn!("no send reaction handle (this should only happen in testing contexts)");
849 }
850 }
851
852 ReactionStatus::LocalToRemote(send_handle) => {
853 trace!("aborting send of the previous reaction that was a local echo");
856 if let Some(handle) = send_handle {
857 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
858 warn!("unexpectedly unable to abort sending of local reaction");
861 }
862 } else {
863 warn!("no send handle (this should only happen in testing contexts)");
864 }
865 }
866
867 ReactionStatus::RemoteToRemote(event_id) => {
868 let Some(annotated_event_id) =
870 item.as_remote().map(|event_item| event_item.event_id.clone())
871 else {
872 warn!("remote reaction to remote event, but the associated item isn't remote");
873 return Ok(false);
874 };
875
876 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
877 let reaction_info = reactions.remove_reaction(user_id, key);
878
879 if reaction_info.is_some() {
880 let new_item = item.with_reactions(reactions);
881 state.items.replace(item_pos, new_item);
882 } else {
883 warn!(
884 "reaction is missing on the item, not removing it locally, \
885 but sending redaction."
886 );
887 }
888
889 drop(state);
891
892 trace!("sending redact for a previous reaction");
893 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
894 if let Some(reaction_info) = reaction_info {
895 debug!("sending redact failed, adding the reaction back to the list");
896
897 let mut state = self.state.write().await;
898 if let Some((item_pos, item)) =
899 rfind_event_by_id(&state.items, &annotated_event_id)
900 {
901 let mut reactions =
903 item.content().reactions().cloned().unwrap_or_default();
904 reactions
905 .entry(key.to_owned())
906 .or_default()
907 .insert(user_id.to_owned(), reaction_info);
908 let new_item = item.with_reactions(reactions);
909 state.items.replace(item_pos, new_item);
910 } else {
911 warn!(
912 "couldn't find item to re-add reaction anymore; \
913 maybe it's been redacted?"
914 );
915 }
916 }
917
918 return Err(err);
919 }
920 }
921 }
922
923 Ok(false)
924 }
925
926 pub(super) async fn handle_remote_events_with_diffs(
928 &self,
929 diffs: Vec<VectorDiff<TimelineEvent>>,
930 origin: RemoteEventOrigin,
931 ) {
932 if diffs.is_empty() {
933 return;
934 }
935
936 let mut state = self.state.write().await;
937 state
938 .handle_remote_events_with_diffs(
939 diffs,
940 origin,
941 &self.room_data_provider,
942 &self.settings,
943 )
944 .await
945 }
946
947 pub(super) async fn handle_remote_aggregations(
949 &self,
950 diffs: Vec<VectorDiff<TimelineEvent>>,
951 origin: RemoteEventOrigin,
952 ) {
953 if diffs.is_empty() {
954 return;
955 }
956
957 let mut state = self.state.write().await;
958 state
959 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
960 .await
961 }
962
963 pub(super) async fn clear(&self) {
964 self.state.write().await.clear();
965 }
966
967 pub(super) async fn replace_with_initial_remote_events<Events>(
975 &self,
976 events: Events,
977 origin: RemoteEventOrigin,
978 ) where
979 Events: IntoIterator,
980 <Events as IntoIterator>::Item: Into<TimelineEvent>,
981 {
982 let mut state = self.state.write().await;
983
984 let track_read_markers = self.settings.track_read_receipts;
985 if track_read_markers {
986 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
987 state
988 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
989 .await;
990 }
991
992 let mut events = events.into_iter().peekable();
998 if !state.items.is_empty() || events.peek().is_some() {
999 state
1000 .replace_with_remote_events(
1001 events,
1002 origin,
1003 &self.room_data_provider,
1004 &self.settings,
1005 )
1006 .await;
1007 }
1008
1009 if track_read_markers {
1010 if let Some(fully_read_event_id) =
1011 self.room_data_provider.load_fully_read_marker().await
1012 {
1013 state.handle_fully_read_marker(fully_read_event_id);
1014 } else if let Some(latest_receipt_event_id) = state
1015 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1016 {
1017 debug!("no `m.fully_read` marker found, falling back to read receipt");
1019 state.handle_fully_read_marker(latest_receipt_event_id);
1020 }
1021 }
1022 }
1023
1024 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1025 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1026 }
1027
1028 pub(super) async fn handle_ephemeral_events(
1029 &self,
1030 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1031 ) {
1032 let mut state = self.state.write().await;
1033 state.handle_ephemeral_events(events, &self.room_data_provider).await;
1034 }
1035
1036 #[instrument(skip_all)]
1038 pub(super) async fn handle_local_event(
1039 &self,
1040 txn_id: OwnedTransactionId,
1041 content: AnyMessageLikeEventContent,
1042 send_handle: Option<SendHandle>,
1043 ) {
1044 let sender = self.room_data_provider.own_user_id().to_owned();
1045 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1046
1047 let date_divider_mode = self.settings.date_divider_mode.clone();
1048
1049 let mut state = self.state.write().await;
1050 state
1051 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1052 .await;
1053 }
1054
1055 #[instrument(skip(self))]
1060 pub(super) async fn update_event_send_state(
1061 &self,
1062 txn_id: &TransactionId,
1063 send_state: EventSendState,
1064 ) {
1065 let mut state = self.state.write().await;
1066 let mut txn = state.transaction();
1067
1068 let new_event_id: Option<&EventId> =
1069 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
1070
1071 if rfind_event_item(&txn.items, |it| {
1074 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
1075 })
1076 .is_some()
1077 {
1078 trace!("Remote echo received before send-event response");
1080
1081 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
1082
1083 if let Some((idx, _)) = local_echo {
1087 warn!("Message echo got duplicated, removing the local one");
1088 txn.items.remove(idx);
1089
1090 let mut adjuster =
1092 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1093 adjuster.run(&mut txn.items, &mut txn.meta);
1094 }
1095
1096 txn.commit();
1097 return;
1098 }
1099
1100 let result = rfind_event_item(&txn.items, |it| {
1102 it.transaction_id() == Some(txn_id)
1103 || new_event_id.is_some()
1104 && it.event_id() == new_event_id
1105 && it.as_local().is_some()
1106 });
1107
1108 let Some((idx, item)) = result else {
1109 if let Some(new_event_id) = new_event_id {
1114 if txn.meta.aggregations.mark_aggregation_as_sent(
1115 txn_id.to_owned(),
1116 new_event_id.to_owned(),
1117 &mut txn.items,
1118 &txn.meta.room_version_rules,
1119 ) {
1120 trace!("Aggregation marked as sent");
1121 txn.commit();
1122 return;
1123 }
1124
1125 trace!("Sent aggregation was not found");
1126 }
1127
1128 warn!("Timeline item not found, can't update send state");
1129 return;
1130 };
1131
1132 let Some(local_item) = item.as_local() else {
1133 warn!("We looked for a local item, but it transitioned to remote.");
1134 return;
1135 };
1136
1137 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1140 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1141 }
1142
1143 if let Some(new_event_id) = new_event_id {
1146 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1147 }
1148
1149 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1150 txn.items.replace(idx, new_item);
1151
1152 txn.commit();
1153 }
1154
1155 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1156 let mut state = self.state.write().await;
1157
1158 if let Some((idx, _)) =
1159 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1160 {
1161 let mut txn = state.transaction();
1162
1163 txn.items.remove(idx);
1164
1165 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1168 adjuster.run(&mut txn.items, &mut txn.meta);
1169
1170 txn.meta.update_read_marker(&mut txn.items);
1171
1172 txn.commit();
1173
1174 debug!("discarded local echo");
1175 return true;
1176 }
1177
1178 let mut txn = state.transaction();
1181
1182 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1184 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1185 &mut txn.items,
1186 ) {
1187 Ok(val) => val,
1188 Err(err) => {
1189 warn!("error when discarding local echo for an aggregation: {err}");
1190 true
1192 }
1193 };
1194
1195 if found_aggregation {
1196 txn.commit();
1197 }
1198
1199 found_aggregation
1200 }
1201
1202 pub(super) async fn replace_local_echo(
1203 &self,
1204 txn_id: &TransactionId,
1205 content: AnyMessageLikeEventContent,
1206 ) -> bool {
1207 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1208 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1213 return false;
1214 };
1215
1216 let mut state = self.state.write().await;
1217 let mut txn = state.transaction();
1218
1219 let Some((idx, prev_item)) =
1220 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1221 else {
1222 debug!("Can't find local echo to replace");
1223 return false;
1224 };
1225
1226 let ti_kind = {
1229 let Some(prev_local_item) = prev_item.as_local() else {
1230 warn!("We looked for a local item, but it transitioned as remote??");
1231 return false;
1232 };
1233 let progress = as_variant!(&prev_local_item.send_state,
1235 EventSendState::NotSentYet { progress } => progress.clone())
1236 .flatten();
1237 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1238 };
1239
1240 let new_item = TimelineItem::new(
1242 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1243 content.msgtype,
1244 content.mentions,
1245 prev_item.content().reactions().cloned().unwrap_or_default(),
1246 prev_item.content().thread_root(),
1247 prev_item.content().in_reply_to(),
1248 prev_item.content().thread_summary(),
1249 )),
1250 prev_item.internal_id.to_owned(),
1251 );
1252
1253 txn.items.replace(idx, new_item);
1254
1255 txn.commit();
1259
1260 debug!("Replaced local echo");
1261 true
1262 }
1263
1264 pub(super) async fn compute_redecryption_candidates(
1265 &self,
1266 ) -> (BTreeSet<String>, BTreeSet<String>) {
1267 let state = self.state.read().await;
1268 compute_redecryption_candidates(&state.items)
1269 }
1270
1271 pub(super) async fn set_sender_profiles_pending(&self) {
1272 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1273 }
1274
1275 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1276 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1277 }
1278
1279 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1280 self.state.write().await.items.for_each(|mut entry| {
1281 let Some(event_item) = entry.as_event() else { return };
1282 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1283 let new_item = entry.with_kind(TimelineItemKind::Event(
1284 event_item.with_sender_profile(profile_state.clone()),
1285 ));
1286 ObservableItemsEntry::replace(&mut entry, new_item);
1287 }
1288 });
1289 }
1290
1291 pub(super) async fn update_missing_sender_profiles(&self) {
1292 trace!("Updating missing sender profiles");
1293
1294 let mut state = self.state.write().await;
1295 let mut entries = state.items.entries();
1296 while let Some(mut entry) = entries.next() {
1297 let Some(event_item) = entry.as_event() else { continue };
1298 let event_id = event_item.event_id().map(debug);
1299 let transaction_id = event_item.transaction_id().map(debug);
1300
1301 if event_item.sender_profile().is_ready() {
1302 trace!(event_id, transaction_id, "Profile already set");
1303 continue;
1304 }
1305
1306 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1307 Some(profile) => {
1308 trace!(event_id, transaction_id, "Adding profile");
1309 let updated_item =
1310 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1311 let new_item = entry.with_kind(updated_item);
1312 ObservableItemsEntry::replace(&mut entry, new_item);
1313 }
1314 None => {
1315 if !event_item.sender_profile().is_unavailable() {
1316 trace!(event_id, transaction_id, "Marking profile unavailable");
1317 let updated_item =
1318 event_item.with_sender_profile(TimelineDetails::Unavailable);
1319 let new_item = entry.with_kind(updated_item);
1320 ObservableItemsEntry::replace(&mut entry, new_item);
1321 } else {
1322 debug!(event_id, transaction_id, "Profile already marked unavailable");
1323 }
1324 }
1325 }
1326 }
1327
1328 trace!("Done updating missing sender profiles");
1329 }
1330
1331 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1333 trace!("Forcing update of sender profiles: {sender_ids:?}");
1334
1335 let mut state = self.state.write().await;
1336 let mut entries = state.items.entries();
1337 while let Some(mut entry) = entries.next() {
1338 let Some(event_item) = entry.as_event() else { continue };
1339 if !sender_ids.contains(event_item.sender()) {
1340 continue;
1341 }
1342
1343 let event_id = event_item.event_id().map(debug);
1344 let transaction_id = event_item.transaction_id().map(debug);
1345
1346 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1347 Some(profile) => {
1348 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1349 {
1350 debug!(event_id, transaction_id, "Profile already up-to-date");
1351 } else {
1352 trace!(event_id, transaction_id, "Updating profile");
1353 let updated_item =
1354 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1355 let new_item = entry.with_kind(updated_item);
1356 ObservableItemsEntry::replace(&mut entry, new_item);
1357 }
1358 }
1359 None => {
1360 if !event_item.sender_profile().is_unavailable() {
1361 trace!(event_id, transaction_id, "Marking profile unavailable");
1362 let updated_item =
1363 event_item.with_sender_profile(TimelineDetails::Unavailable);
1364 let new_item = entry.with_kind(updated_item);
1365 ObservableItemsEntry::replace(&mut entry, new_item);
1366 } else {
1367 debug!(event_id, transaction_id, "Profile already marked unavailable");
1368 }
1369 }
1370 }
1371 }
1372
1373 trace!("Done forcing update of sender profiles");
1374 }
1375
1376 #[cfg(test)]
1377 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1378 let own_user_id = self.room_data_provider.own_user_id();
1379 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1380 }
1381
1382 pub(super) async fn latest_user_read_receipt(
1386 &self,
1387 user_id: &UserId,
1388 ) -> Option<(OwnedEventId, Receipt)> {
1389 let receipt_thread = self.focus.receipt_thread();
1390
1391 self.state
1392 .read()
1393 .await
1394 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1395 .await
1396 }
1397
1398 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1401 &self,
1402 user_id: &UserId,
1403 ) -> Option<OwnedEventId> {
1404 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1405 }
1406
1407 pub async fn subscribe_own_user_read_receipts_changed(
1409 &self,
1410 ) -> impl Stream<Item = ()> + use<P> {
1411 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1412 }
1413
1414 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1416 match echo.content {
1417 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1418 let content = match serialized_event.deserialize() {
1419 Ok(d) => d,
1420 Err(err) => {
1421 warn!("error deserializing local echo: {err}");
1422 return;
1423 }
1424 };
1425
1426 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1427 .await;
1428
1429 if let Some(send_error) = send_error {
1430 self.update_event_send_state(
1431 &echo.transaction_id,
1432 EventSendState::SendingFailed {
1433 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1434 send_error,
1435 ))),
1436 is_recoverable: false,
1437 },
1438 )
1439 .await;
1440 }
1441 }
1442
1443 LocalEchoContent::React { key, send_handle, applies_to } => {
1444 self.handle_local_reaction(key, send_handle, applies_to).await;
1445 }
1446 }
1447 }
1448
1449 #[instrument(skip(self, send_handle))]
1451 async fn handle_local_reaction(
1452 &self,
1453 reaction_key: String,
1454 send_handle: SendReactionHandle,
1455 applies_to: OwnedTransactionId,
1456 ) {
1457 let mut state = self.state.write().await;
1458 let mut tr = state.transaction();
1459
1460 let target = TimelineEventItemId::TransactionId(applies_to);
1461
1462 let reaction_txn_id = send_handle.transaction_id().to_owned();
1463 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1464 let aggregation = Aggregation::new(
1465 TimelineEventItemId::TransactionId(reaction_txn_id),
1466 AggregationKind::Reaction {
1467 key: reaction_key.clone(),
1468 sender: self.room_data_provider.own_user_id().to_owned(),
1469 timestamp: MilliSecondsSinceUnixEpoch::now(),
1470 reaction_status,
1471 },
1472 );
1473
1474 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1475 find_item_and_apply_aggregation(
1476 &tr.meta.aggregations,
1477 &mut tr.items,
1478 &target,
1479 aggregation,
1480 &tr.meta.room_version_rules,
1481 );
1482
1483 tr.commit();
1484 }
1485
1486 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1488 match update {
1489 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1490 self.handle_local_echo(echo).await;
1491 }
1492
1493 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1494 if !self.discard_local_echo(&transaction_id).await {
1495 warn!("couldn't find the local echo to discard");
1496 }
1497 }
1498
1499 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1500 let content = match new_content.deserialize() {
1501 Ok(d) => d,
1502 Err(err) => {
1503 warn!("error deserializing local echo (upon edit): {err}");
1504 return;
1505 }
1506 };
1507
1508 if !self.replace_local_echo(&transaction_id, content).await {
1509 warn!("couldn't find the local echo to replace");
1510 }
1511 }
1512
1513 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1514 self.update_event_send_state(
1515 &transaction_id,
1516 EventSendState::SendingFailed { error, is_recoverable },
1517 )
1518 .await;
1519 }
1520
1521 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1522 self.update_event_send_state(
1523 &transaction_id,
1524 EventSendState::NotSentYet { progress: None },
1525 )
1526 .await;
1527 }
1528
1529 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1530 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1531 .await;
1532 }
1533
1534 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1535 self.update_event_send_state(
1536 &related_to,
1537 EventSendState::NotSentYet {
1538 progress: Some(MediaUploadProgress { index, progress }),
1539 },
1540 )
1541 .await;
1542 }
1543 }
1544 }
1545
1546 pub async fn insert_timeline_start_if_missing(&self) {
1549 let mut state = self.state.write().await;
1550 let mut txn = state.transaction();
1551 txn.items.push_timeline_start_if_missing(
1552 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1553 );
1554 txn.commit();
1555 }
1556
1557 pub(super) async fn make_replied_to(
1563 &self,
1564 event: TimelineEvent,
1565 ) -> Result<Option<EmbeddedEvent>, Error> {
1566 let state = self.state.read().await;
1567 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1568 }
1569}
1570
1571impl TimelineController {
    /// Returns the [`Room`] behind this controller.
    ///
    /// In this `impl` block the controller's `room_data_provider` field is the
    /// room itself, so this is a plain accessor for it.
    pub(super) fn room(&self) -> &Room {
        &self.room_data_provider
    }
1575
1576 #[instrument(skip(self))]
1579 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1580 let state_guard = self.state.write().await;
1581 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1582 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1583 let remote_item = item
1584 .as_remote()
1585 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1586 .clone();
1587
1588 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1589 debug!("Event is not a message");
1590 return Ok(());
1591 };
1592 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1593 debug!("Event is not a reply");
1594 return Ok(());
1595 };
1596 if let TimelineDetails::Pending = &in_reply_to.event {
1597 debug!("Replied-to event is already being fetched");
1598 return Ok(());
1599 }
1600 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1601 debug!("Replied-to event has already been fetched");
1602 return Ok(());
1603 }
1604
1605 let internal_id = item.internal_id.to_owned();
1606 let item = item.clone();
1607 let event = fetch_replied_to_event(
1608 state_guard,
1609 &self.state,
1610 index,
1611 &item,
1612 internal_id,
1613 &msglike,
1614 &in_reply_to.event_id,
1615 self.room(),
1616 )
1617 .await?;
1618
1619 let mut state = self.state.write().await;
1622 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1623 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1624
1625 let TimelineItemContent::MsgLike(MsgLikeContent {
1628 kind: MsgLikeKind::Message(message),
1629 reactions,
1630 thread_root,
1631 in_reply_to,
1632 thread_summary,
1633 }) = item.content().clone()
1634 else {
1635 info!("Event is no longer a message (redacted?)");
1636 return Ok(());
1637 };
1638 let Some(in_reply_to) = in_reply_to else {
1639 warn!("Event no longer has a reply (bug?)");
1640 return Ok(());
1641 };
1642
1643 trace!("Updating in-reply-to details");
1646 let internal_id = item.internal_id.to_owned();
1647 let mut item = item.clone();
1648 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1649 kind: MsgLikeKind::Message(message),
1650 reactions,
1651 thread_root,
1652 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1653 thread_summary,
1654 }));
1655 state.items.replace(index, TimelineItem::new(item, internal_id));
1656
1657 Ok(())
1658 }
1659
1660 pub(super) fn infer_thread_for_read_receipt(
1666 &self,
1667 receipt_type: &SendReceiptType,
1668 ) -> ReceiptThread {
1669 if matches!(receipt_type, SendReceiptType::FullyRead) {
1670 ReceiptThread::Unthreaded
1671 } else {
1672 self.focus.receipt_thread()
1673 }
1674 }
1675
1676 pub(super) async fn should_send_receipt(
1680 &self,
1681 receipt_type: &SendReceiptType,
1682 receipt_thread: &ReceiptThread,
1683 event_id: &EventId,
1684 ) -> bool {
1685 let own_user_id = self.room().own_user_id();
1686 let state = self.state.read().await;
1687 let room = self.room();
1688
1689 match receipt_type {
1690 SendReceiptType::Read => {
1691 if let Some((old_pub_read, _)) = state
1692 .meta
1693 .user_receipt(
1694 own_user_id,
1695 ReceiptType::Read,
1696 receipt_thread.clone(),
1697 room,
1698 state.items.all_remote_events(),
1699 )
1700 .await
1701 {
1702 trace!(%old_pub_read, "found a previous public receipt");
1703 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1704 &old_pub_read,
1705 event_id,
1706 state.items.all_remote_events(),
1707 ) {
1708 trace!(
1709 "event referred to new receipt is {relative_pos:?} the previous receipt"
1710 );
1711 return relative_pos == RelativePosition::After;
1712 }
1713 }
1714 }
1715
1716 SendReceiptType::ReadPrivate => {
1719 if let Some((old_priv_read, _)) =
1720 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1721 {
1722 trace!(%old_priv_read, "found a previous private receipt");
1723 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1724 &old_priv_read,
1725 event_id,
1726 state.items.all_remote_events(),
1727 ) {
1728 trace!(
1729 "event referred to new receipt is {relative_pos:?} the previous receipt"
1730 );
1731 return relative_pos == RelativePosition::After;
1732 }
1733 }
1734 }
1735
1736 SendReceiptType::FullyRead => {
1737 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1738 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1739 &prev_event_id,
1740 event_id,
1741 state.items.all_remote_events(),
1742 )
1743 {
1744 return relative_pos == RelativePosition::After;
1745 }
1746 }
1747
1748 _ => {}
1749 }
1750
1751 true
1753 }
1754
1755 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1758 let state = self.state.read().await;
1759 let filter_out_thread_events = match self.focus() {
1760 TimelineFocusKind::Thread { .. } => false,
1761 TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1762 TimelineFocusKind::Event { paginator } => {
1763 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1764 }
1765 _ => true,
1766 };
1767
1768 state
1773 .items
1774 .all_remote_events()
1775 .iter()
1776 .rev()
1777 .filter_map(|item| {
1778 if !filter_out_thread_events || item.thread_root_id.is_none() {
1779 Some(item.event_id.clone())
1780 } else {
1781 None
1782 }
1783 })
1784 .next()
1785 }
1786
1787 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1788 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1789 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1790
1791 let request = DecryptionRetryRequest {
1792 room_id: self.room().room_id().to_owned(),
1793 utd_session_ids: utds,
1794 refresh_info_session_ids: decrypted,
1795 };
1796
1797 self.room().client().event_cache().request_decryption(request);
1798 }
1799
1800 pub(super) async fn map_pagination_status(
1808 &self,
1809 status: RoomPaginationStatus,
1810 ) -> RoomPaginationStatus {
1811 match status {
1812 RoomPaginationStatus::Idle { hit_timeline_start } => {
1813 if hit_timeline_start {
1814 let state = self.state.read().await;
1815 if state.meta.subscriber_skip_count.get() > 0 {
1819 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1820 }
1821 }
1822 }
1823 RoomPaginationStatus::Paginating => {}
1824 }
1825
1826 status
1828 }
1829}
1830
1831impl<P: RoomDataProvider> TimelineController<P> {
1832 pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1834 &self.focus
1835 }
1836}
1837
1838#[allow(clippy::too_many_arguments)]
1839async fn fetch_replied_to_event<P: RoomDataProvider>(
1840 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1841 state_lock: &RwLock<TimelineState<P>>,
1842 index: usize,
1843 item: &EventTimelineItem,
1844 internal_id: TimelineUniqueId,
1845 msglike: &MsgLikeContent,
1846 in_reply_to: &EventId,
1847 room: &Room,
1848) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1849 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1850 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1851 trace!("Found replied-to event locally");
1852 return Ok(details);
1853 }
1854
1855 trace!("Setting in-reply-to details to pending");
1858 let in_reply_to_details =
1859 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1860
1861 let event_item = item
1862 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1863
1864 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1865 state_guard.items.replace(index, new_timeline_item);
1866
1867 drop(state_guard);
1869
1870 trace!("Fetching replied-to event");
1871 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1872 Ok(timeline_event) => {
1873 let state = state_lock.read().await;
1874
1875 let replied_to_item =
1876 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1877
1878 if let Some(item) = replied_to_item {
1879 TimelineDetails::Ready(Box::new(item))
1880 } else {
1881 return Err(Error::UnsupportedEvent);
1883 }
1884 }
1885
1886 Err(e) => TimelineDetails::Error(Arc::new(e)),
1887 };
1888
1889 Ok(res)
1890}