1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::VectorDiff;
20use eyeball_im_util::vector::VectorObserverExt;
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{
28 paginator::{PaginationResult, Paginator},
29 RoomEventCache,
30 },
31 send_queue::{
32 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33 },
34 Result, Room,
35};
36use ruma::{
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 poll::unstable_start::UnstablePollStartEventContent,
40 reaction::ReactionEventContent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 relation::Annotation,
43 room::message::{MessageType, Relation},
44 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45 AnySyncTimelineEvent, MessageLikeEventType,
46 },
47 serde::Raw,
48 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
49 TransactionId, UserId,
50};
51#[cfg(test)]
52use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
53use tokio::sync::{RwLock, RwLockWriteGuard};
54use tracing::{debug, error, field::debug, info, instrument, trace, warn};
55
56pub(super) use self::{
57 metadata::{RelativePosition, TimelineMetadata},
58 observable_items::{
59 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
60 ObservableItemsTransactionEntry,
61 },
62 state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineState},
63 state_transaction::TimelineStateTransaction,
64};
65use super::{
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_handler::TimelineEventKind,
68 event_item::{ReactionStatus, RemoteEventOrigin},
69 item::TimelineUniqueId,
70 subscriber::TimelineSubscriber,
71 traits::{Decryptor, RoomDataProvider},
72 DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, PaginationError,
73 Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
74 TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
75};
76use crate::{
77 timeline::{
78 algorithms::rfind_event_by_item_id,
79 date_dividers::DateDividerAdjuster,
80 event_item::EventTimelineItemKind,
81 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
82 MsgLikeContent, MsgLikeKind, TimelineEventFilterFn,
83 },
84 unable_to_decrypt_hook::UtdHookManager,
85};
86
87mod aggregations;
88mod decryption_retry_task;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
97#[derive(Debug)]
99enum TimelineFocusData<P: RoomDataProvider> {
100 Live,
102
103 Event {
106 event_id: OwnedEventId,
108 paginator: Paginator<P>,
110 num_context_events: u16,
112 },
113
114 PinnedEvents {
115 loader: PinnedEventsLoader,
116 },
117}
118
119#[derive(Clone, Debug)]
120pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
121 state: Arc<RwLock<TimelineState>>,
123
124 focus: Arc<RwLock<TimelineFocusData<P>>>,
126
127 pub(crate) room_data_provider: P,
131
132 pub(super) settings: TimelineSettings,
134
135 decryption_retry_task: DecryptionRetryTask<D>,
138}
139
140#[derive(Clone)]
141pub(super) struct TimelineSettings {
142 pub(super) track_read_receipts: bool,
144
145 pub(super) event_filter: Arc<TimelineEventFilterFn>,
148
149 pub(super) add_failed_to_parse: bool,
151
152 pub(super) date_divider_mode: DateDividerMode,
154}
155
156#[cfg(not(tarpaulin_include))]
157impl fmt::Debug for TimelineSettings {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 f.debug_struct("TimelineSettings")
160 .field("track_read_receipts", &self.track_read_receipts)
161 .field("add_failed_to_parse", &self.add_failed_to_parse)
162 .finish_non_exhaustive()
163 }
164}
165
166impl Default for TimelineSettings {
167 fn default() -> Self {
168 Self {
169 track_read_receipts: false,
170 event_filter: Arc::new(default_event_filter),
171 add_failed_to_parse: true,
172 date_divider_mode: DateDividerMode::Daily,
173 }
174 }
175}
176
177#[derive(Debug, Clone, Copy)]
178pub(super) enum TimelineFocusKind {
179 Live,
180 Event,
181 PinnedEvents,
182}
183
184pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
194 match event {
195 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
196 if ev.redacts(room_version).is_some() {
197 false
200 } else {
201 ev.event_type() != MessageLikeEventType::Reaction
204 }
205 }
206
207 AnySyncTimelineEvent::MessageLike(msg) => {
208 match msg.original_content() {
209 None => {
210 msg.event_type() != MessageLikeEventType::Reaction
213 }
214
215 Some(original_content) => {
216 match original_content {
217 AnyMessageLikeEventContent::RoomMessage(content) => {
218 if content
219 .relates_to
220 .as_ref()
221 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
222 {
223 return false;
225 }
226
227 matches!(
228 content.msgtype,
229 MessageType::Audio(_)
230 | MessageType::Emote(_)
231 | MessageType::File(_)
232 | MessageType::Image(_)
233 | MessageType::Location(_)
234 | MessageType::Notice(_)
235 | MessageType::ServerNotice(_)
236 | MessageType::Text(_)
237 | MessageType::Video(_)
238 | MessageType::VerificationRequest(_)
239 )
240 }
241
242 AnyMessageLikeEventContent::Sticker(_)
243 | AnyMessageLikeEventContent::UnstablePollStart(
244 UnstablePollStartEventContent::New(_),
245 )
246 | AnyMessageLikeEventContent::CallInvite(_)
247 | AnyMessageLikeEventContent::CallNotify(_)
248 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
249
250 _ => false,
251 }
252 }
253 }
254 }
255
256 AnySyncTimelineEvent::State(_) => {
257 true
259 }
260 }
261}
262
263impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
264 pub(super) fn new(
265 room_data_provider: P,
266 focus: TimelineFocus,
267 internal_id_prefix: Option<String>,
268 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
269 is_room_encrypted: bool,
270 ) -> Self {
271 let (focus_data, focus_kind) = match focus {
272 TimelineFocus::Live => (TimelineFocusData::Live, TimelineFocusKind::Live),
273
274 TimelineFocus::Event { target, num_context_events } => {
275 let paginator = Paginator::new(room_data_provider.clone());
276 (
277 TimelineFocusData::Event { paginator, event_id: target, num_context_events },
278 TimelineFocusKind::Event,
279 )
280 }
281
282 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
283 TimelineFocusData::PinnedEvents {
284 loader: PinnedEventsLoader::new(
285 Arc::new(room_data_provider.clone()),
286 max_events_to_load as usize,
287 max_concurrent_requests as usize,
288 ),
289 },
290 TimelineFocusKind::PinnedEvents,
291 ),
292 };
293
294 let state = Arc::new(RwLock::new(TimelineState::new(
295 focus_kind,
296 room_data_provider.own_user_id().to_owned(),
297 room_data_provider.room_version(),
298 internal_id_prefix,
299 unable_to_decrypt_hook,
300 is_room_encrypted,
301 )));
302
303 let settings = TimelineSettings::default();
304
305 let decryption_retry_task =
306 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
307
308 Self {
309 state,
310 focus: Arc::new(RwLock::new(focus_data)),
311 room_data_provider,
312 settings,
313 decryption_retry_task,
314 }
315 }
316
317 pub(super) async fn init_focus(
324 &self,
325 room_event_cache: &RoomEventCache,
326 ) -> Result<bool, Error> {
327 let focus_guard = self.focus.read().await;
328
329 match &*focus_guard {
330 TimelineFocusData::Live => {
331 let (events, _stream) = room_event_cache.subscribe().await;
333
334 let has_events = !events.is_empty();
335
336 self.replace_with_initial_remote_events(
337 events.into_iter(),
338 RemoteEventOrigin::Cache,
339 )
340 .await;
341
342 Ok(has_events)
343 }
344
345 TimelineFocusData::Event { event_id, paginator, num_context_events } => {
346 let start_from_result = paginator
348 .start_from(event_id, (*num_context_events).into())
349 .await
350 .map_err(PaginationError::Paginator)?;
351
352 drop(focus_guard);
353
354 let has_events = !start_from_result.events.is_empty();
355
356 self.replace_with_initial_remote_events(
357 start_from_result.events.into_iter(),
358 RemoteEventOrigin::Pagination,
359 )
360 .await;
361
362 Ok(has_events)
363 }
364
365 TimelineFocusData::PinnedEvents { loader } => {
366 let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
367
368 drop(focus_guard);
369
370 let has_events = !loaded_events.is_empty();
371
372 self.replace_with_initial_remote_events(
373 loaded_events.into_iter(),
374 RemoteEventOrigin::Pagination,
375 )
376 .await;
377
378 Ok(has_events)
379 }
380 }
381 }
382
383 pub async fn handle_encryption_state_changes(&self) {
388 let mut room_info = self.room_data_provider.room_info();
389
390 let mark_encrypted = || async {
392 let mut state = self.state.write().await;
393 state.meta.is_room_encrypted = true;
394 state.mark_all_events_as_encrypted();
395 };
396
397 if room_info.get().encryption_state().is_encrypted() {
398 mark_encrypted().await;
401 return;
402 }
403
404 while let Some(info) = room_info.next().await {
405 if info.encryption_state().is_encrypted() {
406 mark_encrypted().await;
407 break;
410 }
411 }
412 }
413
414 pub(crate) async fn reload_pinned_events(
415 &self,
416 ) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
417 let focus_guard = self.focus.read().await;
418
419 if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
420 loader.load_events().await
421 } else {
422 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
423 }
424 }
425
426 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
435 let state = self.state.read().await;
436
437 let (count, needs) = state
438 .meta
439 .subscriber_skip_count
440 .compute_next_when_paginating_backwards(num_events.into());
441
442 state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
443
444 needs
445 }
446
447 pub(super) async fn focused_paginate_backwards(
452 &self,
453 num_events: u16,
454 ) -> Result<bool, PaginationError> {
455 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
456 TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
457 return Err(PaginationError::NotEventFocusMode)
458 }
459 TimelineFocusData::Event { paginator, .. } => paginator
460 .paginate_backward(num_events.into())
461 .await
462 .map_err(PaginationError::Paginator)?,
463 };
464
465 self.handle_remote_events_with_diffs(
468 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
469 RemoteEventOrigin::Pagination,
470 )
471 .await;
472
473 Ok(hit_end_of_timeline)
474 }
475
476 pub(super) async fn focused_paginate_forwards(
481 &self,
482 num_events: u16,
483 ) -> Result<bool, PaginationError> {
484 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
485 TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
486 return Err(PaginationError::NotEventFocusMode)
487 }
488 TimelineFocusData::Event { paginator, .. } => paginator
489 .paginate_forward(num_events.into())
490 .await
491 .map_err(PaginationError::Paginator)?,
492 };
493
494 self.handle_remote_events_with_diffs(
497 vec![VectorDiff::Append { values: events.into() }],
498 RemoteEventOrigin::Pagination,
499 )
500 .await;
501
502 Ok(hit_end_of_timeline)
503 }
504
505 pub(super) async fn is_live(&self) -> bool {
507 matches!(&*self.focus.read().await, TimelineFocusData::Live)
508 }
509
510 pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
511 self.settings = settings;
512 self
513 }
514
515 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
519 self.state.read().await.items.clone_items()
520 }
521
522 #[cfg(test)]
523 pub(super) async fn subscribe_raw(
524 &self,
525 ) -> (
526 Vector<Arc<TimelineItem>>,
527 impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
528 ) {
529 let state = self.state.read().await;
530
531 state.items.subscribe().into_values_and_stream()
532 }
533
534 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
535 let state = self.state.read().await;
536
537 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
538 }
539
540 pub(super) async fn subscribe_filter_map<U, F>(
541 &self,
542 f: F,
543 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
544 where
545 U: Clone,
546 F: Fn(Arc<TimelineItem>) -> Option<U>,
547 {
548 self.state.read().await.items.subscribe().filter_map(f)
549 }
550
551 #[instrument(skip_all)]
555 pub(super) async fn toggle_reaction_local(
556 &self,
557 item_id: &TimelineEventItemId,
558 key: &str,
559 ) -> Result<bool, Error> {
560 let mut state = self.state.write().await;
561
562 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
563 warn!("Timeline item not found, can't add reaction");
564 return Err(Error::FailedToToggleReaction);
565 };
566
567 let user_id = self.room_data_provider.own_user_id();
568 let prev_status = item
569 .content()
570 .reactions()
571 .get(key)
572 .and_then(|group| group.get(user_id))
573 .map(|reaction_info| reaction_info.status.clone());
574
575 let Some(prev_status) = prev_status else {
576 match &item.kind {
577 EventTimelineItemKind::Local(local) => {
578 if let Some(send_handle) = &local.send_handle {
579 if send_handle
580 .react(key.to_owned())
581 .await
582 .map_err(|err| Error::SendQueueError(err.into()))?
583 .is_some()
584 {
585 trace!("adding a reaction to a local echo");
586 return Ok(true);
587 }
588
589 warn!("couldn't toggle reaction for local echo");
590 return Ok(false);
591 }
592
593 warn!("missing send handle for local echo; is this a test?");
594 return Ok(false);
595 }
596
597 EventTimelineItemKind::Remote(remote) => {
598 trace!("adding a reaction to a remote echo");
602 let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
603 self.room_data_provider
604 .send(ReactionEventContent::from(annotation).into())
605 .await?;
606 return Ok(true);
607 }
608 }
609 };
610
611 trace!("removing a previous reaction");
612 match prev_status {
613 ReactionStatus::LocalToLocal(send_reaction_handle) => {
614 if let Some(handle) = send_reaction_handle {
615 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
616 warn!("unexpectedly unable to abort sending of local reaction");
619 }
620 } else {
621 warn!("no send reaction handle (this should only happen in testing contexts)");
622 }
623 }
624
625 ReactionStatus::LocalToRemote(send_handle) => {
626 trace!("aborting send of the previous reaction that was a local echo");
629 if let Some(handle) = send_handle {
630 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
631 warn!("unexpectedly unable to abort sending of local reaction");
634 }
635 } else {
636 warn!("no send handle (this should only happen in testing contexts)");
637 }
638 }
639
640 ReactionStatus::RemoteToRemote(event_id) => {
641 let Some(annotated_event_id) =
643 item.as_remote().map(|event_item| event_item.event_id.clone())
644 else {
645 warn!("remote reaction to remote event, but the associated item isn't remote");
646 return Ok(false);
647 };
648
649 let mut reactions = item.content().reactions().clone();
650 let reaction_info = reactions.remove_reaction(user_id, key);
651
652 if reaction_info.is_some() {
653 let new_item = item.with_reactions(reactions);
654 state.items.replace(item_pos, new_item);
655 } else {
656 warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
657 }
658
659 drop(state);
661
662 trace!("sending redact for a previous reaction");
663 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
664 if let Some(reaction_info) = reaction_info {
665 debug!("sending redact failed, adding the reaction back to the list");
666
667 let mut state = self.state.write().await;
668 if let Some((item_pos, item)) =
669 rfind_event_by_id(&state.items, &annotated_event_id)
670 {
671 let mut reactions = item.content().reactions();
673 reactions
674 .entry(key.to_owned())
675 .or_default()
676 .insert(user_id.to_owned(), reaction_info);
677 let new_item = item.with_reactions(reactions);
678 state.items.replace(item_pos, new_item);
679 } else {
680 warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
681 }
682 }
683
684 return Err(err);
685 }
686 }
687 }
688
689 Ok(false)
690 }
691
692 pub(super) async fn handle_remote_events_with_diffs(
694 &self,
695 diffs: Vec<VectorDiff<TimelineEvent>>,
696 origin: RemoteEventOrigin,
697 ) {
698 if diffs.is_empty() {
699 return;
700 }
701
702 let mut state = self.state.write().await;
703 state
704 .handle_remote_events_with_diffs(
705 diffs,
706 origin,
707 &self.room_data_provider,
708 &self.settings,
709 )
710 .await
711 }
712
713 pub(super) async fn clear(&self) {
714 self.state.write().await.clear();
715 }
716
717 pub(super) async fn replace_with_initial_remote_events<Events>(
725 &self,
726 events: Events,
727 origin: RemoteEventOrigin,
728 ) where
729 Events: IntoIterator + ExactSizeIterator,
730 <Events as IntoIterator>::Item: Into<TimelineEvent>,
731 {
732 let mut state = self.state.write().await;
733
734 let track_read_markers = self.settings.track_read_receipts;
735 if track_read_markers {
736 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
737 state
738 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
739 .await;
740 }
741
742 if !state.items.is_empty() || events.len() > 0 {
748 state
749 .replace_with_remote_events(
750 events,
751 origin,
752 &self.room_data_provider,
753 &self.settings,
754 )
755 .await;
756 }
757
758 if track_read_markers {
759 if let Some(fully_read_event_id) =
760 self.room_data_provider.load_fully_read_marker().await
761 {
762 state.handle_fully_read_marker(fully_read_event_id);
763 }
764 }
765 }
766
767 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
768 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
769 }
770
771 pub(super) async fn handle_ephemeral_events(
772 &self,
773 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
774 ) {
775 let mut state = self.state.write().await;
776 state.handle_ephemeral_events(events, &self.room_data_provider).await;
777 }
778
779 #[instrument(skip_all)]
781 pub(super) async fn handle_local_event(
782 &self,
783 txn_id: OwnedTransactionId,
784 content: TimelineEventKind,
785 send_handle: Option<SendHandle>,
786 ) {
787 let sender = self.room_data_provider.own_user_id().to_owned();
788 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
789
790 let should_add_new_items = self.is_live().await;
792
793 let date_divider_mode = self.settings.date_divider_mode.clone();
794
795 let mut state = self.state.write().await;
796 state
797 .handle_local_event(
798 sender,
799 profile,
800 should_add_new_items,
801 date_divider_mode,
802 txn_id,
803 send_handle,
804 content,
805 )
806 .await;
807 }
808
809 #[instrument(skip(self))]
814 pub(super) async fn update_event_send_state(
815 &self,
816 txn_id: &TransactionId,
817 send_state: EventSendState,
818 ) {
819 let mut state = self.state.write().await;
820 let mut txn = state.transaction();
821
822 let new_event_id: Option<&EventId> =
823 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
824
825 if rfind_event_item(&txn.items, |it| {
828 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
829 })
830 .is_some()
831 {
832 trace!("Remote echo received before send-event response");
834
835 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
836
837 if let Some((idx, _)) = local_echo {
841 warn!("Message echo got duplicated, removing the local one");
842 txn.items.remove(idx);
843
844 let mut adjuster =
846 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
847 adjuster.run(&mut txn.items, &mut txn.meta);
848 }
849
850 txn.commit();
851 return;
852 }
853
854 let result = rfind_event_item(&txn.items, |it| {
856 it.transaction_id() == Some(txn_id)
857 || new_event_id.is_some()
858 && it.event_id() == new_event_id
859 && it.as_local().is_some()
860 });
861
862 let Some((idx, item)) = result else {
863 if let Some(new_event_id) = new_event_id {
868 match txn
869 .meta
870 .aggregations
871 .mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned())
872 {
873 MarkAggregationSentResult::MarkedSent { update } => {
874 trace!("marked aggregation as sent");
875
876 if let Some((target, aggregation)) = update {
877 if let Some((item_pos, item)) =
878 rfind_event_by_item_id(&txn.items, &target)
879 {
880 let mut content = item.content().clone();
881 match aggregation.apply(&mut content) {
882 ApplyAggregationResult::UpdatedItem => {
883 trace!("reapplied aggregation in the event");
884 let internal_id = item.internal_id.to_owned();
885 let new_item = item.with_content(content);
886 txn.items.replace(
887 item_pos,
888 TimelineItem::new(new_item, internal_id),
889 );
890 txn.commit();
891 }
892 ApplyAggregationResult::LeftItemIntact => {}
893 ApplyAggregationResult::Error(err) => {
894 warn!("when reapplying aggregation just marked as sent: {err}");
895 }
896 }
897 }
898 }
899
900 return;
903 }
904
905 MarkAggregationSentResult::NotFound => {}
906 }
907 }
908
909 warn!("Timeline item not found, can't update send state");
910 return;
911 };
912
913 let Some(local_item) = item.as_local() else {
914 warn!("We looked for a local item, but it transitioned to remote.");
915 return;
916 };
917
918 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
921 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
922 }
923
924 if let Some(new_event_id) = new_event_id {
927 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
928 }
929
930 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
931 txn.items.replace(idx, new_item);
932
933 txn.commit();
934 }
935
936 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
937 let mut state = self.state.write().await;
938
939 if let Some((idx, _)) =
940 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
941 {
942 let mut txn = state.transaction();
943
944 txn.items.remove(idx);
945
946 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
949 adjuster.run(&mut txn.items, &mut txn.meta);
950
951 txn.meta.update_read_marker(&mut txn.items);
952
953 txn.commit();
954
955 debug!("discarded local echo");
956 return true;
957 }
958
959 let state = &mut *state;
962
963 if let Some((target, aggregation)) = state
965 .meta
966 .aggregations
967 .try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
968 {
969 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else {
970 warn!("missing target item for a local aggregation");
971 return false;
972 };
973
974 let mut content = item.content().clone();
975 match aggregation.unapply(&mut content) {
976 ApplyAggregationResult::UpdatedItem => {
977 trace!("removed local reaction to local echo");
978 let internal_id = item.internal_id.clone();
979 let new_item = item.with_content(content);
980 state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
981 }
982 ApplyAggregationResult::LeftItemIntact => {}
983 ApplyAggregationResult::Error(err) => {
984 warn!("when undoing local aggregation: {err}");
985 }
986 }
987
988 return true;
989 }
990
991 false
992 }
993
994 pub(super) async fn replace_local_echo(
995 &self,
996 txn_id: &TransactionId,
997 content: AnyMessageLikeEventContent,
998 ) -> bool {
999 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1000 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1005 return false;
1006 };
1007
1008 let mut state = self.state.write().await;
1009 let mut txn = state.transaction();
1010
1011 let Some((idx, prev_item)) =
1012 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1013 else {
1014 debug!("Can't find local echo to replace");
1015 return false;
1016 };
1017
1018 let ti_kind = {
1021 let Some(prev_local_item) = prev_item.as_local() else {
1022 warn!("We looked for a local item, but it transitioned as remote??");
1023 return false;
1024 };
1025 prev_local_item.with_send_state(EventSendState::NotSentYet)
1026 };
1027
1028 let prev_reactions = prev_item.content().reactions();
1030 let prev_thread_root = prev_item.content().thread_root();
1031 let prev_in_reply_to = prev_item.content().in_reply_to();
1032 let new_item = TimelineItem::new(
1033 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1034 content,
1035 None,
1036 prev_reactions,
1037 prev_thread_root,
1038 prev_in_reply_to,
1039 )),
1040 prev_item.internal_id.to_owned(),
1041 );
1042
1043 txn.items.replace(idx, new_item);
1044
1045 txn.commit();
1049
1050 debug!("Replaced local echo");
1051 true
1052 }
1053
1054 async fn retry_event_decryption_inner(
1055 &self,
1056 decryptor: D,
1057 session_ids: Option<BTreeSet<String>>,
1058 ) {
1059 self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
1060 }
1061
1062 pub(super) async fn set_sender_profiles_pending(&self) {
1063 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1064 }
1065
1066 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1067 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1068 }
1069
1070 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1071 self.state.write().await.items.for_each(|mut entry| {
1072 let Some(event_item) = entry.as_event() else { return };
1073 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1074 let new_item = entry.with_kind(TimelineItemKind::Event(
1075 event_item.with_sender_profile(profile_state.clone()),
1076 ));
1077 ObservableItemsEntry::replace(&mut entry, new_item);
1078 }
1079 });
1080 }
1081
1082 pub(super) async fn update_missing_sender_profiles(&self) {
1083 trace!("Updating missing sender profiles");
1084
1085 let mut state = self.state.write().await;
1086 let mut entries = state.items.entries();
1087 while let Some(mut entry) = entries.next() {
1088 let Some(event_item) = entry.as_event() else { continue };
1089 let event_id = event_item.event_id().map(debug);
1090 let transaction_id = event_item.transaction_id().map(debug);
1091
1092 if event_item.sender_profile().is_ready() {
1093 trace!(event_id, transaction_id, "Profile already set");
1094 continue;
1095 }
1096
1097 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1098 Some(profile) => {
1099 trace!(event_id, transaction_id, "Adding profile");
1100 let updated_item =
1101 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1102 let new_item = entry.with_kind(updated_item);
1103 ObservableItemsEntry::replace(&mut entry, new_item);
1104 }
1105 None => {
1106 if !event_item.sender_profile().is_unavailable() {
1107 trace!(event_id, transaction_id, "Marking profile unavailable");
1108 let updated_item =
1109 event_item.with_sender_profile(TimelineDetails::Unavailable);
1110 let new_item = entry.with_kind(updated_item);
1111 ObservableItemsEntry::replace(&mut entry, new_item);
1112 } else {
1113 debug!(event_id, transaction_id, "Profile already marked unavailable");
1114 }
1115 }
1116 }
1117 }
1118
1119 trace!("Done updating missing sender profiles");
1120 }
1121
1122 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1124 trace!("Forcing update of sender profiles: {sender_ids:?}");
1125
1126 let mut state = self.state.write().await;
1127 let mut entries = state.items.entries();
1128 while let Some(mut entry) = entries.next() {
1129 let Some(event_item) = entry.as_event() else { continue };
1130 if !sender_ids.contains(event_item.sender()) {
1131 continue;
1132 }
1133
1134 let event_id = event_item.event_id().map(debug);
1135 let transaction_id = event_item.transaction_id().map(debug);
1136
1137 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1138 Some(profile) => {
1139 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1140 {
1141 debug!(event_id, transaction_id, "Profile already up-to-date");
1142 } else {
1143 trace!(event_id, transaction_id, "Updating profile");
1144 let updated_item =
1145 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1146 let new_item = entry.with_kind(updated_item);
1147 ObservableItemsEntry::replace(&mut entry, new_item);
1148 }
1149 }
1150 None => {
1151 if !event_item.sender_profile().is_unavailable() {
1152 trace!(event_id, transaction_id, "Marking profile unavailable");
1153 let updated_item =
1154 event_item.with_sender_profile(TimelineDetails::Unavailable);
1155 let new_item = entry.with_kind(updated_item);
1156 ObservableItemsEntry::replace(&mut entry, new_item);
1157 } else {
1158 debug!(event_id, transaction_id, "Profile already marked unavailable");
1159 }
1160 }
1161 }
1162 }
1163
1164 trace!("Done forcing update of sender profiles");
1165 }
1166
1167 #[cfg(test)]
1168 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1169 let own_user_id = self.room_data_provider.own_user_id();
1170 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1171 }
1172
1173 pub(super) async fn latest_user_read_receipt(
1177 &self,
1178 user_id: &UserId,
1179 ) -> Option<(OwnedEventId, Receipt)> {
1180 self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1181 }
1182
1183 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1186 &self,
1187 user_id: &UserId,
1188 ) -> Option<OwnedEventId> {
1189 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1190 }
1191
1192 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1194 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1195 }
1196
1197 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1199 match echo.content {
1200 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1201 let content = match serialized_event.deserialize() {
1202 Ok(d) => d,
1203 Err(err) => {
1204 warn!("error deserializing local echo: {err}");
1205 return;
1206 }
1207 };
1208
1209 self.handle_local_event(
1210 echo.transaction_id.clone(),
1211 TimelineEventKind::Message { content, relations: Default::default() },
1212 Some(send_handle),
1213 )
1214 .await;
1215
1216 if let Some(send_error) = send_error {
1217 self.update_event_send_state(
1218 &echo.transaction_id,
1219 EventSendState::SendingFailed {
1220 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
1221 is_recoverable: false,
1222 },
1223 )
1224 .await;
1225 }
1226 }
1227
1228 LocalEchoContent::React { key, send_handle, applies_to } => {
1229 self.handle_local_reaction(key, send_handle, applies_to).await;
1230 }
1231 }
1232 }
1233
1234 #[instrument(skip(self, send_handle))]
1236 async fn handle_local_reaction(
1237 &self,
1238 reaction_key: String,
1239 send_handle: SendReactionHandle,
1240 applies_to: OwnedTransactionId,
1241 ) {
1242 let mut state = self.state.write().await;
1243 let mut tr = state.transaction();
1244
1245 let target = TimelineEventItemId::TransactionId(applies_to);
1246
1247 let reaction_txn_id = send_handle.transaction_id().to_owned();
1248 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1249 let aggregation = Aggregation::new(
1250 TimelineEventItemId::TransactionId(reaction_txn_id),
1251 AggregationKind::Reaction {
1252 key: reaction_key.clone(),
1253 sender: self.room_data_provider.own_user_id().to_owned(),
1254 timestamp: MilliSecondsSinceUnixEpoch::now(),
1255 reaction_status,
1256 },
1257 );
1258
1259 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1260 find_item_and_apply_aggregation(&mut tr.items, &target, aggregation);
1261
1262 tr.commit();
1263 }
1264
1265 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1267 match update {
1268 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1269 self.handle_local_echo(echo).await;
1270 }
1271
1272 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1273 if !self.discard_local_echo(&transaction_id).await {
1274 warn!("couldn't find the local echo to discard");
1275 }
1276 }
1277
1278 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1279 let content = match new_content.deserialize() {
1280 Ok(d) => d,
1281 Err(err) => {
1282 warn!("error deserializing local echo (upon edit): {err}");
1283 return;
1284 }
1285 };
1286
1287 if !self.replace_local_echo(&transaction_id, content).await {
1288 warn!("couldn't find the local echo to replace");
1289 }
1290 }
1291
1292 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1293 self.update_event_send_state(
1294 &transaction_id,
1295 EventSendState::SendingFailed { error, is_recoverable },
1296 )
1297 .await;
1298 }
1299
1300 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1301 self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1302 }
1303
1304 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1305 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1306 .await;
1307 }
1308
1309 RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1310 info!(txn_id = %related_to, "some media for a media event has been uploaded");
1312 }
1313 }
1314 }
1315
1316 pub async fn insert_timeline_start_if_missing(&self) {
1319 let mut state = self.state.write().await;
1320 let mut txn = state.transaction();
1321 if txn.items.get(0).is_some_and(|item| item.is_timeline_start()) {
1322 return;
1323 }
1324 txn.items.push_front(txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart), None);
1325 txn.commit();
1326 }
1327}
1328
1329impl TimelineController {
1330 pub(super) fn room(&self) -> &Room {
1331 &self.room_data_provider
1332 }
1333
1334 #[instrument(skip(self))]
1337 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1338 let state = self.state.write().await;
1339 let (index, item) = rfind_event_by_id(&state.items, event_id)
1340 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1341 let remote_item = item
1342 .as_remote()
1343 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1344 .clone();
1345
1346 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1347 debug!("Event is not a message");
1348 return Ok(());
1349 };
1350 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1351 debug!("Event is not a reply");
1352 return Ok(());
1353 };
1354 if let TimelineDetails::Pending = &in_reply_to.event {
1355 debug!("Replied-to event is already being fetched");
1356 return Ok(());
1357 }
1358 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1359 debug!("Replied-to event has already been fetched");
1360 return Ok(());
1361 }
1362
1363 let internal_id = item.internal_id.to_owned();
1364 let item = item.clone();
1365 let event = fetch_replied_to_event(
1366 state,
1367 index,
1368 &item,
1369 internal_id,
1370 &msglike,
1371 &in_reply_to.event_id,
1372 self.room(),
1373 )
1374 .await?;
1375
1376 let mut state = self.state.write().await;
1379 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1380 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1381
1382 let TimelineItemContent::MsgLike(MsgLikeContent {
1385 kind: MsgLikeKind::Message(message),
1386 reactions,
1387 thread_root,
1388 in_reply_to,
1389 }) = item.content().clone()
1390 else {
1391 info!("Event is no longer a message (redacted?)");
1392 return Ok(());
1393 };
1394 let Some(in_reply_to) = in_reply_to else {
1395 warn!("Event no longer has a reply (bug?)");
1396 return Ok(());
1397 };
1398
1399 trace!("Updating in-reply-to details");
1402 let internal_id = item.internal_id.to_owned();
1403 let mut item = item.clone();
1404 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1405 kind: MsgLikeKind::Message(message),
1406 reactions,
1407 thread_root,
1408 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1409 }));
1410 state.items.replace(index, TimelineItem::new(item, internal_id));
1411
1412 Ok(())
1413 }
1414
1415 pub(super) async fn should_send_receipt(
1419 &self,
1420 receipt_type: &SendReceiptType,
1421 thread: &ReceiptThread,
1422 event_id: &EventId,
1423 ) -> bool {
1424 if *thread != ReceiptThread::Unthreaded {
1426 return true;
1427 }
1428
1429 let own_user_id = self.room().own_user_id();
1430 let state = self.state.read().await;
1431 let room = self.room();
1432
1433 match receipt_type {
1434 SendReceiptType::Read => {
1435 if let Some((old_pub_read, _)) = state
1436 .meta
1437 .user_receipt(
1438 own_user_id,
1439 ReceiptType::Read,
1440 room,
1441 state.items.all_remote_events(),
1442 )
1443 .await
1444 {
1445 trace!(%old_pub_read, "found a previous public receipt");
1446 if let Some(relative_pos) = state.meta.compare_events_positions(
1447 &old_pub_read,
1448 event_id,
1449 state.items.all_remote_events(),
1450 ) {
1451 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1452 return relative_pos == RelativePosition::After;
1453 }
1454 }
1455 }
1456 SendReceiptType::ReadPrivate => {
1459 if let Some((old_priv_read, _)) =
1460 state.latest_user_read_receipt(own_user_id, room).await
1461 {
1462 trace!(%old_priv_read, "found a previous private receipt");
1463 if let Some(relative_pos) = state.meta.compare_events_positions(
1464 &old_priv_read,
1465 event_id,
1466 state.items.all_remote_events(),
1467 ) {
1468 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1469 return relative_pos == RelativePosition::After;
1470 }
1471 }
1472 }
1473 SendReceiptType::FullyRead => {
1474 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1475 {
1476 if let Some(relative_pos) = state.meta.compare_events_positions(
1477 &prev_event_id,
1478 event_id,
1479 state.items.all_remote_events(),
1480 ) {
1481 return relative_pos == RelativePosition::After;
1482 }
1483 }
1484 }
1485 _ => {}
1486 }
1487
1488 true
1490 }
1491
1492 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1495 let state = self.state.read().await;
1496 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1497 }
1498
1499 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1500 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1501 self.retry_event_decryption_inner(self.room().to_owned(), session_ids).await
1502 }
1503}
1504
1505#[cfg(test)]
1506impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
1507 pub(super) async fn retry_event_decryption_test(
1508 &self,
1509 room_id: &RoomId,
1510 olm_machine: OlmMachine,
1511 session_ids: Option<BTreeSet<String>>,
1512 ) {
1513 self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1514 }
1515}
1516
1517async fn fetch_replied_to_event(
1518 mut state: RwLockWriteGuard<'_, TimelineState>,
1519 index: usize,
1520 item: &EventTimelineItem,
1521 internal_id: TimelineUniqueId,
1522 msglike: &MsgLikeContent,
1523 in_reply_to: &EventId,
1524 room: &Room,
1525) -> Result<TimelineDetails<Box<RepliedToEvent>>, Error> {
1526 if let Some((_, item)) = rfind_event_by_id(&state.items, in_reply_to) {
1527 let details = TimelineDetails::Ready(Box::new(RepliedToEvent::from_timeline_item(&item)));
1528 trace!("Found replied-to event locally");
1529 return Ok(details);
1530 };
1531
1532 trace!("Setting in-reply-to details to pending");
1535 let in_reply_to_details =
1536 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1537
1538 let event_item = item
1539 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1540
1541 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1542 state.items.replace(index, new_timeline_item);
1543
1544 drop(state);
1546
1547 trace!("Fetching replied-to event");
1548 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1549 Ok(timeline_event) => TimelineDetails::Ready(Box::new(
1550 RepliedToEvent::try_from_timeline_event(timeline_event, room).await?,
1551 )),
1552 Err(e) => TimelineDetails::Error(Arc::new(e)),
1553 };
1554 Ok(res)
1555}