1use std::{
16 collections::BTreeSet,
17 fmt,
18 sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26#[cfg(test)]
27use matrix_sdk::Result;
28use matrix_sdk::{
29 deserialized_responses::TimelineEvent,
30 event_cache::{
31 DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache,
32 TimelineVectorDiffs,
33 },
34 send_queue::{
35 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
36 },
37 task_monitor::BackgroundTaskHandle,
38};
39#[cfg(test)]
40use ruma::events::receipt::ReceiptEventContent;
41use ruma::{
42 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
43 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
44 events::{
45 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
46 AnySyncTimelineEvent, MessageLikeEventType,
47 poll::unstable_start::UnstablePollStartEventContent,
48 reaction::ReactionEventContent,
49 receipt::{Receipt, ReceiptThread, ReceiptType},
50 relation::Annotation,
51 room::message::{MessageType, Relation},
52 },
53 room_version_rules::RoomVersionRules,
54 serde::Raw,
55};
56use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
57use tracing::{
58 Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
59};
60
61pub(super) use self::{
62 metadata::{RelativePosition, TimelineMetadata},
63 observable_items::{
64 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
65 ObservableItemsTransactionEntry,
66 },
67 state::TimelineState,
68 state_transaction::TimelineStateTransaction,
69};
70use super::{
71 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
72 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
73 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
74 TimelineReadReceiptTracking, VirtualTimelineItem,
75 algorithms::{rfind_event_by_id, rfind_event_item},
76 event_item::{ReactionStatus, RemoteEventOrigin},
77 item::TimelineUniqueId,
78 subscriber::TimelineSubscriber,
79 traits::RoomDataProvider,
80};
81use crate::{
82 timeline::{
83 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
84 algorithms::rfind_event_by_item_id,
85 controller::decryption_retry_task::compute_redecryption_candidates,
86 date_dividers::DateDividerAdjuster,
87 event_item::TimelineItemHandle,
88 tasks::{event_focused_task, pinned_events_task, thread_updates_task},
89 },
90 unable_to_decrypt_hook::UtdHookManager,
91};
92
93pub(in crate::timeline) mod aggregations;
94mod decryption_retry_task;
95mod metadata;
96mod observable_items;
97mod read_receipts;
98mod state;
99mod state_transaction;
100
101pub(super) use aggregations::*;
102pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
103
104#[derive(Debug)]
110pub(in crate::timeline) enum TimelineFocusKind {
111 Live {
113 hide_threaded_events: bool,
115 },
116
117 Event {
120 focused_event_id: OwnedEventId,
122
123 thread_root: OnceLock<OwnedEventId>,
129
130 thread_mode: TimelineEventFocusThreadMode,
133 },
134
135 Thread {
137 root_event_id: OwnedEventId,
139 },
140
141 PinnedEvents,
142}
143
144impl TimelineFocusKind {
145 pub(super) fn receipt_thread(&self) -> ReceiptThread {
152 if let Some(thread_root) = self.thread_root() {
153 ReceiptThread::Thread(thread_root.to_owned())
154 } else if self.hide_threaded_events() {
155 ReceiptThread::Main
156 } else {
157 ReceiptThread::Unthreaded
158 }
159 }
160
161 fn hide_threaded_events(&self) -> bool {
163 match self {
164 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
165 TimelineFocusKind::Event { thread_mode, .. } => {
166 matches!(
167 thread_mode,
168 TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
169 )
170 }
171 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
172 }
173 }
174
175 fn is_thread(&self) -> bool {
178 self.thread_root().is_some()
179 }
180
181 fn thread_root(&self) -> Option<&EventId> {
183 match self {
184 TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
185 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
186 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
187 }
188 }
189}
190
191#[derive(Clone, Debug)]
192pub(super) struct TimelineController<P: RoomDataProvider = Room> {
193 state: Arc<RwLock<TimelineState<P>>>,
195
196 focus: Arc<TimelineFocusKind>,
198
199 pub(crate) room_data_provider: P,
204
205 pub(super) settings: TimelineSettings,
207}
208
209#[derive(Clone)]
210pub(super) struct TimelineSettings {
211 pub(super) track_read_receipts: TimelineReadReceiptTracking,
214
215 pub(super) event_filter: Arc<TimelineEventFilterFn>,
218
219 pub(super) add_failed_to_parse: bool,
221
222 pub(super) date_divider_mode: DateDividerMode,
224}
225
226#[cfg(not(tarpaulin_include))]
227impl fmt::Debug for TimelineSettings {
228 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
229 f.debug_struct("TimelineSettings")
230 .field("track_read_receipts", &self.track_read_receipts)
231 .field("add_failed_to_parse", &self.add_failed_to_parse)
232 .finish_non_exhaustive()
233 }
234}
235
236impl Default for TimelineSettings {
237 fn default() -> Self {
238 Self {
239 track_read_receipts: TimelineReadReceiptTracking::Disabled,
240 event_filter: Arc::new(default_event_filter),
241 add_failed_to_parse: true,
242 date_divider_mode: DateDividerMode::Daily,
243 }
244 }
245}
246
247pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
257 match event {
258 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
259 if ev.redacts(&rules.redaction).is_some() {
260 false
263 } else {
264 ev.event_type() != MessageLikeEventType::Reaction
267 }
268 }
269
270 AnySyncTimelineEvent::MessageLike(msg) => {
271 match msg.original_content() {
272 None => {
273 msg.event_type() != MessageLikeEventType::Reaction
276 }
277
278 Some(original_content) => {
279 match original_content {
280 AnyMessageLikeEventContent::RoomMessage(content) => {
281 if content
282 .relates_to
283 .as_ref()
284 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
285 {
286 return false;
288 }
289
290 match content.msgtype {
291 MessageType::Audio(_)
292 | MessageType::Emote(_)
293 | MessageType::File(_)
294 | MessageType::Image(_)
295 | MessageType::Location(_)
296 | MessageType::Notice(_)
297 | MessageType::ServerNotice(_)
298 | MessageType::Text(_)
299 | MessageType::Video(_)
300 | MessageType::VerificationRequest(_) => true,
301 #[cfg(feature = "unstable-msc4274")]
302 MessageType::Gallery(_) => true,
303 _ => false,
304 }
305 }
306
307 AnyMessageLikeEventContent::Sticker(_)
308 | AnyMessageLikeEventContent::UnstablePollStart(
309 UnstablePollStartEventContent::New(_),
310 )
311 | AnyMessageLikeEventContent::CallInvite(_)
312 | AnyMessageLikeEventContent::RtcNotification(_)
313 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
314
315 AnyMessageLikeEventContent::Beacon(_) => false,
319
320 _ => false,
321 }
322 }
323 }
324 }
325
326 AnySyncTimelineEvent::State(_) => {
327 true
329 }
330 }
331}
332
333pub(super) struct InitFocusResult {
335 pub has_events: bool,
337 pub focus_task: Option<BackgroundTaskHandle>,
340}
341
342impl<P: RoomDataProvider> TimelineController<P> {
343 pub(super) fn new(
344 room_data_provider: P,
345 focus: TimelineFocus,
346 internal_id_prefix: Option<String>,
347 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
348 is_room_encrypted: bool,
349 settings: TimelineSettings,
350 ) -> Self {
351 let focus = match focus {
352 TimelineFocus::Live { hide_threaded_events } => {
353 TimelineFocusKind::Live { hide_threaded_events }
354 }
355
356 TimelineFocus::Event { target, thread_mode, .. } => {
357 TimelineFocusKind::Event {
358 focused_event_id: target,
359 thread_root: OnceLock::new(),
361 thread_mode,
362 }
363 }
364
365 TimelineFocus::Thread { root_event_id, .. } => {
366 TimelineFocusKind::Thread { root_event_id }
367 }
368
369 TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
370 };
371
372 let focus = Arc::new(focus);
373 let state = Arc::new(RwLock::new(TimelineState::new(
374 focus.clone(),
375 room_data_provider.own_user_id().to_owned(),
376 room_data_provider.room_version_rules(),
377 internal_id_prefix,
378 unable_to_decrypt_hook,
379 is_room_encrypted,
380 )));
381
382 Self { state, focus, room_data_provider, settings }
383 }
384
385 pub async fn handle_encryption_state_changes(&self) {
390 let mut room_info = self.room_data_provider.room_info();
391
392 let mark_encrypted = || async {
394 let mut state = self.state.write().await;
395 state.meta.is_room_encrypted = true;
396 state.mark_all_events_as_encrypted();
397 };
398
399 if room_info.get().encryption_state().is_encrypted() {
400 mark_encrypted().await;
403 return;
404 }
405
406 while let Some(info) = room_info.next().await {
407 if info.encryption_state().is_encrypted() {
408 mark_encrypted().await;
409 break;
412 }
413 }
414 }
415
416 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
425 let state = self.state.read().await;
426
427 let (count, needs) = state
428 .meta
429 .subscriber_skip_count
430 .compute_next_when_paginating_backwards(num_events.into());
431
432 let is_live_timeline = true;
434 state.meta.subscriber_skip_count.update(count, is_live_timeline);
435
436 needs
437 }
438
439 pub(super) fn is_live(&self) -> bool {
441 matches!(&*self.focus, TimelineFocusKind::Live { .. })
442 }
443
444 pub(super) fn is_threaded(&self) -> bool {
446 self.focus.is_thread()
447 }
448
449 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
452 self.focus.thread_root().map(ToOwned::to_owned)
453 }
454
455 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
459 self.state.read().await.items.clone_items()
460 }
461
462 #[cfg(test)]
463 pub(super) async fn subscribe_raw(
464 &self,
465 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
466 self.state.read().await.items.subscribe().into_values_and_stream()
467 }
468
469 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
470 let state = self.state.read().await;
471
472 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
473 }
474
475 pub(super) async fn subscribe_filter_map<U, F>(
476 &self,
477 f: F,
478 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
479 where
480 U: Clone,
481 F: Fn(Arc<TimelineItem>) -> Option<U>,
482 {
483 self.state.read().await.items.subscribe().filter_map(f)
484 }
485
486 #[instrument(skip_all)]
490 pub(super) async fn toggle_reaction_local(
491 &self,
492 item_id: &TimelineEventItemId,
493 key: &str,
494 ) -> Result<bool, Error> {
495 let mut state = self.state.write().await;
496
497 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
498 warn!("Timeline item not found, can't add reaction");
499 return Err(Error::FailedToToggleReaction);
500 };
501
502 let user_id = self.room_data_provider.own_user_id();
503 let prev_status = item
504 .content()
505 .reactions()
506 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
507
508 let Some(prev_status) = prev_status else {
509 match item.handle() {
511 TimelineItemHandle::Local(send_handle) => {
512 if send_handle
513 .react(key.to_owned())
514 .await
515 .map_err(|err| Error::SendQueueError(err.into()))?
516 .is_some()
517 {
518 trace!("adding a reaction to a local echo");
519 return Ok(true);
520 }
521
522 warn!("couldn't toggle reaction for local echo");
523 return Ok(false);
524 }
525
526 TimelineItemHandle::Remote(event_id) => {
527 trace!("adding a reaction to a remote echo");
531 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
532 self.room_data_provider
533 .send(ReactionEventContent::from(annotation).into())
534 .await?;
535 return Ok(true);
536 }
537 }
538 };
539
540 trace!("removing a previous reaction");
541 match prev_status {
542 ReactionStatus::LocalToLocal(send_reaction_handle) => {
543 if let Some(handle) = send_reaction_handle {
544 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
545 warn!("unexpectedly unable to abort sending of local reaction");
548 }
549 } else {
550 warn!("no send reaction handle (this should only happen in testing contexts)");
551 }
552 }
553
554 ReactionStatus::LocalToRemote(send_handle) => {
555 trace!("aborting send of the previous reaction that was a local echo");
558 if let Some(handle) = send_handle {
559 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
560 warn!("unexpectedly unable to abort sending of local reaction");
563 }
564 } else {
565 warn!("no send handle (this should only happen in testing contexts)");
566 }
567 }
568
569 ReactionStatus::RemoteToRemote(event_id) => {
570 let Some(annotated_event_id) =
572 item.as_remote().map(|event_item| event_item.event_id.clone())
573 else {
574 warn!("remote reaction to remote event, but the associated item isn't remote");
575 return Ok(false);
576 };
577
578 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
579 let reaction_info = reactions.remove_reaction(user_id, key);
580
581 if reaction_info.is_some() {
582 let new_item = item.with_reactions(reactions);
583 state.items.replace(item_pos, new_item);
584 } else {
585 warn!(
586 "reaction is missing on the item, not removing it locally, \
587 but sending redaction."
588 );
589 }
590
591 drop(state);
593
594 trace!("sending redact for a previous reaction");
595 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
596 if let Some(reaction_info) = reaction_info {
597 debug!("sending redact failed, adding the reaction back to the list");
598
599 let mut state = self.state.write().await;
600 if let Some((item_pos, item)) =
601 rfind_event_by_id(&state.items, &annotated_event_id)
602 {
603 let mut reactions =
605 item.content().reactions().cloned().unwrap_or_default();
606 reactions
607 .entry(key.to_owned())
608 .or_default()
609 .insert(user_id.to_owned(), reaction_info);
610 let new_item = item.with_reactions(reactions);
611 state.items.replace(item_pos, new_item);
612 } else {
613 warn!(
614 "couldn't find item to re-add reaction anymore; \
615 maybe it's been redacted?"
616 );
617 }
618 }
619
620 return Err(err);
621 }
622 }
623 }
624
625 Ok(false)
626 }
627
628 pub(super) async fn handle_remote_events_with_diffs(
630 &self,
631 diffs: Vec<VectorDiff<TimelineEvent>>,
632 origin: RemoteEventOrigin,
633 ) {
634 if diffs.is_empty() {
635 return;
636 }
637
638 let mut state = self.state.write().await;
639 state
640 .handle_remote_events_with_diffs(
641 diffs,
642 origin,
643 &self.room_data_provider,
644 &self.settings,
645 )
646 .await
647 }
648
649 pub(super) async fn handle_remote_aggregations(
651 &self,
652 diffs: Vec<VectorDiff<TimelineEvent>>,
653 origin: RemoteEventOrigin,
654 ) {
655 if diffs.is_empty() {
656 return;
657 }
658
659 let mut state = self.state.write().await;
660 state
661 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
662 .await
663 }
664
665 pub(super) async fn clear(&self) {
666 self.state.write().await.clear();
667 }
668
669 pub(super) async fn replace_with_initial_remote_events<Events>(
677 &self,
678 events: Events,
679 origin: RemoteEventOrigin,
680 ) where
681 Events: IntoIterator,
682 <Events as IntoIterator>::Item: Into<TimelineEvent>,
683 {
684 let mut state = self.state.write().await;
685
686 let track_read_markers = &self.settings.track_read_receipts;
687 if track_read_markers.is_enabled() {
688 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
689 state
690 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
691 .await;
692 }
693
694 let mut events = events.into_iter().peekable();
700 if !state.items.is_empty() || events.peek().is_some() {
701 state
702 .replace_with_remote_events(
703 events,
704 origin,
705 &self.room_data_provider,
706 &self.settings,
707 )
708 .await;
709 }
710
711 if track_read_markers.is_enabled() {
712 if let Some(fully_read_event_id) =
713 self.room_data_provider.load_fully_read_marker().await
714 {
715 state.handle_fully_read_marker(fully_read_event_id);
716 } else if let Some(latest_receipt_event_id) = state
717 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
718 {
719 debug!("no `m.fully_read` marker found, falling back to read receipt");
721 state.handle_fully_read_marker(latest_receipt_event_id);
722 }
723 }
724 }
725
726 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
727 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
728 }
729
730 pub(super) async fn handle_ephemeral_events(
731 &self,
732 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
733 ) {
734 if events.is_empty() {
736 return;
737 }
738 let mut state = self.state.write().await;
739 state.handle_ephemeral_events(events, &self.room_data_provider).await;
740 }
741
742 #[instrument(skip_all)]
744 pub(super) async fn handle_local_event(
745 &self,
746 txn_id: OwnedTransactionId,
747 content: AnyMessageLikeEventContent,
748 send_handle: Option<SendHandle>,
749 ) {
750 let sender = self.room_data_provider.own_user_id().to_owned();
751 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
752
753 let date_divider_mode = self.settings.date_divider_mode.clone();
754
755 let mut state = self.state.write().await;
756 state
757 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
758 .await;
759 }
760
761 #[instrument(skip(self))]
766 pub(super) async fn update_event_send_state(
767 &self,
768 txn_id: &TransactionId,
769 send_state: EventSendState,
770 ) {
771 let mut state = self.state.write().await;
772 let mut txn = state.transaction();
773
774 let new_event_id: Option<&EventId> =
775 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
776
777 if rfind_event_item(&txn.items, |it| {
780 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
781 })
782 .is_some()
783 {
784 trace!("Remote echo received before send-event response");
786
787 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
788
789 if let Some((idx, _)) = local_echo {
793 warn!("Message echo got duplicated, removing the local one");
794 txn.items.remove(idx);
795
796 let mut adjuster =
798 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
799 adjuster.run(&mut txn.items, &mut txn.meta);
800 }
801
802 txn.commit();
803 return;
804 }
805
806 let result = rfind_event_item(&txn.items, |it| {
808 it.transaction_id() == Some(txn_id)
809 || new_event_id.is_some()
810 && it.event_id() == new_event_id
811 && it.as_local().is_some()
812 });
813
814 let Some((idx, item)) = result else {
815 if let Some(new_event_id) = new_event_id {
820 if txn.meta.aggregations.mark_aggregation_as_sent(
821 txn_id.to_owned(),
822 new_event_id.to_owned(),
823 &mut txn.items,
824 &txn.meta.room_version_rules,
825 ) {
826 trace!("Aggregation marked as sent");
827 txn.commit();
828 return;
829 }
830
831 trace!("Sent aggregation was not found");
832 }
833
834 warn!("Timeline item not found, can't update send state");
835 return;
836 };
837
838 let Some(local_item) = item.as_local() else {
839 warn!("We looked for a local item, but it transitioned to remote.");
840 return;
841 };
842
843 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
846 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
847 }
848
849 if let Some(new_event_id) = new_event_id {
852 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
853 }
854
855 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
856 txn.items.replace(idx, new_item);
857
858 txn.commit();
859 }
860
861 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
862 let mut state = self.state.write().await;
863
864 if let Some((idx, _)) =
865 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
866 {
867 let mut txn = state.transaction();
868
869 txn.items.remove(idx);
870
871 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
874 adjuster.run(&mut txn.items, &mut txn.meta);
875
876 txn.meta.update_read_marker(&mut txn.items);
877
878 txn.commit();
879
880 debug!("discarded local echo");
881 return true;
882 }
883
884 let mut txn = state.transaction();
887
888 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
890 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
891 &mut txn.items,
892 ) {
893 Ok(val) => val,
894 Err(err) => {
895 warn!("error when discarding local echo for an aggregation: {err}");
896 true
898 }
899 };
900
901 if found_aggregation {
902 txn.commit();
903 }
904
905 found_aggregation
906 }
907
908 pub(super) async fn replace_local_echo(
909 &self,
910 txn_id: &TransactionId,
911 content: AnyMessageLikeEventContent,
912 ) -> bool {
913 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
914 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
919 return false;
920 };
921
922 let mut state = self.state.write().await;
923 let mut txn = state.transaction();
924
925 let Some((idx, prev_item)) =
926 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
927 else {
928 debug!("Can't find local echo to replace");
929 return false;
930 };
931
932 let ti_kind = {
935 let Some(prev_local_item) = prev_item.as_local() else {
936 warn!("We looked for a local item, but it transitioned as remote??");
937 return false;
938 };
939 let progress = as_variant!(&prev_local_item.send_state,
941 EventSendState::NotSentYet { progress } => progress.clone())
942 .flatten();
943 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
944 };
945
946 let new_item = TimelineItem::new(
948 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
949 content.msgtype,
950 content.mentions,
951 prev_item.content().reactions().cloned().unwrap_or_default(),
952 prev_item.content().thread_root(),
953 prev_item.content().in_reply_to(),
954 prev_item.content().thread_summary(),
955 )),
956 prev_item.internal_id.to_owned(),
957 );
958
959 txn.items.replace(idx, new_item);
960
961 txn.commit();
965
966 debug!("Replaced local echo");
967 true
968 }
969
970 pub(super) async fn compute_redecryption_candidates(
971 &self,
972 ) -> (BTreeSet<String>, BTreeSet<String>) {
973 let state = self.state.read().await;
974 compute_redecryption_candidates(&state.items)
975 }
976
977 pub(super) async fn set_sender_profiles_pending(&self) {
978 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
979 }
980
981 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
982 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
983 }
984
985 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
986 self.state.write().await.items.for_each(|mut entry| {
987 let Some(event_item) = entry.as_event() else { return };
988 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
989 let new_item = entry.with_kind(TimelineItemKind::Event(
990 event_item.with_sender_profile(profile_state.clone()),
991 ));
992 ObservableItemsEntry::replace(&mut entry, new_item);
993 }
994 });
995 }
996
997 pub(super) async fn update_missing_sender_profiles(&self) {
998 trace!("Updating missing sender profiles");
999
1000 let mut state = self.state.write().await;
1001 let mut entries = state.items.entries();
1002 while let Some(mut entry) = entries.next() {
1003 let Some(event_item) = entry.as_event() else { continue };
1004 let event_id = event_item.event_id().map(debug);
1005 let transaction_id = event_item.transaction_id().map(debug);
1006
1007 if event_item.sender_profile().is_ready() {
1008 trace!(event_id, transaction_id, "Profile already set");
1009 continue;
1010 }
1011
1012 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1013 Some(profile) => {
1014 trace!(event_id, transaction_id, "Adding profile");
1015 let updated_item =
1016 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1017 let new_item = entry.with_kind(updated_item);
1018 ObservableItemsEntry::replace(&mut entry, new_item);
1019 }
1020 None => {
1021 if !event_item.sender_profile().is_unavailable() {
1022 trace!(event_id, transaction_id, "Marking profile unavailable");
1023 let updated_item =
1024 event_item.with_sender_profile(TimelineDetails::Unavailable);
1025 let new_item = entry.with_kind(updated_item);
1026 ObservableItemsEntry::replace(&mut entry, new_item);
1027 } else {
1028 debug!(event_id, transaction_id, "Profile already marked unavailable");
1029 }
1030 }
1031 }
1032 }
1033
1034 trace!("Done updating missing sender profiles");
1035 }
1036
1037 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1039 trace!("Forcing update of sender profiles: {sender_ids:?}");
1040
1041 let mut state = self.state.write().await;
1042 let mut entries = state.items.entries();
1043 while let Some(mut entry) = entries.next() {
1044 let Some(event_item) = entry.as_event() else { continue };
1045 if !sender_ids.contains(event_item.sender()) {
1046 continue;
1047 }
1048
1049 let event_id = event_item.event_id().map(debug);
1050 let transaction_id = event_item.transaction_id().map(debug);
1051
1052 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1053 Some(profile) => {
1054 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1055 {
1056 debug!(event_id, transaction_id, "Profile already up-to-date");
1057 } else {
1058 trace!(event_id, transaction_id, "Updating profile");
1059 let updated_item =
1060 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1061 let new_item = entry.with_kind(updated_item);
1062 ObservableItemsEntry::replace(&mut entry, new_item);
1063 }
1064 }
1065 None => {
1066 if !event_item.sender_profile().is_unavailable() {
1067 trace!(event_id, transaction_id, "Marking profile unavailable");
1068 let updated_item =
1069 event_item.with_sender_profile(TimelineDetails::Unavailable);
1070 let new_item = entry.with_kind(updated_item);
1071 ObservableItemsEntry::replace(&mut entry, new_item);
1072 } else {
1073 debug!(event_id, transaction_id, "Profile already marked unavailable");
1074 }
1075 }
1076 }
1077 }
1078
1079 trace!("Done forcing update of sender profiles");
1080 }
1081
1082 #[cfg(test)]
1083 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1084 let own_user_id = self.room_data_provider.own_user_id();
1085 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1086 }
1087
1088 pub(super) async fn latest_user_read_receipt(
1092 &self,
1093 user_id: &UserId,
1094 ) -> Option<(OwnedEventId, Receipt)> {
1095 let receipt_thread = self.focus.receipt_thread();
1096
1097 self.state
1098 .read()
1099 .await
1100 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1101 .await
1102 }
1103
1104 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1107 &self,
1108 user_id: &UserId,
1109 ) -> Option<OwnedEventId> {
1110 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1111 }
1112
1113 pub async fn subscribe_own_user_read_receipts_changed(
1115 &self,
1116 ) -> impl Stream<Item = ()> + use<P> {
1117 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1118 }
1119
1120 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1122 match echo.content {
1123 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1124 let content = match serialized_event.deserialize() {
1125 Ok(d) => d,
1126 Err(err) => {
1127 warn!("error deserializing local echo: {err}");
1128 return;
1129 }
1130 };
1131
1132 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1133 .await;
1134
1135 if let Some(send_error) = send_error {
1136 self.update_event_send_state(
1137 &echo.transaction_id,
1138 EventSendState::SendingFailed {
1139 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1140 send_error,
1141 ))),
1142 is_recoverable: false,
1143 },
1144 )
1145 .await;
1146 }
1147 }
1148
1149 LocalEchoContent::React { key, send_handle, applies_to } => {
1150 self.handle_local_reaction(key, send_handle, applies_to).await;
1151 }
1152
1153 LocalEchoContent::Redaction { redacts, send_error, .. } => {
1154 self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
1155
1156 if let Some(send_error) = send_error {
1157 self.update_event_send_state(
1158 &echo.transaction_id,
1159 EventSendState::SendingFailed {
1160 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1161 send_error,
1162 ))),
1163 is_recoverable: false,
1164 },
1165 )
1166 .await;
1167 }
1168 }
1169 }
1170 }
1171
1172 #[instrument(skip(self, send_handle))]
1174 async fn handle_local_reaction(
1175 &self,
1176 reaction_key: String,
1177 send_handle: SendReactionHandle,
1178 applies_to: OwnedTransactionId,
1179 ) {
1180 let mut state = self.state.write().await;
1181 let mut tr = state.transaction();
1182
1183 let target = TimelineEventItemId::TransactionId(applies_to);
1184
1185 let reaction_txn_id = send_handle.transaction_id().to_owned();
1186 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1187 let aggregation = Aggregation::new(
1188 TimelineEventItemId::TransactionId(reaction_txn_id),
1189 AggregationKind::Reaction {
1190 key: reaction_key.clone(),
1191 sender: self.room_data_provider.own_user_id().to_owned(),
1192 timestamp: MilliSecondsSinceUnixEpoch::now(),
1193 reaction_status,
1194 },
1195 );
1196
1197 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1198 find_item_and_apply_aggregation(
1199 &tr.meta.aggregations,
1200 &mut tr.items,
1201 &target,
1202 aggregation,
1203 &tr.meta.room_version_rules,
1204 );
1205
1206 tr.commit();
1207 }
1208
1209 pub(super) async fn handle_local_redaction(
1211 &self,
1212 txn_id: OwnedTransactionId,
1213 redacts: OwnedEventId,
1214 ) {
1215 let mut state = self.state.write().await;
1216 let mut tr = state.transaction();
1217
1218 let target = TimelineEventItemId::EventId(redacts);
1219
1220 let aggregation = Aggregation::new(
1221 TimelineEventItemId::TransactionId(txn_id),
1222 AggregationKind::Redaction { is_local: true },
1223 );
1224
1225 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1226 find_item_and_apply_aggregation(
1227 &tr.meta.aggregations,
1228 &mut tr.items,
1229 &target,
1230 aggregation,
1231 &tr.meta.room_version_rules,
1232 );
1233
1234 tr.commit();
1235 }
1236
1237 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1239 match update {
1240 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1241 self.handle_local_echo(echo).await;
1242 }
1243
1244 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1245 if !self.discard_local_echo(&transaction_id).await {
1246 warn!("couldn't find the local echo to discard");
1247 }
1248 }
1249
1250 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1251 let content = match new_content.deserialize() {
1252 Ok(d) => d,
1253 Err(err) => {
1254 warn!("error deserializing local echo (upon edit): {err}");
1255 return;
1256 }
1257 };
1258
1259 if !self.replace_local_echo(&transaction_id, content).await {
1260 warn!("couldn't find the local echo to replace");
1261 }
1262 }
1263
1264 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1265 self.update_event_send_state(
1266 &transaction_id,
1267 EventSendState::SendingFailed { error, is_recoverable },
1268 )
1269 .await;
1270 }
1271
1272 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1273 self.update_event_send_state(
1274 &transaction_id,
1275 EventSendState::NotSentYet { progress: None },
1276 )
1277 .await;
1278 }
1279
1280 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1281 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1282 .await;
1283 }
1284
1285 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1286 self.update_event_send_state(
1287 &related_to,
1288 EventSendState::NotSentYet {
1289 progress: Some(MediaUploadProgress { index, progress }),
1290 },
1291 )
1292 .await;
1293 }
1294 }
1295 }
1296
1297 pub async fn insert_timeline_start_if_missing(&self) {
1300 let mut state = self.state.write().await;
1301 let mut txn = state.transaction();
1302 txn.items.push_timeline_start_if_missing(
1303 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1304 );
1305 txn.commit();
1306 }
1307
1308 pub(super) async fn make_replied_to(
1314 &self,
1315 event: TimelineEvent,
1316 ) -> Result<Option<EmbeddedEvent>, Error> {
1317 let state = self.state.read().await;
1318 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1319 }
1320}
1321
1322impl TimelineController {
1323 pub(super) fn room(&self) -> &Room {
1324 &self.room_data_provider
1325 }
1326
1327 pub(super) async fn init_focus(
1332 &self,
1333 focus: &TimelineFocus,
1334 room_event_cache: &RoomEventCache,
1335 ) -> Result<InitFocusResult, Error> {
1336 match focus {
1337 TimelineFocus::Live { .. } => {
1338 let events = room_event_cache.events().await?;
1340
1341 let has_events = !events.is_empty();
1342
1343 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1344
1345 match room_event_cache.pagination().status().get() {
1346 PaginationStatus::Idle { hit_timeline_start } => {
1347 if hit_timeline_start {
1348 self.insert_timeline_start_if_missing().await;
1351 }
1352 }
1353 PaginationStatus::Paginating => {}
1354 }
1355
1356 Ok(InitFocusResult { has_events, focus_task: None })
1357 }
1358
1359 TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
1360 let event_cache_thread_mode = match thread_mode {
1362 TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
1363 TimelineEventFocusThreadMode::Automatic { .. } => {
1364 EventFocusThreadMode::Automatic
1365 }
1366 };
1367
1368 let cache = room_event_cache
1369 .get_or_create_event_focused_cache(
1370 event_id.clone(),
1371 *num_context_events,
1372 event_cache_thread_mode,
1373 )
1374 .await
1375 .map_err(PaginationError::EventCache)?;
1376
1377 let (events, receiver) = cache.subscribe().await;
1378
1379 let has_events = !events.is_empty();
1380
1381 match &*self.focus {
1384 TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
1385 if let Some(thread_root) = cache.thread_root().await {
1386 focus_thread_root.get_or_init(|| thread_root);
1387 }
1388 }
1389 TimelineFocusKind::Live { .. }
1390 | TimelineFocusKind::Thread { .. }
1391 | TimelineFocusKind::PinnedEvents => {
1392 panic!("unexpected focus for an event-focused timeline")
1393 }
1394 }
1395
1396 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
1397 .await;
1398
1399 let task = self
1400 .room_data_provider
1401 .client()
1402 .task_monitor()
1403 .spawn_infinite_task(
1404 "timeline::event_focused_cache_updates",
1405 event_focused_task(
1406 event_id.clone(),
1407 (*thread_mode).into(),
1408 room_event_cache.clone(),
1409 self.clone(),
1410 receiver,
1411 ),
1412 )
1413 .abort_on_drop();
1414
1415 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1416 }
1417
1418 TimelineFocus::Thread { root_event_id, .. } => {
1419 let (has_events, receiver) =
1420 self.init_with_thread_root(root_event_id, room_event_cache).await?;
1421
1422 let room = &self.room_data_provider;
1423 let span = info_span!(
1424 parent: Span::none(),
1425 "thread_live_update_handler",
1426 room_id = ?room.room_id(),
1427 );
1428 span.follows_from(Span::current());
1429
1430 let task = room
1431 .client()
1432 .task_monitor()
1433 .spawn_infinite_task(
1434 "timeline::thread_event_cache_updates",
1435 thread_updates_task(
1436 receiver,
1437 room_event_cache.clone(),
1438 self.clone(),
1439 root_event_id.clone(),
1440 )
1441 .instrument(span),
1442 )
1443 .abort_on_drop();
1444
1445 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1446 }
1447
1448 TimelineFocus::PinnedEvents => {
1449 let (initial_events, pinned_events_recv) =
1450 room_event_cache.subscribe_to_pinned_events().await?;
1451
1452 let has_events = !initial_events.is_empty();
1453
1454 self.replace_with_initial_remote_events(
1455 initial_events,
1456 RemoteEventOrigin::Pagination,
1457 )
1458 .await;
1459
1460 let task = self
1461 .room_data_provider
1462 .client()
1463 .task_monitor()
1464 .spawn_infinite_task(
1465 "timeline::pinned_event_cache_updates",
1466 pinned_events_task(
1467 room_event_cache.clone(),
1468 self.clone(),
1469 pinned_events_recv,
1470 ),
1471 )
1472 .abort_on_drop();
1473
1474 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1475 }
1476 }
1477 }
1478
1479 pub(super) async fn init_with_thread_root(
1486 &self,
1487 root_event_id: &OwnedEventId,
1488 room_event_cache: &RoomEventCache,
1489 ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1490 let (events, receiver) =
1491 room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
1492 let has_events = !events.is_empty();
1493
1494 let mut related_events = Vector::new();
1498 for event_id in events.iter().filter_map(|event| event.event_id()) {
1499 if let Some((_original, related)) =
1500 room_event_cache.find_event_with_relations(&event_id, None).await?
1501 {
1502 related_events.extend(related);
1503 }
1504 }
1505
1506 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1507
1508 if !related_events.is_empty() {
1510 self.handle_remote_aggregations(
1511 vec![VectorDiff::Append { values: related_events }],
1512 RemoteEventOrigin::Cache,
1513 )
1514 .await;
1515 }
1516
1517 Ok((has_events, receiver))
1518 }
1519
1520 #[instrument(skip(self))]
1523 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1524 let state_guard = self.state.write().await;
1525 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1526 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1527 let remote_item = item
1528 .as_remote()
1529 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1530 .clone();
1531
1532 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1533 debug!("Event is not a message");
1534 return Ok(());
1535 };
1536 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1537 debug!("Event is not a reply");
1538 return Ok(());
1539 };
1540 if let TimelineDetails::Pending = &in_reply_to.event {
1541 debug!("Replied-to event is already being fetched");
1542 return Ok(());
1543 }
1544 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1545 debug!("Replied-to event has already been fetched");
1546 return Ok(());
1547 }
1548
1549 let internal_id = item.internal_id.to_owned();
1550 let item = item.clone();
1551 let event = fetch_replied_to_event(
1552 state_guard,
1553 &self.state,
1554 index,
1555 &item,
1556 internal_id,
1557 &msglike,
1558 &in_reply_to.event_id,
1559 self.room(),
1560 )
1561 .await?;
1562
1563 let mut state = self.state.write().await;
1566 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1567 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1568
1569 let TimelineItemContent::MsgLike(MsgLikeContent {
1572 kind: MsgLikeKind::Message(message),
1573 reactions,
1574 thread_root,
1575 in_reply_to,
1576 thread_summary,
1577 }) = item.content().clone()
1578 else {
1579 info!("Event is no longer a message (redacted?)");
1580 return Ok(());
1581 };
1582 let Some(in_reply_to) = in_reply_to else {
1583 warn!("Event no longer has a reply (bug?)");
1584 return Ok(());
1585 };
1586
1587 trace!("Updating in-reply-to details");
1590 let internal_id = item.internal_id.to_owned();
1591 let mut item = item.clone();
1592 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1593 kind: MsgLikeKind::Message(message),
1594 reactions,
1595 thread_root,
1596 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1597 thread_summary,
1598 }));
1599 state.items.replace(index, TimelineItem::new(item, internal_id));
1600
1601 Ok(())
1602 }
1603
1604 pub(super) fn infer_thread_for_read_receipt(
1610 &self,
1611 receipt_type: &SendReceiptType,
1612 ) -> ReceiptThread {
1613 if matches!(receipt_type, SendReceiptType::FullyRead) {
1614 ReceiptThread::Unthreaded
1615 } else {
1616 self.focus.receipt_thread()
1617 }
1618 }
1619
1620 pub(super) async fn should_send_receipt(
1624 &self,
1625 receipt_type: &SendReceiptType,
1626 receipt_thread: &ReceiptThread,
1627 event_id: &EventId,
1628 ) -> bool {
1629 let own_user_id = self.room().own_user_id();
1630 let state = self.state.read().await;
1631 let room = self.room();
1632
1633 match receipt_type {
1634 SendReceiptType::Read => {
1635 if let Some((old_pub_read, _)) = state
1636 .meta
1637 .user_receipt(
1638 own_user_id,
1639 ReceiptType::Read,
1640 receipt_thread.clone(),
1641 room,
1642 state.items.all_remote_events(),
1643 )
1644 .await
1645 {
1646 trace!(%old_pub_read, "found a previous public receipt");
1647 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1648 &old_pub_read,
1649 event_id,
1650 state.items.all_remote_events(),
1651 ) {
1652 trace!(
1653 "event referred to new receipt is {relative_pos:?} the previous receipt"
1654 );
1655 return relative_pos == RelativePosition::After;
1656 }
1657 }
1658 }
1659
1660 SendReceiptType::ReadPrivate => {
1663 if let Some((old_priv_read, _)) =
1664 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1665 {
1666 trace!(%old_priv_read, "found a previous private receipt");
1667 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1668 &old_priv_read,
1669 event_id,
1670 state.items.all_remote_events(),
1671 ) {
1672 trace!(
1673 "event referred to new receipt is {relative_pos:?} the previous receipt"
1674 );
1675 return relative_pos == RelativePosition::After;
1676 }
1677 }
1678 }
1679
1680 SendReceiptType::FullyRead => {
1681 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1682 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1683 &prev_event_id,
1684 event_id,
1685 state.items.all_remote_events(),
1686 )
1687 {
1688 return relative_pos == RelativePosition::After;
1689 }
1690 }
1691
1692 _ => {}
1693 }
1694
1695 true
1697 }
1698
1699 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1702 let state = self.state.read().await;
1703 let filter_out_thread_events = match self.focus() {
1704 TimelineFocusKind::Thread { .. } => false,
1705 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1706 TimelineFocusKind::Event { .. } => {
1707 false
1709 }
1710 TimelineFocusKind::PinnedEvents => true,
1711 };
1712
1713 state
1714 .items
1715 .all_remote_events()
1716 .iter()
1717 .rev()
1718 .filter_map(|event_meta| {
1719 if !filter_out_thread_events {
1720 Some(event_meta.event_id.clone())
1722 } else if event_meta.thread_root_id.is_none() {
1723 if let Some(TimelineEventItemId::EventId(target_event_id)) =
1729 state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1730 event_meta.event_id.clone(),
1731 ))
1732 && let Some(target_meta) =
1733 state.items.all_remote_events().get_by_event_id(target_event_id)
1734 && target_meta.thread_root_id.is_some()
1735 {
1736 None
1738 } else {
1739 Some(event_meta.event_id.clone())
1742 }
1743 } else {
1744 None
1747 }
1748 })
1749 .next()
1750 }
1751
1752 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1753 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1754 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1755
1756 let request = DecryptionRetryRequest {
1757 room_id: self.room().room_id().to_owned(),
1758 utd_session_ids: utds,
1759 refresh_info_session_ids: decrypted,
1760 };
1761
1762 self.room().client().event_cache().request_decryption(request);
1763 }
1764
1765 pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1773 match status {
1774 PaginationStatus::Idle { hit_timeline_start } => {
1775 if hit_timeline_start {
1776 let state = self.state.read().await;
1777 if state.meta.subscriber_skip_count.get() > 0 {
1781 return PaginationStatus::Idle { hit_timeline_start: false };
1782 }
1783 }
1784 }
1785 PaginationStatus::Paginating => {}
1786 }
1787
1788 status
1790 }
1791}
1792
1793impl<P: RoomDataProvider> TimelineController<P> {
1794 pub(super) fn focus(&self) -> &TimelineFocusKind {
1796 &self.focus
1797 }
1798}
1799
1800#[allow(clippy::too_many_arguments)]
1801async fn fetch_replied_to_event<P: RoomDataProvider>(
1802 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1803 state_lock: &RwLock<TimelineState<P>>,
1804 index: usize,
1805 item: &EventTimelineItem,
1806 internal_id: TimelineUniqueId,
1807 msglike: &MsgLikeContent,
1808 in_reply_to: &EventId,
1809 room: &Room,
1810) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1811 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1812 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1813 trace!("Found replied-to event locally");
1814 return Ok(details);
1815 }
1816
1817 trace!("Setting in-reply-to details to pending");
1820 let in_reply_to_details =
1821 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1822
1823 let event_item = item
1824 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1825
1826 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1827 state_guard.items.replace(index, new_timeline_item);
1828
1829 drop(state_guard);
1831
1832 trace!("Fetching replied-to event");
1833 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1834 Ok(timeline_event) => {
1835 let state = state_lock.read().await;
1836
1837 let replied_to_item =
1838 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1839
1840 if let Some(item) = replied_to_item {
1841 TimelineDetails::Ready(Box::new(item))
1842 } else {
1843 return Err(Error::UnsupportedEvent);
1845 }
1846 }
1847
1848 Err(e) => TimelineDetails::Error(Arc::new(e)),
1849 };
1850
1851 Ok(res)
1852}