1use std::{
16 collections::BTreeSet,
17 fmt,
18 ops::Deref,
19 sync::{Arc, OnceLock},
20};
21
22use as_variant::as_variant;
23use eyeball_im::{VectorDiff, VectorSubscriberStream};
24use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
25use futures_core::Stream;
26use imbl::Vector;
27use matrix_sdk::{
28 deserialized_responses::TimelineEvent,
29 event_cache::{
30 DecryptionRetryRequest, EventCache, EventFocusedCache, PaginationStatus, PinnedEventsCache,
31 RoomEventCache, ThreadEventCache, TimelineVectorDiffs,
32 },
33 send_queue::{
34 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
35 },
36 task_monitor::BackgroundTaskHandle,
37};
38#[cfg(test)]
39use ruma::events::receipt::ReceiptEventContent;
40use ruma::{
41 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
42 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
43 events::{
44 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45 AnySyncTimelineEvent, MessageLikeEventType,
46 poll::unstable_start::UnstablePollStartEventContent,
47 reaction::ReactionEventContent,
48 receipt::{Receipt, ReceiptThread, ReceiptType},
49 relation::Annotation,
50 room::message::{MessageType, Relation},
51 },
52 room_version_rules::RoomVersionRules,
53 serde::Raw,
54};
55use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
56use tracing::{
57 Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
58};
59
60pub(super) use self::{
61 metadata::{RelativePosition, TimelineMetadata},
62 observable_items::{
63 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
64 ObservableItemsTransactionEntry,
65 },
66 state::TimelineState,
67 state_transaction::TimelineStateTransaction,
68};
69use super::{
70 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
71 MediaUploadProgress, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus,
72 TimelineItem, TimelineItemContent, TimelineItemKind, TimelineReadReceiptTracking,
73 VirtualTimelineItem,
74 algorithms::{rfind_event_by_id, rfind_event_item},
75 event_item::{ReactionStatus, RemoteEventOrigin},
76 item::TimelineUniqueId,
77 subscriber::TimelineSubscriber,
78 traits::RoomDataProvider,
79};
80use crate::{
81 timeline::{
82 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
83 algorithms::rfind_event_by_item_id,
84 controller::decryption_retry_task::compute_redecryption_candidates,
85 date_dividers::DateDividerAdjuster,
86 event_item::TimelineItemHandle,
87 tasks::{event_focused_task, pinned_events_task, thread_updates_task},
88 },
89 unable_to_decrypt_hook::UtdHookManager,
90};
91
92pub(in crate::timeline) mod aggregations;
93mod decryption_retry_task;
94mod metadata;
95mod observable_items;
96mod read_receipts;
97mod state;
98mod state_transaction;
99
100pub(super) use aggregations::*;
101pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
102
/// Internal description of what a timeline is focused on, bundled with the
/// event-cache handle that backs that focus.
///
/// `TimelineController::new` builds one of these from a `TimelineFocus`.
#[derive(Debug)]
pub(in crate::timeline) enum TimelineFocusKind {
    /// The timeline is backed by a room event cache.
    Live {
        /// When `true`, `receipt_thread()` reports `ReceiptThread::Main`
        /// instead of `ReceiptThread::Unthreaded`.
        hide_threaded_events: bool,

        /// Handle obtained from `EventCache::room`.
        event_cache: RoomEventCache,
    },

    /// The timeline is focused on one specific event.
    Event {
        /// Identifier of the event this timeline is focused on.
        focused_event_id: OwnedEventId,

        /// Thread root associated with the focused event. It is created empty
        /// and, being a `OnceLock`, can be set at most once.
        // NOTE(review): the code that fills this cell is not in this part of
        // the file.
        thread_root: OnceLock<OwnedEventId>,

        /// Threading mode requested for this focus; consulted by
        /// `hide_threaded_events()`.
        thread_mode: TimelineEventFocusThreadMode,

        /// Handle obtained from `EventCache::event_focused`.
        event_cache: EventFocusedCache,
    },

    /// The timeline is focused on a single thread.
    Thread {
        /// Identifier of the event at the root of the thread.
        root_event_id: OwnedEventId,

        /// Handle obtained from `EventCache::thread`.
        event_cache: ThreadEventCache,
    },

    /// The timeline is backed by the pinned-events cache.
    PinnedEvents {
        /// Handle obtained from `EventCache::pinned_events`.
        event_cache: PinnedEventsCache,
    },
}
154
155impl TimelineFocusKind {
156 pub(super) fn receipt_thread(&self) -> ReceiptThread {
163 if let Some(thread_root) = self.thread_root() {
164 ReceiptThread::Thread(thread_root.to_owned())
165 } else if self.hide_threaded_events() {
166 ReceiptThread::Main
167 } else {
168 ReceiptThread::Unthreaded
169 }
170 }
171
172 fn hide_threaded_events(&self) -> bool {
174 match self {
175 TimelineFocusKind::Live { hide_threaded_events, .. } => *hide_threaded_events,
176 TimelineFocusKind::Event { thread_mode, .. } => {
177 matches!(
178 thread_mode,
179 TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
180 )
181 }
182 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
183 }
184 }
185
186 fn is_thread(&self) -> bool {
189 self.thread_root().is_some()
190 }
191
192 fn thread_root(&self) -> Option<&EventId> {
194 match self {
195 TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
196 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
197 TimelineFocusKind::Thread { root_event_id, .. } => Some(root_event_id),
198 }
199 }
200}
201
/// Entry point for reading and mutating the state of one timeline.
///
/// Clones share the same `state` and `focus`, since both are held behind
/// `Arc`s.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Timeline state, guarded by an async read/write lock.
    state: Arc<RwLock<TimelineState<P>>>,

    /// What this timeline is focused on; the same `Arc` is also handed to
    /// the `TimelineState` at construction.
    focus: Arc<TimelineFocusKind>,

    /// Source of room-level data (own user id, profiles, room info, sending
    /// and redacting events).
    pub(crate) room_data_provider: P,

    /// Settings this controller was created with.
    pub(super) settings: TimelineSettings,
}
219
/// Configuration of a timeline.
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// Read-receipt tracking mode. When it is enabled, the initial user
    /// receipts and the fully-read marker are loaded in
    /// `replace_with_initial_remote_events`.
    pub(super) track_read_receipts: TimelineReadReceiptTracking,

    /// Event filter callback; the default is `default_event_filter`.
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Defaults to `true`.
    // NOTE(review): presumably controls whether events that fail to parse
    // are still added to the timeline; the consumer is not in this part of
    // the file.
    pub(super) add_failed_to_parse: bool,

    /// Mode handed to `DateDividerAdjuster` when date dividers are
    /// recomputed; the default is `DateDividerMode::Daily`.
    pub(super) date_divider_mode: DateDividerMode,
}
236
237#[cfg(not(tarpaulin_include))]
238impl fmt::Debug for TimelineSettings {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 f.debug_struct("TimelineSettings")
241 .field("track_read_receipts", &self.track_read_receipts)
242 .field("add_failed_to_parse", &self.add_failed_to_parse)
243 .finish_non_exhaustive()
244 }
245}
246
247impl Default for TimelineSettings {
248 fn default() -> Self {
249 Self {
250 track_read_receipts: TimelineReadReceiptTracking::Disabled,
251 event_filter: Arc::new(default_event_filter),
252 add_failed_to_parse: true,
253 date_divider_mode: DateDividerMode::Daily,
254 }
255 }
256}
257
258pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
268 match event {
269 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
270 if ev.redacts(&rules.redaction).is_some() {
271 false
274 } else {
275 ev.event_type() != MessageLikeEventType::Reaction
278 }
279 }
280
281 AnySyncTimelineEvent::MessageLike(msg) => {
282 match msg.original_content() {
283 None => {
284 msg.event_type() != MessageLikeEventType::Reaction
287 }
288
289 Some(original_content) => {
290 match original_content {
291 AnyMessageLikeEventContent::RoomMessage(content) => {
292 if content
293 .relates_to
294 .as_ref()
295 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
296 {
297 return false;
299 }
300
301 match content.msgtype {
302 MessageType::Audio(_)
303 | MessageType::Emote(_)
304 | MessageType::File(_)
305 | MessageType::Image(_)
306 | MessageType::Location(_)
307 | MessageType::Notice(_)
308 | MessageType::ServerNotice(_)
309 | MessageType::Text(_)
310 | MessageType::Video(_)
311 | MessageType::VerificationRequest(_) => true,
312 #[cfg(feature = "unstable-msc4274")]
313 MessageType::Gallery(_) => true,
314 _ => false,
315 }
316 }
317
318 AnyMessageLikeEventContent::Sticker(_)
319 | AnyMessageLikeEventContent::UnstablePollStart(
320 UnstablePollStartEventContent::New(_),
321 )
322 | AnyMessageLikeEventContent::CallInvite(_)
323 | AnyMessageLikeEventContent::RtcNotification(_)
324 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
325
326 AnyMessageLikeEventContent::Beacon(_) => false,
330 AnyMessageLikeEventContent::RtcDecline(_) => false,
333
334 _ => false,
335 }
336 }
337 }
338 }
339
340 AnySyncTimelineEvent::State(_) => {
341 true
343 }
344 }
345}
346
/// Result bundle pairing a "has events" flag with an optional background
/// task handle.
// NOTE(review): presumably returned by the focus-initialisation routine; the
// producer is not in this part of the file.
pub(super) struct InitFocusResult {
    /// Whether events were present.
    // NOTE(review): exact semantics to be confirmed against the producer.
    pub has_events: bool,
    /// Handle of a background task associated with the focus, if any.
    pub focus_task: Option<BackgroundTaskHandle>,
}
355
356impl<P: RoomDataProvider> TimelineController<P> {
357 pub(super) async fn new(
358 room_data_provider: P,
359 focus: &TimelineFocus,
360 event_cache: &EventCache,
361 internal_id_prefix: Option<String>,
362 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
363 is_room_encrypted: bool,
364 settings: TimelineSettings,
365 ) -> Result<Self, Error> {
366 let room_id = room_data_provider.room_id();
367
368 let focus = match focus {
369 TimelineFocus::Live { hide_threaded_events } => TimelineFocusKind::Live {
370 hide_threaded_events: *hide_threaded_events,
371 event_cache: event_cache.room(room_id).await?.0,
372 },
373
374 TimelineFocus::Event { target, thread_mode, num_context_events, .. } => {
375 TimelineFocusKind::Event {
376 event_cache: event_cache
377 .event_focused(room_id, target, (*thread_mode).into(), *num_context_events)
378 .await?
379 .0,
380 focused_event_id: target.clone(),
381 thread_root: OnceLock::new(),
383 thread_mode: *thread_mode,
384 }
385 }
386
387 TimelineFocus::Thread { root_event_id, .. } => TimelineFocusKind::Thread {
388 event_cache: event_cache.thread(room_id, root_event_id).await?.0,
389 root_event_id: root_event_id.clone(),
390 },
391
392 TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents {
393 event_cache: event_cache.pinned_events(room_id).await?.0,
394 },
395 };
396
397 let focus = Arc::new(focus);
398 let state = Arc::new(RwLock::new(TimelineState::new(
399 focus.clone(),
400 room_data_provider.own_user_id().to_owned(),
401 room_data_provider.room_version_rules(),
402 internal_id_prefix,
403 unable_to_decrypt_hook,
404 is_room_encrypted,
405 )));
406
407 Ok(Self { state, focus, room_data_provider, settings })
408 }
409
410 pub async fn handle_encryption_state_changes(&self) {
415 let mut room_info = self.room_data_provider.room_info();
416
417 let mark_encrypted = || async {
419 let mut state = self.state.write().await;
420 state.meta.is_room_encrypted = true;
421 state.mark_all_events_as_encrypted();
422 };
423
424 if room_info.get().encryption_state().is_encrypted() {
425 mark_encrypted().await;
428 return;
429 }
430
431 while let Some(info) = room_info.next().await {
432 if info.encryption_state().is_encrypted() {
433 mark_encrypted().await;
434 break;
437 }
438 }
439 }
440
441 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
450 let state = self.state.read().await;
451
452 let (count, needs) = state
453 .meta
454 .subscriber_skip_count
455 .compute_next_when_paginating_backwards(num_events.into());
456
457 let is_live_timeline = true;
459 state.meta.subscriber_skip_count.update(count, is_live_timeline);
460
461 needs
462 }
463
464 pub(super) fn is_live(&self) -> bool {
466 matches!(&*self.focus, TimelineFocusKind::Live { .. })
467 }
468
469 pub(super) fn is_threaded(&self) -> bool {
471 self.focus.is_thread()
472 }
473
474 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
477 self.focus.thread_root().map(ToOwned::to_owned)
478 }
479
480 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
484 self.state.read().await.items.clone_items()
485 }
486
/// Test-only: returns the current items together with the raw stream of
/// updates to the items, taken under the read lock.
#[cfg(test)]
pub(super) async fn subscribe_raw(
    &self,
) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
    let state = self.state.read().await;
    state.items.subscribe().into_values_and_stream()
}
493
494 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
495 let state = self.state.read().await;
496
497 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
498 }
499
500 pub(super) async fn subscribe_filter_map<U, F>(
501 &self,
502 f: F,
503 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
504 where
505 U: Clone,
506 F: Fn(Arc<TimelineItem>) -> Option<U>,
507 {
508 self.state.read().await.items.subscribe().filter_map(f)
509 }
510
511 #[instrument(skip_all)]
515 pub(super) async fn toggle_reaction_local(
516 &self,
517 item_id: &TimelineEventItemId,
518 key: &str,
519 ) -> Result<bool, Error> {
520 let mut state = self.state.write().await;
521
522 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
523 warn!("Timeline item not found, can't add reaction");
524 return Err(Error::FailedToToggleReaction);
525 };
526
527 let user_id = self.room_data_provider.own_user_id();
528 let prev_status = item
529 .content()
530 .reactions()
531 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
532
533 let Some(prev_status) = prev_status else {
534 match item.handle() {
536 TimelineItemHandle::Local(send_handle) => {
537 if send_handle
538 .react(key.to_owned())
539 .await
540 .map_err(|err| Error::SendQueueError(err.into()))?
541 .is_some()
542 {
543 trace!("adding a reaction to a local echo");
544 return Ok(true);
545 }
546
547 warn!("couldn't toggle reaction for local echo");
548 return Ok(false);
549 }
550
551 TimelineItemHandle::Remote(event_id) => {
552 trace!("adding a reaction to a remote echo");
556 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
557 self.room_data_provider
558 .send(ReactionEventContent::from(annotation).into())
559 .await?;
560 return Ok(true);
561 }
562 }
563 };
564
565 trace!("removing a previous reaction");
566 match prev_status {
567 ReactionStatus::LocalToLocal(send_reaction_handle) => {
568 if let Some(handle) = send_reaction_handle {
569 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
570 warn!("unexpectedly unable to abort sending of local reaction");
573 }
574 } else {
575 warn!("no send reaction handle (this should only happen in testing contexts)");
576 }
577 }
578
579 ReactionStatus::LocalToRemote(send_handle) => {
580 trace!("aborting send of the previous reaction that was a local echo");
583 if let Some(handle) = send_handle {
584 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
585 warn!("unexpectedly unable to abort sending of local reaction");
588 }
589 } else {
590 warn!("no send handle (this should only happen in testing contexts)");
591 }
592 }
593
594 ReactionStatus::RemoteToRemote(event_id) => {
595 let Some(annotated_event_id) =
597 item.as_remote().map(|event_item| event_item.event_id.clone())
598 else {
599 warn!("remote reaction to remote event, but the associated item isn't remote");
600 return Ok(false);
601 };
602
603 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
604 let reaction_info = reactions.remove_reaction(user_id, key);
605
606 if reaction_info.is_some() {
607 let new_item = item.with_reactions(reactions);
608 state.items.replace(item_pos, new_item);
609 } else {
610 warn!(
611 "reaction is missing on the item, not removing it locally, \
612 but sending redaction."
613 );
614 }
615
616 drop(state);
618
619 trace!("sending redact for a previous reaction");
620 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
621 if let Some(reaction_info) = reaction_info {
622 debug!("sending redact failed, adding the reaction back to the list");
623
624 let mut state = self.state.write().await;
625 if let Some((item_pos, item)) =
626 rfind_event_by_id(&state.items, &annotated_event_id)
627 {
628 let mut reactions =
630 item.content().reactions().cloned().unwrap_or_default();
631 reactions
632 .entry(key.to_owned())
633 .or_default()
634 .insert(user_id.to_owned(), reaction_info);
635 let new_item = item.with_reactions(reactions);
636 state.items.replace(item_pos, new_item);
637 } else {
638 warn!(
639 "couldn't find item to re-add reaction anymore; \
640 maybe it's been redacted?"
641 );
642 }
643 }
644
645 return Err(err);
646 }
647 }
648 }
649
650 Ok(false)
651 }
652
653 pub(super) async fn handle_remote_events_with_diffs(
655 &self,
656 diffs: Vec<VectorDiff<TimelineEvent>>,
657 origin: RemoteEventOrigin,
658 ) {
659 if diffs.is_empty() {
660 return;
661 }
662
663 let mut state = self.state.write().await;
664 state
665 .handle_remote_events_with_diffs(
666 diffs,
667 origin,
668 &self.room_data_provider,
669 &self.settings,
670 )
671 .await
672 }
673
674 pub(super) async fn handle_remote_aggregations(
676 &self,
677 diffs: Vec<VectorDiff<TimelineEvent>>,
678 origin: RemoteEventOrigin,
679 ) {
680 if diffs.is_empty() {
681 return;
682 }
683
684 let mut state = self.state.write().await;
685 state
686 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
687 .await
688 }
689
690 pub(super) async fn clear(&self) {
691 self.state.write().await.clear();
692 }
693
694 pub(super) async fn replace_with_initial_remote_events<Events>(
702 &self,
703 events: Events,
704 origin: RemoteEventOrigin,
705 ) where
706 Events: IntoIterator,
707 <Events as IntoIterator>::Item: Into<TimelineEvent>,
708 {
709 let mut state = self.state.write().await;
710
711 let track_read_markers = &self.settings.track_read_receipts;
712 if track_read_markers.is_enabled() {
713 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
714 state
715 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
716 .await;
717 }
718
719 let mut events = events.into_iter().peekable();
725 if !state.items.is_empty() || events.peek().is_some() {
726 state
727 .replace_with_remote_events(
728 events,
729 origin,
730 &self.room_data_provider,
731 &self.settings,
732 )
733 .await;
734 }
735
736 if track_read_markers.is_enabled() {
737 if let Some(fully_read_event_id) =
738 self.room_data_provider.load_fully_read_marker().await
739 {
740 state.handle_fully_read_marker(fully_read_event_id);
741 } else if let Some(latest_receipt_event_id) = state
742 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
743 {
744 debug!("no `m.fully_read` marker found, falling back to read receipt");
746 state.handle_fully_read_marker(latest_receipt_event_id);
747 }
748 }
749 }
750
751 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
752 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
753 }
754
755 pub(super) async fn handle_ephemeral_events(
756 &self,
757 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
758 ) {
759 if events.is_empty() {
761 return;
762 }
763 let mut state = self.state.write().await;
764 state.handle_ephemeral_events(events, &self.room_data_provider).await;
765 }
766
767 #[instrument(skip_all)]
769 pub(super) async fn handle_local_event(
770 &self,
771 txn_id: OwnedTransactionId,
772 content: AnyMessageLikeEventContent,
773 send_handle: Option<SendHandle>,
774 ) {
775 let sender = self.room_data_provider.own_user_id().to_owned();
776 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
777
778 let date_divider_mode = self.settings.date_divider_mode.clone();
779
780 let mut state = self.state.write().await;
781 state
782 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
783 .await;
784 }
785
786 #[instrument(skip(self))]
791 pub(super) async fn update_event_send_state(
792 &self,
793 txn_id: &TransactionId,
794 send_state: EventSendState,
795 ) {
796 let mut state = self.state.write().await;
797 let mut txn = state.transaction();
798
799 let new_event_id: Option<&EventId> =
800 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
801
802 if rfind_event_item(&txn.items, |it| {
805 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
806 })
807 .is_some()
808 {
809 trace!("Remote echo received before send-event response");
811
812 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
813
814 if let Some((idx, _)) = local_echo {
818 warn!("Message echo got duplicated, removing the local one");
819 txn.items.remove(idx);
820
821 let mut adjuster =
823 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
824 adjuster.run(&mut txn.items, &mut txn.meta);
825 }
826
827 txn.commit();
828 return;
829 }
830
831 let result = rfind_event_item(&txn.items, |it| {
833 it.transaction_id() == Some(txn_id)
834 || new_event_id.is_some()
835 && it.event_id() == new_event_id
836 && it.as_local().is_some()
837 });
838
839 let Some((idx, item)) = result else {
840 if let Some(new_event_id) = new_event_id {
845 if txn.meta.aggregations.mark_aggregation_as_sent(
846 txn_id.to_owned(),
847 new_event_id.to_owned(),
848 &mut txn.items,
849 &txn.meta.room_version_rules,
850 ) {
851 trace!("Aggregation marked as sent");
852 txn.commit();
853 return;
854 }
855
856 trace!("Sent aggregation was not found");
857 }
858
859 warn!("Timeline item not found, can't update send state");
860 return;
861 };
862
863 let Some(local_item) = item.as_local() else {
864 warn!("We looked for a local item, but it transitioned to remote.");
865 return;
866 };
867
868 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
871 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
872 }
873
874 if let Some(new_event_id) = new_event_id {
877 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
878 }
879
880 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
881 txn.items.replace(idx, new_item);
882
883 txn.commit();
884 }
885
886 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
887 let mut state = self.state.write().await;
888
889 if let Some((idx, _)) =
890 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
891 {
892 let mut txn = state.transaction();
893
894 txn.items.remove(idx);
895
896 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
899 adjuster.run(&mut txn.items, &mut txn.meta);
900
901 txn.meta.update_read_marker(&mut txn.items);
902
903 txn.commit();
904
905 debug!("discarded local echo");
906 return true;
907 }
908
909 let mut txn = state.transaction();
912
913 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
915 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
916 &mut txn.items,
917 ) {
918 Ok(val) => val,
919 Err(err) => {
920 warn!("error when discarding local echo for an aggregation: {err}");
921 true
923 }
924 };
925
926 if found_aggregation {
927 txn.commit();
928 }
929
930 found_aggregation
931 }
932
933 pub(super) async fn replace_local_echo(
934 &self,
935 txn_id: &TransactionId,
936 content: AnyMessageLikeEventContent,
937 ) -> bool {
938 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
939 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
944 return false;
945 };
946
947 let mut state = self.state.write().await;
948 let mut txn = state.transaction();
949
950 let Some((idx, prev_item)) =
951 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
952 else {
953 debug!("Can't find local echo to replace");
954 return false;
955 };
956
957 let ti_kind = {
960 let Some(prev_local_item) = prev_item.as_local() else {
961 warn!("We looked for a local item, but it transitioned as remote??");
962 return false;
963 };
964 let progress = as_variant!(&prev_local_item.send_state,
966 EventSendState::NotSentYet { progress } => progress.clone())
967 .flatten();
968 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
969 };
970
971 let new_item = TimelineItem::new(
973 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
974 content.msgtype,
975 content.mentions,
976 prev_item.content().reactions().cloned().unwrap_or_default(),
977 prev_item.content().thread_root(),
978 prev_item.content().in_reply_to(),
979 prev_item.content().thread_summary(),
980 )),
981 prev_item.internal_id.to_owned(),
982 );
983
984 txn.items.replace(idx, new_item);
985
986 txn.commit();
990
991 debug!("Replaced local echo");
992 true
993 }
994
995 pub(super) async fn compute_redecryption_candidates(
996 &self,
997 ) -> (BTreeSet<String>, BTreeSet<String>) {
998 let state = self.state.read().await;
999 compute_redecryption_candidates(&state.items)
1000 }
1001
1002 pub(super) async fn set_sender_profiles_pending(&self) {
1003 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1004 }
1005
1006 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1007 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1008 }
1009
1010 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1011 self.state.write().await.items.for_each(|mut entry| {
1012 let Some(event_item) = entry.as_event() else { return };
1013 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1014 let new_item = entry.with_kind(TimelineItemKind::Event(
1015 event_item.with_sender_profile(profile_state.clone()),
1016 ));
1017 ObservableItemsEntry::replace(&mut entry, new_item);
1018 }
1019 });
1020 }
1021
1022 pub(super) async fn update_missing_sender_profiles(&self) {
1023 trace!("Updating missing sender profiles");
1024
1025 let mut state = self.state.write().await;
1026 let mut entries = state.items.entries();
1027 while let Some(mut entry) = entries.next() {
1028 let Some(event_item) = entry.as_event() else { continue };
1029 let event_id = event_item.event_id().map(debug);
1030 let transaction_id = event_item.transaction_id().map(debug);
1031
1032 if event_item.sender_profile().is_ready() {
1033 trace!(event_id, transaction_id, "Profile already set");
1034 continue;
1035 }
1036
1037 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1038 Some(profile) => {
1039 trace!(event_id, transaction_id, "Adding profile");
1040 let updated_item =
1041 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1042 let new_item = entry.with_kind(updated_item);
1043 ObservableItemsEntry::replace(&mut entry, new_item);
1044 }
1045 None => {
1046 if !event_item.sender_profile().is_unavailable() {
1047 trace!(event_id, transaction_id, "Marking profile unavailable");
1048 let updated_item =
1049 event_item.with_sender_profile(TimelineDetails::Unavailable);
1050 let new_item = entry.with_kind(updated_item);
1051 ObservableItemsEntry::replace(&mut entry, new_item);
1052 } else {
1053 debug!(event_id, transaction_id, "Profile already marked unavailable");
1054 }
1055 }
1056 }
1057 }
1058
1059 trace!("Done updating missing sender profiles");
1060 }
1061
1062 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1064 trace!("Forcing update of sender profiles: {sender_ids:?}");
1065
1066 let mut state = self.state.write().await;
1067 let mut entries = state.items.entries();
1068 while let Some(mut entry) = entries.next() {
1069 let Some(event_item) = entry.as_event() else { continue };
1070 if !sender_ids.contains(event_item.sender()) {
1071 continue;
1072 }
1073
1074 let event_id = event_item.event_id().map(debug);
1075 let transaction_id = event_item.transaction_id().map(debug);
1076
1077 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1078 Some(profile) => {
1079 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1080 {
1081 debug!(event_id, transaction_id, "Profile already up-to-date");
1082 } else {
1083 trace!(event_id, transaction_id, "Updating profile");
1084 let updated_item =
1085 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1086 let new_item = entry.with_kind(updated_item);
1087 ObservableItemsEntry::replace(&mut entry, new_item);
1088 }
1089 }
1090 None => {
1091 if !event_item.sender_profile().is_unavailable() {
1092 trace!(event_id, transaction_id, "Marking profile unavailable");
1093 let updated_item =
1094 event_item.with_sender_profile(TimelineDetails::Unavailable);
1095 let new_item = entry.with_kind(updated_item);
1096 ObservableItemsEntry::replace(&mut entry, new_item);
1097 } else {
1098 debug!(event_id, transaction_id, "Profile already marked unavailable");
1099 }
1100 }
1101 }
1102 }
1103
1104 trace!("Done forcing update of sender profiles");
1105 }
1106
/// Test-only: feeds a receipt event content to the timeline state, along
/// with the own user id.
#[cfg(test)]
pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
    let own_user_id = self.room_data_provider.own_user_id();
    let mut state = self.state.write().await;
    state.handle_read_receipts(receipt_event_content, own_user_id);
}
1112
1113 pub(super) async fn latest_user_read_receipt(
1117 &self,
1118 user_id: &UserId,
1119 ) -> Option<(OwnedEventId, Receipt)> {
1120 let receipt_thread = self.focus.receipt_thread();
1121
1122 self.state
1123 .read()
1124 .await
1125 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1126 .await
1127 }
1128
1129 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1132 &self,
1133 user_id: &UserId,
1134 ) -> Option<OwnedEventId> {
1135 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1136 }
1137
1138 pub async fn subscribe_own_user_read_receipts_changed(
1140 &self,
1141 ) -> impl Stream<Item = ()> + use<P> {
1142 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1143 }
1144
1145 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1147 match echo.content {
1148 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1149 let content = match serialized_event.deserialize() {
1150 Ok(d) => d,
1151 Err(err) => {
1152 warn!("error deserializing local echo: {err}");
1153 return;
1154 }
1155 };
1156
1157 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1158 .await;
1159
1160 if let Some(send_error) = send_error {
1161 self.update_event_send_state(
1162 &echo.transaction_id,
1163 EventSendState::SendingFailed {
1164 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1165 send_error,
1166 ))),
1167 is_recoverable: false,
1168 },
1169 )
1170 .await;
1171 }
1172 }
1173
1174 LocalEchoContent::React { key, send_handle, applies_to } => {
1175 self.handle_local_reaction(key, send_handle, applies_to).await;
1176 }
1177
1178 LocalEchoContent::Redaction { redacts, send_error, .. } => {
1179 self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
1180
1181 if let Some(send_error) = send_error {
1182 self.update_event_send_state(
1183 &echo.transaction_id,
1184 EventSendState::SendingFailed {
1185 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1186 send_error,
1187 ))),
1188 is_recoverable: false,
1189 },
1190 )
1191 .await;
1192 }
1193 }
1194 }
1195 }
1196
1197 #[instrument(skip(self, send_handle))]
1199 async fn handle_local_reaction(
1200 &self,
1201 reaction_key: String,
1202 send_handle: SendReactionHandle,
1203 applies_to: OwnedTransactionId,
1204 ) {
1205 let mut state = self.state.write().await;
1206 let mut tr = state.transaction();
1207
1208 let target = TimelineEventItemId::TransactionId(applies_to);
1209
1210 let reaction_txn_id = send_handle.transaction_id().to_owned();
1211 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1212 let aggregation = Aggregation::new(
1213 TimelineEventItemId::TransactionId(reaction_txn_id),
1214 AggregationKind::Reaction {
1215 key: reaction_key.clone(),
1216 sender: self.room_data_provider.own_user_id().to_owned(),
1217 timestamp: MilliSecondsSinceUnixEpoch::now(),
1218 reaction_status,
1219 },
1220 );
1221
1222 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1223 find_item_and_apply_aggregation(
1224 &tr.meta.aggregations,
1225 &mut tr.items,
1226 &target,
1227 aggregation,
1228 &tr.meta.room_version_rules,
1229 );
1230
1231 tr.commit();
1232 }
1233
1234 pub(super) async fn handle_local_redaction(
1236 &self,
1237 txn_id: OwnedTransactionId,
1238 redacts: OwnedEventId,
1239 ) {
1240 let mut state = self.state.write().await;
1241 let mut tr = state.transaction();
1242
1243 let target = TimelineEventItemId::EventId(redacts);
1244
1245 let aggregation = Aggregation::new(
1246 TimelineEventItemId::TransactionId(txn_id),
1247 AggregationKind::Redaction { is_local: true },
1248 );
1249
1250 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1251 find_item_and_apply_aggregation(
1252 &tr.meta.aggregations,
1253 &mut tr.items,
1254 &target,
1255 aggregation,
1256 &tr.meta.room_version_rules,
1257 );
1258
1259 tr.commit();
1260 }
1261
1262 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1264 match update {
1265 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1266 self.handle_local_echo(echo).await;
1267 }
1268
1269 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1270 if !self.discard_local_echo(&transaction_id).await {
1271 warn!("couldn't find the local echo to discard");
1272 }
1273 }
1274
1275 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1276 let content = match new_content.deserialize() {
1277 Ok(d) => d,
1278 Err(err) => {
1279 warn!("error deserializing local echo (upon edit): {err}");
1280 return;
1281 }
1282 };
1283
1284 if !self.replace_local_echo(&transaction_id, content).await {
1285 warn!("couldn't find the local echo to replace");
1286 }
1287 }
1288
1289 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1290 self.update_event_send_state(
1291 &transaction_id,
1292 EventSendState::SendingFailed { error, is_recoverable },
1293 )
1294 .await;
1295 }
1296
1297 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1298 self.update_event_send_state(
1299 &transaction_id,
1300 EventSendState::NotSentYet { progress: None },
1301 )
1302 .await;
1303 }
1304
1305 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1306 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1307 .await;
1308 }
1309
1310 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1311 self.update_event_send_state(
1312 &related_to,
1313 EventSendState::NotSentYet {
1314 progress: Some(MediaUploadProgress { index, progress }),
1315 },
1316 )
1317 .await;
1318 }
1319 }
1320 }
1321
1322 pub async fn insert_timeline_start_if_missing(&self) {
1325 let mut state = self.state.write().await;
1326 let mut txn = state.transaction();
1327 txn.items.push_timeline_start_if_missing(
1328 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1329 );
1330 txn.commit();
1331 }
1332
1333 pub(super) async fn make_replied_to(
1339 &self,
1340 event: TimelineEvent,
1341 ) -> Result<Option<EmbeddedEvent>, Error> {
1342 let state = self.state.read().await;
1343 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1344 }
1345}
1346
1347impl TimelineController {
1348 pub(super) fn room(&self) -> &Room {
1349 &self.room_data_provider
1350 }
1351
1352 pub(super) async fn init_focus(&self) -> Result<InitFocusResult, Error> {
1357 match self.focus.deref() {
1358 TimelineFocusKind::Live { event_cache, .. } => {
1359 let events = event_cache.events().await?;
1361
1362 let has_events = !events.is_empty();
1363
1364 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1365
1366 match event_cache.pagination().status().get() {
1367 PaginationStatus::Idle { hit_timeline_start } => {
1368 if hit_timeline_start {
1369 self.insert_timeline_start_if_missing().await;
1372 }
1373 }
1374 PaginationStatus::Paginating => {}
1375 }
1376
1377 Ok(InitFocusResult { has_events, focus_task: None })
1378 }
1379
1380 TimelineFocusKind::Event {
1381 focused_event_id: event_id,
1382 thread_mode,
1383 thread_root: focus_thread_root,
1384 event_cache,
1385 ..
1386 } => {
1387 let (events, receiver) = event_cache.subscribe().await?;
1388
1389 let has_events = !events.is_empty();
1390
1391 if let Some(thread_root) = event_cache.thread_root().await? {
1394 focus_thread_root.get_or_init(|| thread_root);
1395 }
1396
1397 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
1398 .await;
1399
1400 let task = self
1401 .room_data_provider
1402 .client()
1403 .task_monitor()
1404 .spawn_infinite_task(
1405 "timeline::event_focused_cache_updates",
1406 event_focused_task(
1407 event_id.clone(),
1408 (*thread_mode).into(),
1409 event_cache.clone(),
1410 self.clone(),
1411 receiver,
1412 ),
1413 )
1414 .abort_on_drop();
1415
1416 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1417 }
1418
1419 TimelineFocusKind::Thread { event_cache, .. } => {
1420 let (has_events, receiver) = self.init_with_thread_root(event_cache).await?;
1421
1422 let room = &self.room_data_provider;
1423 let span = info_span!(
1424 parent: Span::none(),
1425 "thread_live_update_handler",
1426 room_id = ?room.room_id(),
1427 );
1428 span.follows_from(Span::current());
1429
1430 let task = room
1431 .client()
1432 .task_monitor()
1433 .spawn_infinite_task(
1434 "timeline::thread_event_cache_updates",
1435 thread_updates_task(receiver, event_cache.clone(), self.clone())
1436 .instrument(span),
1437 )
1438 .abort_on_drop();
1439
1440 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1441 }
1442
1443 TimelineFocusKind::PinnedEvents { event_cache } => {
1444 let (initial_events, pinned_events_recv) = event_cache.subscribe().await?;
1445
1446 let has_events = !initial_events.is_empty();
1447
1448 self.replace_with_initial_remote_events(
1449 initial_events,
1450 RemoteEventOrigin::Pagination,
1451 )
1452 .await;
1453
1454 let task = self
1455 .room_data_provider
1456 .client()
1457 .task_monitor()
1458 .spawn_infinite_task(
1459 "timeline::pinned_events_cache_updates",
1460 pinned_events_task(event_cache.clone(), self.clone(), pinned_events_recv),
1461 )
1462 .abort_on_drop();
1463
1464 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1465 }
1466 }
1467 }
1468
1469 pub(super) async fn init_with_thread_root(
1476 &self,
1477 event_cache: &ThreadEventCache,
1478 ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1479 let (events, receiver) = event_cache.subscribe().await?;
1480 let has_events = !events.is_empty();
1481
1482 let mut related_events = Vector::new();
1486 for event_id in events.iter().filter_map(|event| event.event_id()) {
1487 if let Some((_original, related)) =
1488 event_cache.find_event_with_relations(event_id, None).await?
1489 {
1490 related_events.extend(related);
1491 }
1492 }
1493
1494 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1495
1496 if !related_events.is_empty() {
1498 self.handle_remote_aggregations(
1499 vec![VectorDiff::Append { values: related_events }],
1500 RemoteEventOrigin::Cache,
1501 )
1502 .await;
1503 }
1504
1505 Ok((has_events, receiver))
1506 }
1507
1508 #[instrument(skip(self))]
1511 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1512 let state_guard = self.state.write().await;
1513 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1514 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1515 let remote_item = item
1516 .as_remote()
1517 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1518 .clone();
1519
1520 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1521 debug!("Event is not a message");
1522 return Ok(());
1523 };
1524 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1525 debug!("Event is not a reply");
1526 return Ok(());
1527 };
1528 if let TimelineDetails::Pending = &in_reply_to.event {
1529 debug!("Replied-to event is already being fetched");
1530 return Ok(());
1531 }
1532 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1533 debug!("Replied-to event has already been fetched");
1534 return Ok(());
1535 }
1536
1537 let internal_id = item.internal_id.to_owned();
1538 let item = item.clone();
1539 let event = fetch_replied_to_event(
1540 state_guard,
1541 &self.state,
1542 index,
1543 &item,
1544 internal_id,
1545 &msglike,
1546 &in_reply_to.event_id,
1547 self.room(),
1548 )
1549 .await?;
1550
1551 let mut state = self.state.write().await;
1554 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1555 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1556
1557 let TimelineItemContent::MsgLike(MsgLikeContent {
1560 kind: MsgLikeKind::Message(message),
1561 reactions,
1562 thread_root,
1563 in_reply_to,
1564 thread_summary,
1565 }) = item.content().clone()
1566 else {
1567 info!("Event is no longer a message (redacted?)");
1568 return Ok(());
1569 };
1570 let Some(in_reply_to) = in_reply_to else {
1571 warn!("Event no longer has a reply (bug?)");
1572 return Ok(());
1573 };
1574
1575 trace!("Updating in-reply-to details");
1578 let internal_id = item.internal_id.to_owned();
1579 let mut item = item.clone();
1580 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1581 kind: MsgLikeKind::Message(message),
1582 reactions,
1583 thread_root,
1584 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1585 thread_summary,
1586 }));
1587 state.items.replace(index, TimelineItem::new(item, internal_id));
1588
1589 Ok(())
1590 }
1591
1592 pub(super) fn infer_thread_for_read_receipt(
1598 &self,
1599 receipt_type: &SendReceiptType,
1600 ) -> ReceiptThread {
1601 if matches!(receipt_type, SendReceiptType::FullyRead) {
1602 ReceiptThread::Unthreaded
1603 } else {
1604 self.focus.receipt_thread()
1605 }
1606 }
1607
1608 pub(super) async fn should_send_receipt(
1612 &self,
1613 receipt_type: &SendReceiptType,
1614 receipt_thread: &ReceiptThread,
1615 event_id: &EventId,
1616 ) -> bool {
1617 let own_user_id = self.room().own_user_id();
1618 let state = self.state.read().await;
1619 let room = self.room();
1620
1621 match receipt_type {
1622 SendReceiptType::Read => {
1623 if let Some((old_pub_read, _)) = state
1624 .meta
1625 .user_receipt(
1626 own_user_id,
1627 ReceiptType::Read,
1628 receipt_thread.clone(),
1629 room,
1630 state.items.all_remote_events(),
1631 )
1632 .await
1633 {
1634 trace!(%old_pub_read, "found a previous public receipt");
1635 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1636 &old_pub_read,
1637 event_id,
1638 state.items.all_remote_events(),
1639 ) {
1640 trace!(
1641 "event referred to new receipt is {relative_pos:?} the previous receipt"
1642 );
1643 return relative_pos == RelativePosition::After;
1644 }
1645 }
1646 }
1647
1648 SendReceiptType::ReadPrivate => {
1651 if let Some((old_priv_read, _)) =
1652 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1653 {
1654 trace!(%old_priv_read, "found a previous private receipt");
1655 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1656 &old_priv_read,
1657 event_id,
1658 state.items.all_remote_events(),
1659 ) {
1660 trace!(
1661 "event referred to new receipt is {relative_pos:?} the previous receipt"
1662 );
1663 return relative_pos == RelativePosition::After;
1664 }
1665 }
1666 }
1667
1668 SendReceiptType::FullyRead => {
1669 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1670 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1671 &prev_event_id,
1672 event_id,
1673 state.items.all_remote_events(),
1674 )
1675 {
1676 return relative_pos == RelativePosition::After;
1677 }
1678 }
1679
1680 _ => {}
1681 }
1682
1683 true
1685 }
1686
1687 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1690 let state = self.state.read().await;
1691 let filter_out_thread_events = match self.focus() {
1692 TimelineFocusKind::Thread { .. } => false,
1693 TimelineFocusKind::Live { hide_threaded_events, .. } => *hide_threaded_events,
1694 TimelineFocusKind::Event { .. } => {
1695 false
1697 }
1698 TimelineFocusKind::PinnedEvents { .. } => true,
1699 };
1700
1701 state
1702 .items
1703 .all_remote_events()
1704 .iter()
1705 .rev()
1706 .filter_map(|event_meta| {
1707 if !filter_out_thread_events {
1708 Some(event_meta.event_id.clone())
1710 } else if event_meta.thread_root_id.is_none() {
1711 if let Some(TimelineEventItemId::EventId(target_event_id)) =
1717 state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1718 event_meta.event_id.clone(),
1719 ))
1720 && let Some(target_meta) =
1721 state.items.all_remote_events().get_by_event_id(target_event_id)
1722 && target_meta.thread_root_id.is_some()
1723 {
1724 None
1726 } else {
1727 Some(event_meta.event_id.clone())
1730 }
1731 } else {
1732 None
1735 }
1736 })
1737 .next()
1738 }
1739
1740 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1741 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1742 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1743
1744 let request = DecryptionRetryRequest {
1745 room_id: self.room().room_id().to_owned(),
1746 utd_session_ids: utds,
1747 refresh_info_session_ids: decrypted,
1748 };
1749
1750 self.room().client().event_cache().request_decryption(request);
1751 }
1752
1753 pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1761 match status {
1762 PaginationStatus::Idle { hit_timeline_start } => {
1763 if hit_timeline_start {
1764 let state = self.state.read().await;
1765 if state.meta.subscriber_skip_count.get() > 0 {
1769 return PaginationStatus::Idle { hit_timeline_start: false };
1770 }
1771 }
1772 }
1773 PaginationStatus::Paginating => {}
1774 }
1775
1776 status
1778 }
1779}
1780
1781impl<P: RoomDataProvider> TimelineController<P> {
1782 pub(super) fn focus(&self) -> &TimelineFocusKind {
1784 &self.focus
1785 }
1786}
1787
1788#[allow(clippy::too_many_arguments)]
1789async fn fetch_replied_to_event<P: RoomDataProvider>(
1790 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1791 state_lock: &RwLock<TimelineState<P>>,
1792 index: usize,
1793 item: &EventTimelineItem,
1794 internal_id: TimelineUniqueId,
1795 msglike: &MsgLikeContent,
1796 in_reply_to: &EventId,
1797 room: &Room,
1798) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1799 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1800 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1801 trace!("Found replied-to event locally");
1802 return Ok(details);
1803 }
1804
1805 trace!("Setting in-reply-to details to pending");
1808 let in_reply_to_details =
1809 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1810
1811 let event_item = item
1812 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1813
1814 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1815 state_guard.items.replace(index, new_timeline_item);
1816
1817 drop(state_guard);
1819
1820 trace!("Fetching replied-to event");
1821 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1822 Ok(timeline_event) => {
1823 let state = state_lock.read().await;
1824
1825 let replied_to_item =
1826 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1827
1828 if let Some(item) = replied_to_item {
1829 TimelineDetails::Ready(Box::new(item))
1830 } else {
1831 return Err(Error::UnsupportedEvent);
1833 }
1834 }
1835
1836 Err(e) => TimelineDetails::Error(Arc::new(e)),
1837 };
1838
1839 Ok(res)
1840}