1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::VectorDiff;
19use eyeball_im_util::vector::VectorObserverExt;
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
24use matrix_sdk::{
25 deserialized_responses::{TimelineEvent, TimelineEventKind as SdkTimelineEventKind},
26 event_cache::{
27 paginator::{PaginationResult, Paginator},
28 RoomEventCache,
29 },
30 send_queue::{
31 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
32 },
33 Result, Room,
34};
35use ruma::{
36 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
37 events::{
38 poll::unstable_start::UnstablePollStartEventContent,
39 reaction::ReactionEventContent,
40 receipt::{Receipt, ReceiptThread, ReceiptType},
41 relation::Annotation,
42 room::message::{MessageType, Relation},
43 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
44 AnySyncTimelineEvent, MessageLikeEventType,
45 },
46 serde::Raw,
47 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
48 TransactionId, UserId,
49};
50#[cfg(test)]
51use ruma::{events::receipt::ReceiptEventContent, RoomId};
52use tokio::sync::{RwLock, RwLockWriteGuard};
53use tracing::{
54 debug, error, field, field::debug, info, info_span, instrument, trace, warn, Instrument as _,
55};
56
57pub(super) use self::{
58 metadata::{RelativePosition, TimelineMetadata},
59 observable_items::{
60 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
61 ObservableItemsTransactionEntry,
62 },
63 state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineState},
64 state_transaction::TimelineStateTransaction,
65};
66use super::{
67 algorithms::{rfind_event_by_id, rfind_event_item},
68 event_handler::TimelineEventKind,
69 event_item::{ReactionStatus, RemoteEventOrigin},
70 item::TimelineUniqueId,
71 subscriber::TimelineSubscriber,
72 traits::{Decryptor, RoomDataProvider},
73 DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message,
74 PaginationError, Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus,
75 TimelineItem, TimelineItemContent, TimelineItemKind,
76};
77use crate::{
78 timeline::{
79 algorithms::rfind_event_by_item_id,
80 date_dividers::DateDividerAdjuster,
81 event_item::EventTimelineItemKind,
82 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
83 TimelineEventFilterFn,
84 },
85 unable_to_decrypt_hook::UtdHookManager,
86};
87
88mod aggregations;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
/// Focus-specific data held by a [`TimelineController`].
///
/// Built from the requested `TimelineFocus` in `TimelineController::new`.
#[derive(Debug)]
enum TimelineFocusData<P: RoomDataProvider> {
    /// The live focus; it carries no additional data.
    Live,

    /// Focus on a single event.
    Event {
        /// The event the timeline is focused on; handed to
        /// `Paginator::start_from` when the focus is initialised.
        event_id: OwnedEventId,
        /// Paginator used to load the initial events around `event_id` and to
        /// paginate backwards and forwards afterwards.
        paginator: Paginator<P>,
        /// Number of context events requested from the paginator when the
        /// focus is initialised.
        num_context_events: u16,
    },

    /// Focus on the pinned events.
    PinnedEvents {
        /// Loader used to load (and reload) the pinned events.
        loader: PinnedEventsLoader,
    },
}
118
/// Controller tying together the timeline state, the focus data, the room data
/// provider and the timeline settings.
///
/// Cloning is cheap for the state and the focus: both live behind an `Arc`,
/// so clones share them.
#[derive(Clone, Debug)]
pub(super) struct TimelineController<P: RoomDataProvider = Room> {
    /// Shared timeline state, guarded by an async read/write lock.
    state: Arc<RwLock<TimelineState>>,

    /// Shared focus-specific data, guarded by an async read/write lock.
    focus: Arc<RwLock<TimelineFocusData<P>>>,

    /// Provider queried for room-related data (own user ID, room version,
    /// profiles, ...) and used to send and redact events.
    pub(crate) room_data_provider: P,

    /// Settings applied by this controller; replaced via `with_settings`.
    pub(super) settings: TimelineSettings,
}
135
/// Settings of a timeline, stored in [`TimelineController`].
#[derive(Clone)]
pub(super) struct TimelineSettings {
    /// When `true`, the initial read receipts and the fully-read marker are
    /// loaded while the timeline is (re)populated. Defaults to `false`.
    pub(super) track_read_receipts: bool,

    /// Event filter applied to the timeline. Defaults to
    /// [`default_event_filter`].
    pub(super) event_filter: Arc<TimelineEventFilterFn>,

    /// Defaults to `true`.
    // NOTE(review): presumably controls whether events that failed to parse are
    // still added to the timeline — confirm against the event handler.
    pub(super) add_failed_to_parse: bool,

    /// Mode handed to the `DateDividerAdjuster`. Defaults to
    /// `DateDividerMode::Daily`.
    pub(super) date_divider_mode: DateDividerMode,
}
151
152#[cfg(not(tarpaulin_include))]
153impl fmt::Debug for TimelineSettings {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 f.debug_struct("TimelineSettings")
156 .field("track_read_receipts", &self.track_read_receipts)
157 .field("add_failed_to_parse", &self.add_failed_to_parse)
158 .finish_non_exhaustive()
159 }
160}
161
162impl Default for TimelineSettings {
163 fn default() -> Self {
164 Self {
165 track_read_receipts: false,
166 event_filter: Arc::new(default_event_filter),
167 add_failed_to_parse: true,
168 date_divider_mode: DateDividerMode::Daily,
169 }
170 }
171}
172
/// Data-less kind of timeline focus, handed to `TimelineState::new`.
///
/// Each variant is created together with the same-named `TimelineFocusData`
/// variant in `TimelineController::new`.
#[derive(Debug, Clone, Copy)]
pub(super) enum TimelineFocusKind {
    /// The timeline is live.
    Live,
    /// The timeline is focused on a single event.
    Event,
    /// The timeline is focused on the pinned events.
    PinnedEvents,
}
179
180pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
190 match event {
191 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
192 if ev.redacts(room_version).is_some() {
193 false
196 } else {
197 ev.event_type() != MessageLikeEventType::Reaction
200 }
201 }
202
203 AnySyncTimelineEvent::MessageLike(msg) => {
204 match msg.original_content() {
205 None => {
206 msg.event_type() != MessageLikeEventType::Reaction
209 }
210
211 Some(original_content) => {
212 match original_content {
213 AnyMessageLikeEventContent::RoomMessage(content) => {
214 if content
215 .relates_to
216 .as_ref()
217 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
218 {
219 return false;
221 }
222
223 matches!(
224 content.msgtype,
225 MessageType::Audio(_)
226 | MessageType::Emote(_)
227 | MessageType::File(_)
228 | MessageType::Image(_)
229 | MessageType::Location(_)
230 | MessageType::Notice(_)
231 | MessageType::ServerNotice(_)
232 | MessageType::Text(_)
233 | MessageType::Video(_)
234 | MessageType::VerificationRequest(_)
235 )
236 }
237
238 AnyMessageLikeEventContent::Sticker(_)
239 | AnyMessageLikeEventContent::UnstablePollStart(
240 UnstablePollStartEventContent::New(_),
241 )
242 | AnyMessageLikeEventContent::CallInvite(_)
243 | AnyMessageLikeEventContent::CallNotify(_)
244 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
245
246 _ => false,
247 }
248 }
249 }
250 }
251
252 AnySyncTimelineEvent::State(_) => {
253 true
255 }
256 }
257}
258
259impl<P: RoomDataProvider> TimelineController<P> {
260 pub(super) fn new(
261 room_data_provider: P,
262 focus: TimelineFocus,
263 internal_id_prefix: Option<String>,
264 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
265 is_room_encrypted: bool,
266 ) -> Self {
267 let (focus_data, focus_kind) = match focus {
268 TimelineFocus::Live => (TimelineFocusData::Live, TimelineFocusKind::Live),
269
270 TimelineFocus::Event { target, num_context_events } => {
271 let paginator = Paginator::new(room_data_provider.clone());
272 (
273 TimelineFocusData::Event { paginator, event_id: target, num_context_events },
274 TimelineFocusKind::Event,
275 )
276 }
277
278 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
279 TimelineFocusData::PinnedEvents {
280 loader: PinnedEventsLoader::new(
281 Arc::new(room_data_provider.clone()),
282 max_events_to_load as usize,
283 max_concurrent_requests as usize,
284 ),
285 },
286 TimelineFocusKind::PinnedEvents,
287 ),
288 };
289
290 let state = TimelineState::new(
291 focus_kind,
292 room_data_provider.own_user_id().to_owned(),
293 room_data_provider.room_version(),
294 internal_id_prefix,
295 unable_to_decrypt_hook,
296 is_room_encrypted,
297 );
298
299 Self {
300 state: Arc::new(RwLock::new(state)),
301 focus: Arc::new(RwLock::new(focus_data)),
302 room_data_provider,
303 settings: Default::default(),
304 }
305 }
306
307 pub(super) async fn init_focus(
314 &self,
315 room_event_cache: &RoomEventCache,
316 ) -> Result<bool, Error> {
317 let focus_guard = self.focus.read().await;
318
319 match &*focus_guard {
320 TimelineFocusData::Live => {
321 let (events, _stream) = room_event_cache.subscribe().await;
323
324 let has_events = !events.is_empty();
325
326 self.replace_with_initial_remote_events(
327 events.into_iter(),
328 RemoteEventOrigin::Cache,
329 )
330 .await;
331
332 Ok(has_events)
333 }
334
335 TimelineFocusData::Event { event_id, paginator, num_context_events } => {
336 let start_from_result = paginator
338 .start_from(event_id, (*num_context_events).into())
339 .await
340 .map_err(PaginationError::Paginator)?;
341
342 drop(focus_guard);
343
344 let has_events = !start_from_result.events.is_empty();
345
346 self.replace_with_initial_remote_events(
347 start_from_result.events.into_iter(),
348 RemoteEventOrigin::Pagination,
349 )
350 .await;
351
352 Ok(has_events)
353 }
354
355 TimelineFocusData::PinnedEvents { loader } => {
356 let loaded_events = loader.load_events().await.map_err(Error::PinnedEventsError)?;
357
358 drop(focus_guard);
359
360 let has_events = !loaded_events.is_empty();
361
362 self.replace_with_initial_remote_events(
363 loaded_events.into_iter(),
364 RemoteEventOrigin::Pagination,
365 )
366 .await;
367
368 Ok(has_events)
369 }
370 }
371 }
372
373 pub async fn handle_encryption_state_changes(&self) {
378 let mut room_info = self.room_data_provider.room_info();
379
380 let mark_encrypted = || async {
382 let mut state = self.state.write().await;
383 state.meta.is_room_encrypted = true;
384 state.mark_all_events_as_encrypted();
385 };
386
387 if room_info.get().is_encrypted() {
388 mark_encrypted().await;
391 return;
392 }
393
394 while let Some(info) = room_info.next().await {
395 if info.is_encrypted() {
396 mark_encrypted().await;
397 break;
400 }
401 }
402 }
403
404 pub(crate) async fn reload_pinned_events(
405 &self,
406 ) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
407 let focus_guard = self.focus.read().await;
408
409 if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
410 loader.load_events().await
411 } else {
412 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
413 }
414 }
415
416 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
425 let state = self.state.read().await;
426
427 let (count, needs) = state
428 .meta
429 .subscriber_skip_count
430 .compute_next_when_paginating_backwards(num_events.into());
431
432 state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
433
434 needs
435 }
436
437 pub(super) async fn focused_paginate_backwards(
442 &self,
443 num_events: u16,
444 ) -> Result<bool, PaginationError> {
445 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
446 TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
447 return Err(PaginationError::NotEventFocusMode)
448 }
449 TimelineFocusData::Event { paginator, .. } => paginator
450 .paginate_backward(num_events.into())
451 .await
452 .map_err(PaginationError::Paginator)?,
453 };
454
455 self.handle_remote_events_with_diffs(
458 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
459 RemoteEventOrigin::Pagination,
460 )
461 .await;
462
463 Ok(hit_end_of_timeline)
464 }
465
466 pub(super) async fn focused_paginate_forwards(
471 &self,
472 num_events: u16,
473 ) -> Result<bool, PaginationError> {
474 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
475 TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
476 return Err(PaginationError::NotEventFocusMode)
477 }
478 TimelineFocusData::Event { paginator, .. } => paginator
479 .paginate_forward(num_events.into())
480 .await
481 .map_err(PaginationError::Paginator)?,
482 };
483
484 self.handle_remote_events_with_diffs(
487 vec![VectorDiff::Append { values: events.into() }],
488 RemoteEventOrigin::Pagination,
489 )
490 .await;
491
492 Ok(hit_end_of_timeline)
493 }
494
495 pub(super) async fn is_live(&self) -> bool {
497 matches!(&*self.focus.read().await, TimelineFocusData::Live)
498 }
499
500 pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
501 self.settings = settings;
502 self
503 }
504
505 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
509 self.state.read().await.items.clone_items()
510 }
511
512 #[cfg(test)]
513 pub(super) async fn subscribe_raw(
514 &self,
515 ) -> (
516 Vector<Arc<TimelineItem>>,
517 impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
518 ) {
519 let state = self.state.read().await;
520
521 state.items.subscribe().into_values_and_stream()
522 }
523
524 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
525 let state = self.state.read().await;
526
527 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
528 }
529
530 pub(super) async fn subscribe_filter_map<U, F>(
531 &self,
532 f: F,
533 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
534 where
535 U: Clone,
536 F: Fn(Arc<TimelineItem>) -> Option<U>,
537 {
538 self.state.read().await.items.subscribe().filter_map(f)
539 }
540
    /// Toggles the own user's reaction `key` on the timeline item identified
    /// by `item_id`.
    ///
    /// If the own user has no reaction with that key on the item, one is
    /// added: through the item's send handle for a local echo, or by sending
    /// a reaction event through the room data provider for a remote event.
    /// Otherwise the existing reaction is removed, by aborting its pending
    /// send or by redacting the remote reaction event.
    ///
    /// # Returns
    ///
    /// `Ok(true)` if a reaction was added; `Ok(false)` if a reaction was
    /// removed or if nothing could be done (those cases are logged).
    ///
    /// # Errors
    ///
    /// - `Error::FailedToToggleReaction` if the item can't be found.
    /// - `Error::SendQueueError` if reacting through, or aborting, a send
    ///   handle fails.
    /// - Any error returned by the room data provider when sending the
    ///   reaction or the redaction.
    #[instrument(skip_all)]
    pub(super) async fn toggle_reaction_local(
        &self,
        item_id: &TimelineEventItemId,
        key: &str,
    ) -> Result<bool, Error> {
        let mut state = self.state.write().await;

        let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
            warn!("Timeline item not found, can't add reaction");
            return Err(Error::FailedToToggleReaction);
        };

        // Status of the own user's existing reaction for this key, if any.
        let user_id = self.room_data_provider.own_user_id();
        let prev_status = item
            .content()
            .reactions()
            .get(key)
            .and_then(|group| group.get(user_id))
            .map(|reaction_info| reaction_info.status.clone());

        // No previous reaction from the own user: add one.
        let Some(prev_status) = prev_status else {
            match &item.kind {
                EventTimelineItemKind::Local(local) => {
                    if let Some(send_handle) = &local.send_handle {
                        if send_handle
                            .react(key.to_owned())
                            .await
                            .map_err(|err| Error::SendQueueError(err.into()))?
                            .is_some()
                        {
                            trace!("adding a reaction to a local echo");
                            return Ok(true);
                        }

                        warn!("couldn't toggle reaction for local echo");
                        return Ok(false);
                    }

                    warn!("missing send handle for local echo; is this a test?");
                    return Ok(false);
                }

                EventTimelineItemKind::Remote(remote) => {
                    // The reaction is only sent here; the item itself isn't
                    // modified by this branch.
                    trace!("adding a reaction to a remote echo");
                    let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
                    self.room_data_provider
                        .send(ReactionEventContent::from(annotation).into())
                        .await?;
                    return Ok(true);
                }
            }
        };

        // There's a previous reaction from the own user: remove it.
        trace!("removing a previous reaction");
        match prev_status {
            ReactionStatus::LocalToLocal(send_reaction_handle) => {
                if let Some(handle) = send_reaction_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // The abort didn't take effect; only log it.
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send reaction handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::LocalToRemote(send_handle) => {
                // The item isn't modified by this branch; only the pending
                // send is aborted.
                trace!("aborting send of the previous reaction that was a local echo");
                if let Some(handle) = send_handle {
                    if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
                        // The abort didn't take effect; only log it.
                        warn!("unexpectedly unable to abort sending of local reaction");
                    }
                } else {
                    warn!("no send handle (this should only happen in testing contexts)");
                }
            }

            ReactionStatus::RemoteToRemote(event_id) => {
                // Optimistically remove the reaction from the item; it's added
                // back below if the redaction request fails.
                let Some(annotated_event_id) =
                    item.as_remote().map(|event_item| event_item.event_id.clone())
                else {
                    warn!("remote reaction to remote event, but the associated item isn't remote");
                    return Ok(false);
                };

                let mut reactions = item.content().reactions().clone();
                let reaction_info = reactions.remove_reaction(user_id, key);

                if reaction_info.is_some() {
                    let new_item = item.with_reactions(reactions);
                    state.items.replace(item_pos, new_item);
                } else {
                    warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
                }

                // Release the state lock before awaiting the redaction request.
                drop(state);

                trace!("sending redact for a previous reaction");
                if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
                    if let Some(reaction_info) = reaction_info {
                        debug!("sending redact failed, adding the reaction back to the list");

                        // The lock was released above, so the item has to be
                        // looked up again; its position may have changed.
                        let mut state = self.state.write().await;
                        if let Some((item_pos, item)) =
                            rfind_event_by_id(&state.items, &annotated_event_id)
                        {
                            // Put the removed reaction back for this key and user.
                            let mut reactions = item.content().reactions();
                            reactions
                                .entry(key.to_owned())
                                .or_default()
                                .insert(user_id.to_owned(), reaction_info);
                            let new_item = item.with_reactions(reactions);
                            state.items.replace(item_pos, new_item);
                        } else {
                            warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
                        }
                    }

                    return Err(err);
                }
            }
        }

        Ok(false)
    }
681
682 pub(super) async fn handle_remote_events_with_diffs(
684 &self,
685 diffs: Vec<VectorDiff<TimelineEvent>>,
686 origin: RemoteEventOrigin,
687 ) {
688 if diffs.is_empty() {
689 return;
690 }
691
692 let mut state = self.state.write().await;
693 state
694 .handle_remote_events_with_diffs(
695 diffs,
696 origin,
697 &self.room_data_provider,
698 &self.settings,
699 )
700 .await
701 }
702
703 pub(super) async fn clear(&self) {
704 self.state.write().await.clear();
705 }
706
707 pub(super) async fn replace_with_initial_remote_events<Events>(
715 &self,
716 events: Events,
717 origin: RemoteEventOrigin,
718 ) where
719 Events: IntoIterator + ExactSizeIterator,
720 <Events as IntoIterator>::Item: Into<TimelineEvent>,
721 {
722 let mut state = self.state.write().await;
723
724 let track_read_markers = self.settings.track_read_receipts;
725 if track_read_markers {
726 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
727 state
728 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
729 .await;
730 }
731
732 if !state.items.is_empty() || events.len() > 0 {
738 state
739 .replace_with_remote_events(
740 events,
741 origin,
742 &self.room_data_provider,
743 &self.settings,
744 )
745 .await;
746 }
747
748 if track_read_markers {
749 if let Some(fully_read_event_id) =
750 self.room_data_provider.load_fully_read_marker().await
751 {
752 state.handle_fully_read_marker(fully_read_event_id);
753 }
754 }
755 }
756
757 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
758 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
759 }
760
761 pub(super) async fn handle_ephemeral_events(
762 &self,
763 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
764 ) {
765 let mut state = self.state.write().await;
766 state.handle_ephemeral_events(events, &self.room_data_provider).await;
767 }
768
769 #[instrument(skip_all)]
771 pub(super) async fn handle_local_event(
772 &self,
773 txn_id: OwnedTransactionId,
774 content: TimelineEventKind,
775 send_handle: Option<SendHandle>,
776 ) {
777 let sender = self.room_data_provider.own_user_id().to_owned();
778 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
779
780 let should_add_new_items = self.is_live().await;
782
783 let date_divider_mode = self.settings.date_divider_mode.clone();
784
785 let mut state = self.state.write().await;
786 state
787 .handle_local_event(
788 sender,
789 profile,
790 should_add_new_items,
791 date_divider_mode,
792 txn_id,
793 send_handle,
794 content,
795 )
796 .await;
797 }
798
    /// Updates the send state of the local echo identified by `txn_id`.
    ///
    /// Handles, in order:
    ///
    /// 1. A remote item carrying the new event ID is already in the timeline:
    ///    the local echo for `txn_id`, if any, is removed as a duplicate and
    ///    the date dividers are adjusted.
    /// 2. No standalone item matches: if the event was just sent, try to mark
    ///    a matching aggregation as sent and reapply it to its target item.
    /// 3. Otherwise the local item gets the new send state.
    ///
    /// Paths that can't find a matching item only log a warning.
    #[instrument(skip(self))]
    pub(super) async fn update_event_send_state(
        &self,
        txn_id: &TransactionId,
        send_state: EventSendState,
    ) {
        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        // Only set when the new state is `Sent`.
        let new_event_id: Option<&EventId> =
            as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);

        // First check whether a remote item with the new event ID already
        // exists in the timeline.
        if rfind_event_item(&txn.items, |it| {
            new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
        })
        .is_some()
        {
            trace!("Remote echo received before send-event response");

            let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));

            // Both the remote item and a local echo are present: drop the
            // local one, it's a duplicate.
            if let Some((idx, _)) = local_echo {
                warn!("Message echo got duplicated, removing the local one");
                txn.items.remove(idx);

                // Removing an item may require updating the date dividers.
                let mut adjuster =
                    DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
                adjuster.run(&mut txn.items, &mut txn.meta);
            }

            txn.commit();
            return;
        }

        // Look for the item by transaction ID, or for a local item with the
        // new event ID.
        let result = rfind_event_item(&txn.items, |it| {
            it.transaction_id() == Some(txn_id)
                || new_event_id.is_some()
                    && it.event_id() == new_event_id
                    && it.as_local().is_some()
        });

        let Some((idx, item)) = result else {
            // Not found as a standalone item. If the event was just sent, it
            // may be an aggregation: try to mark that one as sent.
            if let Some(new_event_id) = new_event_id {
                match txn
                    .meta
                    .aggregations
                    .mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned())
                {
                    MarkAggregationSentResult::MarkedSent { update } => {
                        trace!("marked aggregation as sent");

                        if let Some((target, aggregation)) = update {
                            if let Some((item_pos, item)) =
                                rfind_event_by_item_id(&txn.items, &target)
                            {
                                let mut content = item.content().clone();
                                match aggregation.apply(&mut content) {
                                    ApplyAggregationResult::UpdatedItem => {
                                        trace!("reapplied aggregation in the event");
                                        let internal_id = item.internal_id.to_owned();
                                        let new_item = item.with_content(content);
                                        txn.items.replace(
                                            item_pos,
                                            TimelineItem::new(new_item, internal_id),
                                        );
                                        txn.commit();
                                    }
                                    ApplyAggregationResult::LeftItemIntact => {}
                                    ApplyAggregationResult::Error(err) => {
                                        warn!("when reapplying aggregation just marked as sent: {err}");
                                    }
                                }
                            }
                        }

                        // The aggregation was found and marked as sent:
                        // nothing else to do. Note the transaction is only
                        // committed in the `UpdatedItem` case above.
                        return;
                    }

                    MarkAggregationSentResult::NotFound => {}
                }
            }

            warn!("Timeline item not found, can't update send state");
            return;
        };

        let Some(local_item) = item.as_local() else {
            warn!("We looked for a local item, but it transitioned to remote.");
            return;
        };

        // Already marked as sent: log an error, but still apply the new send
        // state below.
        if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
            error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
        }

        // The event was just sent: let the aggregations know the target now
        // has an event ID.
        if let Some(new_event_id) = new_event_id {
            txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
        }

        let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
        txn.items.replace(idx, new_item);

        txn.commit();
    }
925
    /// Discards the local echo identified by `txn_id`.
    ///
    /// If a timeline item has that transaction ID, it's removed, and the date
    /// dividers and the read marker are updated. Otherwise, if the
    /// transaction ID matches a local aggregation, the aggregation is removed
    /// and unapplied from its target item.
    ///
    /// Returns `true` if an item or an aggregation was discarded, `false` if
    /// nothing matched or if the aggregation's target item is missing.
    pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
        let mut state = self.state.write().await;

        if let Some((idx, _)) =
            rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
        {
            let mut txn = state.transaction();

            txn.items.remove(idx);

            // Removing the item may leave date dividers or the read marker
            // out of date: refresh both.
            let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
            adjuster.run(&mut txn.items, &mut txn.meta);

            txn.meta.update_read_marker(&mut txn.items);

            txn.commit();

            debug!("discarded local echo");
            return true;
        }

        // Dereference the guard once, so that `meta` and `items` can be
        // borrowed independently below.
        let state = &mut *state;

        // Not a standalone item: check whether it's a local aggregation.
        if let Some((target, aggregation)) = state
            .meta
            .aggregations
            .try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned()))
        {
            let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else {
                warn!("missing target item for a local aggregation");
                return false;
            };

            let mut content = item.content().clone();
            match aggregation.unapply(&mut content) {
                ApplyAggregationResult::UpdatedItem => {
                    trace!("removed local reaction to local echo");
                    let internal_id = item.internal_id.clone();
                    let new_item = item.with_content(content);
                    state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
                }
                ApplyAggregationResult::LeftItemIntact => {}
                ApplyAggregationResult::Error(err) => {
                    warn!("when undoing local aggregation: {err}");
                }
            }

            // The aggregation was removed, whatever the unapply outcome.
            return true;
        }

        false
    }
983
    /// Replaces the content of the local echo identified by `txn_id`.
    ///
    /// Only `RoomMessage` content is supported. The replaced item keeps its
    /// internal ID and its reactions, and its send state is reset to
    /// `EventSendState::NotSentYet`.
    ///
    /// Returns `true` if the local echo was found and replaced, `false`
    /// otherwise (unsupported content, unknown transaction ID, or the item
    /// isn't local).
    pub(super) async fn replace_local_echo(
        &self,
        txn_id: &TransactionId,
        content: AnyMessageLikeEventContent,
    ) -> bool {
        let AnyMessageLikeEventContent::RoomMessage(content) = content else {
            // Other kinds of content aren't handled here.
            warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
            return false;
        };

        let mut state = self.state.write().await;
        let mut txn = state.transaction();

        let Some((idx, prev_item)) =
            rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
        else {
            debug!("Can't find local echo to replace");
            return false;
        };

        // New local kind: same local item, with the send state reset.
        let ti_kind = {
            let Some(prev_local_item) = prev_item.as_local() else {
                warn!("We looked for a local item, but it transitioned as remote??");
                return false;
            };
            prev_local_item.with_send_state(EventSendState::NotSentYet)
        };

        // Rebuild the item with the new kind and content, carrying over the
        // previous reactions and internal ID.
        let prev_reactions = prev_item.content().reactions();
        let new_item = TimelineItem::new(
            prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
                content,
                None,
                &txn.items,
                prev_reactions,
            )),
            prev_item.internal_id.to_owned(),
        );

        txn.items.replace(idx, new_item);

        txn.commit();

        debug!("Replaced local echo");
        true
    }
1040
1041 #[instrument(skip(self, room), fields(room_id = ?room.room_id()))]
1042 pub(super) async fn retry_event_decryption(
1043 &self,
1044 room: &Room,
1045 session_ids: Option<BTreeSet<String>>,
1046 ) {
1047 self.retry_event_decryption_inner(room.to_owned(), session_ids).await
1048 }
1049
1050 #[cfg(test)]
1051 pub(super) async fn retry_event_decryption_test(
1052 &self,
1053 room_id: &RoomId,
1054 olm_machine: OlmMachine,
1055 session_ids: Option<BTreeSet<String>>,
1056 ) {
1057 self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1058 }
1059
    /// Retries decrypting the timeline's unable-to-decrypt (UTD) items with
    /// the given `decryptor`.
    ///
    /// Only Megolm-encrypted items are considered, and when `session_ids` is
    /// set, only those whose session ID is in the set. If no item qualifies,
    /// this returns without doing anything else.
    ///
    /// The decryption itself runs in a spawned task, which takes ownership of
    /// the state write lock acquired here. This function returns once the
    /// task is spawned; it doesn't wait for it to finish.
    async fn retry_event_decryption_inner(
        &self,
        decryptor: impl Decryptor,
        session_ids: Option<BTreeSet<String>>,
    ) {
        use super::EncryptedMessage;

        // An owned guard, so that it can be moved into the spawned task.
        let mut state = self.state.clone().write_owned().await;

        // With no explicit session IDs, every session is retried.
        let should_retry = move |session_id: &str| {
            if let Some(session_ids) = &session_ids {
                session_ids.contains(session_id)
            } else {
                true
            }
        };

        // Indices of the UTD items worth retrying.
        let retry_indices: Vec<_> = state
            .items
            .iter()
            .enumerate()
            .filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
                EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
                    if should_retry(session_id) =>
                {
                    Some(idx)
                }
                EncryptedMessage::MegolmV1AesSha2 { .. }
                | EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
                | EncryptedMessage::Unknown => None,
            })
            .collect();

        if retry_indices.is_empty() {
            return;
        }

        debug!("Retrying decryption");

        // Everything the spawned task needs must be owned by it.
        let settings = self.settings.clone();
        let room_data_provider = self.room_data_provider.clone();
        let push_rules_context = room_data_provider.push_rules_and_context().await;
        let unable_to_decrypt_hook = state.meta.unable_to_decrypt_hook.clone();

        matrix_sdk::executor::spawn(async move {
            // Tries to decrypt a single item; resolves to the decrypted event
            // on success, or `None` if the item was skipped or still can't be
            // decrypted.
            let retry_one = |item: Arc<TimelineItem>| {
                let decryptor = decryptor.clone();
                let should_retry = &should_retry;
                let unable_to_decrypt_hook = unable_to_decrypt_hook.clone();
                async move {
                    let event_item = item.as_event()?;

                    // Same selection as for `retry_indices` above.
                    let session_id = match event_item.content().as_unable_to_decrypt()? {
                        EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
                            if should_retry(session_id) =>
                        {
                            session_id
                        }
                        EncryptedMessage::MegolmV1AesSha2 { .. }
                        | EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
                        | EncryptedMessage::Unknown => return None,
                    };

                    tracing::Span::current().record("session_id", session_id);

                    let Some(remote_event) = event_item.as_remote() else {
                        error!("Key for unable-to-decrypt timeline item is not an event ID");
                        return None;
                    };

                    tracing::Span::current().record("event_id", debug(&remote_event.event_id));

                    let Some(original_json) = &remote_event.original_json else {
                        error!("UTD item must contain original JSON");
                        return None;
                    };

                    match decryptor.decrypt_event_impl(original_json).await {
                        Ok(event) => {
                            // A successful call can still yield a UTD event.
                            if let SdkTimelineEventKind::UnableToDecrypt { utd_info, .. } =
                                event.kind
                            {
                                info!(
                                    "Failed to decrypt event after receiving room key: {:?}",
                                    utd_info.reason
                                );
                                None
                            } else {
                                // Report the late decryption to the hook, if
                                // there's one.
                                if let Some(hook) = unable_to_decrypt_hook {
                                    hook.on_late_decrypt(&remote_event.event_id).await;
                                }

                                Some(event)
                            }
                        }
                        Err(e) => {
                            info!("Failed to decrypt event after receiving room key: {e}");
                            None
                        }
                    }
                }
                // The empty span fields are recorded above, once known.
                .instrument(info_span!(
                    "retry_one",
                    session_id = field::Empty,
                    event_id = field::Empty
                ))
            };

            state
                .retry_event_decryption(
                    retry_one,
                    retry_indices,
                    push_rules_context,
                    &room_data_provider,
                    &settings,
                )
                .await;
        });
    }
1180
1181 pub(super) async fn set_sender_profiles_pending(&self) {
1182 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1183 }
1184
1185 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1186 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1187 }
1188
1189 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1190 self.state.write().await.items.for_each(|mut entry| {
1191 let Some(event_item) = entry.as_event() else { return };
1192 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1193 let new_item = entry.with_kind(TimelineItemKind::Event(
1194 event_item.with_sender_profile(profile_state.clone()),
1195 ));
1196 ObservableItemsEntry::replace(&mut entry, new_item);
1197 }
1198 });
1199 }
1200
1201 pub(super) async fn update_missing_sender_profiles(&self) {
1202 trace!("Updating missing sender profiles");
1203
1204 let mut state = self.state.write().await;
1205 let mut entries = state.items.entries();
1206 while let Some(mut entry) = entries.next() {
1207 let Some(event_item) = entry.as_event() else { continue };
1208 let event_id = event_item.event_id().map(debug);
1209 let transaction_id = event_item.transaction_id().map(debug);
1210
1211 if event_item.sender_profile().is_ready() {
1212 trace!(event_id, transaction_id, "Profile already set");
1213 continue;
1214 }
1215
1216 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1217 Some(profile) => {
1218 trace!(event_id, transaction_id, "Adding profile");
1219 let updated_item =
1220 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1221 let new_item = entry.with_kind(updated_item);
1222 ObservableItemsEntry::replace(&mut entry, new_item);
1223 }
1224 None => {
1225 if !event_item.sender_profile().is_unavailable() {
1226 trace!(event_id, transaction_id, "Marking profile unavailable");
1227 let updated_item =
1228 event_item.with_sender_profile(TimelineDetails::Unavailable);
1229 let new_item = entry.with_kind(updated_item);
1230 ObservableItemsEntry::replace(&mut entry, new_item);
1231 } else {
1232 debug!(event_id, transaction_id, "Profile already marked unavailable");
1233 }
1234 }
1235 }
1236 }
1237
1238 trace!("Done updating missing sender profiles");
1239 }
1240
1241 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1243 trace!("Forcing update of sender profiles: {sender_ids:?}");
1244
1245 let mut state = self.state.write().await;
1246 let mut entries = state.items.entries();
1247 while let Some(mut entry) = entries.next() {
1248 let Some(event_item) = entry.as_event() else { continue };
1249 if !sender_ids.contains(event_item.sender()) {
1250 continue;
1251 }
1252
1253 let event_id = event_item.event_id().map(debug);
1254 let transaction_id = event_item.transaction_id().map(debug);
1255
1256 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1257 Some(profile) => {
1258 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1259 {
1260 debug!(event_id, transaction_id, "Profile already up-to-date");
1261 } else {
1262 trace!(event_id, transaction_id, "Updating profile");
1263 let updated_item =
1264 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1265 let new_item = entry.with_kind(updated_item);
1266 ObservableItemsEntry::replace(&mut entry, new_item);
1267 }
1268 }
1269 None => {
1270 if !event_item.sender_profile().is_unavailable() {
1271 trace!(event_id, transaction_id, "Marking profile unavailable");
1272 let updated_item =
1273 event_item.with_sender_profile(TimelineDetails::Unavailable);
1274 let new_item = entry.with_kind(updated_item);
1275 ObservableItemsEntry::replace(&mut entry, new_item);
1276 } else {
1277 debug!(event_id, transaction_id, "Profile already marked unavailable");
1278 }
1279 }
1280 }
1281 }
1282
1283 trace!("Done forcing update of sender profiles");
1284 }
1285
1286 #[cfg(test)]
1287 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1288 let own_user_id = self.room_data_provider.own_user_id();
1289 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1290 }
1291
1292 pub(super) async fn latest_user_read_receipt(
1296 &self,
1297 user_id: &UserId,
1298 ) -> Option<(OwnedEventId, Receipt)> {
1299 self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1300 }
1301
1302 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1305 &self,
1306 user_id: &UserId,
1307 ) -> Option<OwnedEventId> {
1308 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1309 }
1310
1311 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1313 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1314 }
1315
1316 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1318 match echo.content {
1319 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1320 let content = match serialized_event.deserialize() {
1321 Ok(d) => d,
1322 Err(err) => {
1323 warn!("error deserializing local echo: {err}");
1324 return;
1325 }
1326 };
1327
1328 self.handle_local_event(
1329 echo.transaction_id.clone(),
1330 TimelineEventKind::Message { content, relations: Default::default() },
1331 Some(send_handle),
1332 )
1333 .await;
1334
1335 if let Some(send_error) = send_error {
1336 self.update_event_send_state(
1337 &echo.transaction_id,
1338 EventSendState::SendingFailed {
1339 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(send_error)),
1340 is_recoverable: false,
1341 },
1342 )
1343 .await;
1344 }
1345 }
1346
1347 LocalEchoContent::React { key, send_handle, applies_to } => {
1348 self.handle_local_reaction(key, send_handle, applies_to).await;
1349 }
1350 }
1351 }
1352
1353 #[instrument(skip(self, send_handle))]
1355 async fn handle_local_reaction(
1356 &self,
1357 reaction_key: String,
1358 send_handle: SendReactionHandle,
1359 applies_to: OwnedTransactionId,
1360 ) {
1361 let mut state = self.state.write().await;
1362 let mut tr = state.transaction();
1363
1364 let target = TimelineEventItemId::TransactionId(applies_to);
1365
1366 let reaction_txn_id = send_handle.transaction_id().to_owned();
1367 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1368 let aggregation = Aggregation::new(
1369 TimelineEventItemId::TransactionId(reaction_txn_id),
1370 AggregationKind::Reaction {
1371 key: reaction_key.clone(),
1372 sender: self.room_data_provider.own_user_id().to_owned(),
1373 timestamp: MilliSecondsSinceUnixEpoch::now(),
1374 reaction_status,
1375 },
1376 );
1377
1378 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1379 find_item_and_apply_aggregation(&mut tr.items, &target, aggregation);
1380
1381 tr.commit();
1382 }
1383
1384 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1386 match update {
1387 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1388 self.handle_local_echo(echo).await;
1389 }
1390
1391 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1392 if !self.discard_local_echo(&transaction_id).await {
1393 warn!("couldn't find the local echo to discard");
1394 }
1395 }
1396
1397 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1398 let content = match new_content.deserialize() {
1399 Ok(d) => d,
1400 Err(err) => {
1401 warn!("error deserializing local echo (upon edit): {err}");
1402 return;
1403 }
1404 };
1405
1406 if !self.replace_local_echo(&transaction_id, content).await {
1407 warn!("couldn't find the local echo to replace");
1408 }
1409 }
1410
1411 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1412 self.update_event_send_state(
1413 &transaction_id,
1414 EventSendState::SendingFailed { error, is_recoverable },
1415 )
1416 .await;
1417 }
1418
1419 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1420 self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1421 }
1422
1423 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1424 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1425 .await;
1426 }
1427
1428 RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1429 info!(txn_id = %related_to, "some media for a media event has been uploaded");
1431 }
1432 }
1433 }
1434}
1435
1436impl TimelineController {
1437 pub(super) fn room(&self) -> &Room {
1438 &self.room_data_provider
1439 }
1440
1441 #[instrument(skip(self))]
1444 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1445 let state = self.state.write().await;
1446 let (index, item) = rfind_event_by_id(&state.items, event_id)
1447 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1448 let remote_item = item
1449 .as_remote()
1450 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1451 .clone();
1452
1453 let TimelineItemContent::Message(message) = item.content().clone() else {
1454 debug!("Event is not a message");
1455 return Ok(());
1456 };
1457 let Some(in_reply_to) = message.in_reply_to() else {
1458 debug!("Event is not a reply");
1459 return Ok(());
1460 };
1461 if let TimelineDetails::Pending = &in_reply_to.event {
1462 debug!("Replied-to event is already being fetched");
1463 return Ok(());
1464 }
1465 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1466 debug!("Replied-to event has already been fetched");
1467 return Ok(());
1468 }
1469
1470 let internal_id = item.internal_id.to_owned();
1471 let item = item.clone();
1472 let event = fetch_replied_to_event(
1473 state,
1474 index,
1475 &item,
1476 internal_id,
1477 &message,
1478 &in_reply_to.event_id,
1479 self.room(),
1480 )
1481 .await?;
1482
1483 let mut state = self.state.write().await;
1486 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1487 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1488
1489 let TimelineItemContent::Message(message) = item.content().clone() else {
1492 info!("Event is no longer a message (redacted?)");
1493 return Ok(());
1494 };
1495 let Some(in_reply_to) = message.in_reply_to() else {
1496 warn!("Event no longer has a reply (bug?)");
1497 return Ok(());
1498 };
1499
1500 trace!("Updating in-reply-to details");
1503 let internal_id = item.internal_id.to_owned();
1504 let mut item = item.clone();
1505 item.set_content(TimelineItemContent::Message(
1506 message.with_in_reply_to(InReplyToDetails {
1507 event_id: in_reply_to.event_id.clone(),
1508 event,
1509 }),
1510 ));
1511 state.items.replace(index, TimelineItem::new(item, internal_id));
1512
1513 Ok(())
1514 }
1515
1516 pub(super) async fn should_send_receipt(
1520 &self,
1521 receipt_type: &SendReceiptType,
1522 thread: &ReceiptThread,
1523 event_id: &EventId,
1524 ) -> bool {
1525 if *thread != ReceiptThread::Unthreaded {
1527 return true;
1528 }
1529
1530 let own_user_id = self.room().own_user_id();
1531 let state = self.state.read().await;
1532 let room = self.room();
1533
1534 match receipt_type {
1535 SendReceiptType::Read => {
1536 if let Some((old_pub_read, _)) = state
1537 .meta
1538 .user_receipt(
1539 own_user_id,
1540 ReceiptType::Read,
1541 room,
1542 state.items.all_remote_events(),
1543 )
1544 .await
1545 {
1546 trace!(%old_pub_read, "found a previous public receipt");
1547 if let Some(relative_pos) = state.meta.compare_events_positions(
1548 &old_pub_read,
1549 event_id,
1550 state.items.all_remote_events(),
1551 ) {
1552 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1553 return relative_pos == RelativePosition::After;
1554 }
1555 }
1556 }
1557 SendReceiptType::ReadPrivate => {
1560 if let Some((old_priv_read, _)) =
1561 state.latest_user_read_receipt(own_user_id, room).await
1562 {
1563 trace!(%old_priv_read, "found a previous private receipt");
1564 if let Some(relative_pos) = state.meta.compare_events_positions(
1565 &old_priv_read,
1566 event_id,
1567 state.items.all_remote_events(),
1568 ) {
1569 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1570 return relative_pos == RelativePosition::After;
1571 }
1572 }
1573 }
1574 SendReceiptType::FullyRead => {
1575 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1576 {
1577 if let Some(relative_pos) = state.meta.compare_events_positions(
1578 &prev_event_id,
1579 event_id,
1580 state.items.all_remote_events(),
1581 ) {
1582 return relative_pos == RelativePosition::After;
1583 }
1584 }
1585 }
1586 _ => {}
1587 }
1588
1589 true
1591 }
1592
1593 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1596 let state = self.state.read().await;
1597 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1598 }
1599}
1600
1601async fn fetch_replied_to_event(
1602 mut state: RwLockWriteGuard<'_, TimelineState>,
1603 index: usize,
1604 item: &EventTimelineItem,
1605 internal_id: TimelineUniqueId,
1606 message: &Message,
1607 in_reply_to: &EventId,
1608 room: &Room,
1609) -> Result<TimelineDetails<Box<RepliedToEvent>>, Error> {
1610 if let Some((_, item)) = rfind_event_by_id(&state.items, in_reply_to) {
1611 let details = TimelineDetails::Ready(Box::new(RepliedToEvent {
1612 content: item.content.clone(),
1613 sender: item.sender().to_owned(),
1614 sender_profile: item.sender_profile().clone(),
1615 }));
1616
1617 trace!("Found replied-to event locally");
1618 return Ok(details);
1619 };
1620
1621 trace!("Setting in-reply-to details to pending");
1624 let reply = message.with_in_reply_to(InReplyToDetails {
1625 event_id: in_reply_to.to_owned(),
1626 event: TimelineDetails::Pending,
1627 });
1628 let event_item = item.with_content(TimelineItemContent::Message(reply));
1629
1630 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1631 state.items.replace(index, new_timeline_item);
1632
1633 drop(state);
1635
1636 trace!("Fetching replied-to event");
1637 let res = match room.event(in_reply_to, None).await {
1638 Ok(timeline_event) => TimelineDetails::Ready(Box::new(
1639 RepliedToEvent::try_from_timeline_event(timeline_event, room).await?,
1640 )),
1641 Err(e) => TimelineDetails::Error(Arc::new(e)),
1642 };
1643 Ok(res)
1644}