1use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
21 send_queue::SendHandle,
22};
23use matrix_sdk_base::crypto::types::events::UtdCause;
24use ruma::{
25 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26 TransactionId,
27 events::{
28 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29 AnySyncTimelineEvent, FullStateEventContent, MessageLikeEventContent, MessageLikeEventType,
30 StateEventType, SyncStateEvent,
31 poll::unstable_start::{
32 NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
33 },
34 receipt::Receipt,
35 relation::Replacement,
36 room::message::{
37 Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
38 },
39 },
40 serde::Raw,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45 EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
46 MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
47 TimelineItemContent,
48 controller::{
49 Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
50 TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
51 },
52 date_dividers::DateDividerAdjuster,
53 event_item::{
54 AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
55 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
56 TimelineEventItemId,
57 },
58 traits::RoomDataProvider,
59};
60use crate::{
61 timeline::{controller::aggregations::PendingEdit, event_item::OtherMessageLike},
62 unable_to_decrypt_hook::UtdHookManager,
63};
64
65pub(super) enum Flow {
67 Local {
69 txn_id: OwnedTransactionId,
71
72 send_handle: Option<SendHandle>,
74 },
75
76 Remote {
79 event_id: OwnedEventId,
81 txn_id: Option<OwnedTransactionId>,
84 raw_event: Raw<AnySyncTimelineEvent>,
86 position: TimelineItemPosition,
88 encryption_info: Option<Arc<EncryptionInfo>>,
90 },
91}
92
93impl Flow {
94 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
96 match self {
97 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
98 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
99 }
100 }
101
102 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
104 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
105 }
106}
107
108pub(super) struct TimelineEventContext {
109 pub(super) sender: OwnedUserId,
110 pub(super) sender_profile: Option<Profile>,
111 pub(super) timestamp: MilliSecondsSinceUnixEpoch,
113 pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
114 pub(super) is_highlighted: bool,
115 pub(super) flow: Flow,
116
117 pub(super) should_add_new_items: bool,
123}
124
125#[derive(Clone, Debug)]
128pub(super) enum HandleAggregationKind {
129 Reaction { key: String },
131
132 Redaction,
134
135 Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },
137
138 PollResponse { answers: Vec<String> },
140
141 PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },
143
144 PollEnd,
146}
147
148impl HandleAggregationKind {
149 pub fn debug_string(&self) -> &'static str {
151 match self {
152 HandleAggregationKind::Reaction { .. } => "a reaction",
153 HandleAggregationKind::Redaction => "a redaction",
154 HandleAggregationKind::Edit { .. } => "an edit",
155 HandleAggregationKind::PollResponse { .. } => "a poll response",
156 HandleAggregationKind::PollEdit { .. } => "a poll edit",
157 HandleAggregationKind::PollEnd => "a poll end",
158 }
159 }
160}
161
162#[derive(Clone, Debug)]
164#[allow(clippy::large_enum_variant)]
165pub(super) enum TimelineAction {
166 AddItem {
172 content: TimelineItemContent,
174 },
175
176 HandleAggregation {
182 related_event: OwnedEventId,
184 kind: HandleAggregationKind,
186 },
187}
188
189impl TimelineAction {
190 fn add_item(content: TimelineItemContent) -> Self {
192 Self::AddItem { content }
193 }
194
195 #[allow(clippy::too_many_arguments)]
199 pub async fn from_event<P: RoomDataProvider>(
200 event: AnySyncTimelineEvent,
201 raw_event: &Raw<AnySyncTimelineEvent>,
202 room_data_provider: &P,
203 unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
204 in_reply_to: Option<InReplyToDetails>,
205 thread_root: Option<OwnedEventId>,
206 thread_summary: Option<ThreadSummary>,
207 ) -> Option<Self> {
208 let redaction_rules = room_data_provider.room_version_rules().redaction;
209
210 let redacted_message_or_none = |event_type: MessageLikeEventType| {
211 (event_type != MessageLikeEventType::Reaction)
212 .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
213 };
214
215 Some(match event {
216 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
217 if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
218 Self::HandleAggregation {
219 related_event: redacts,
220 kind: HandleAggregationKind::Redaction,
221 }
222 } else {
223 Self::add_item(redacted_message_or_none(ev.event_type())?)
224 }
225 }
226
227 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
228 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
229 if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
231 unable_to_decrypt
232 {
233 let utd_cause = UtdCause::determine(
234 raw_event,
235 room_data_provider.crypto_context_info().await,
236 &unable_to_decrypt_info,
237 );
238
239 if let Some(hook) = unable_to_decrypt_hook_manager {
242 hook.on_utd(
243 ev.event_id(),
244 utd_cause,
245 ev.origin_server_ts(),
246 ev.sender(),
247 )
248 .await;
249 }
250
251 Self::add_item(TimelineItemContent::MsgLike(
252 MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
253 content, utd_cause,
254 )),
255 ))
256 } else {
257 Self::from_content(
262 AnyMessageLikeEventContent::RoomEncrypted(content),
263 in_reply_to,
264 thread_root,
265 thread_summary,
266 )
267 }
268 }
269
270 Some(content) => {
271 Self::from_content(content, in_reply_to, thread_root, thread_summary)
272 }
273
274 None => Self::add_item(redacted_message_or_none(ev.event_type())?),
275 },
276
277 AnySyncTimelineEvent::State(ev) => match ev {
278 AnySyncStateEvent::RoomMember(ev) => match ev {
279 SyncStateEvent::Original(ev) => {
280 Self::add_item(TimelineItemContent::room_member(
281 ev.state_key,
282 FullStateEventContent::Original {
283 content: ev.content,
284 prev_content: ev.unsigned.prev_content,
285 },
286 ev.sender,
287 ))
288 }
289 SyncStateEvent::Redacted(ev) => {
290 Self::add_item(TimelineItemContent::room_member(
291 ev.state_key,
292 FullStateEventContent::Redacted(ev.content),
293 ev.sender,
294 ))
295 }
296 },
297 ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
298 state_key: ev.state_key().to_owned(),
299 content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
300 })),
301 },
302 })
303 }
304
305 pub(super) fn from_content(
314 content: AnyMessageLikeEventContent,
315 in_reply_to: Option<InReplyToDetails>,
316 thread_root: Option<OwnedEventId>,
317 thread_summary: Option<ThreadSummary>,
318 ) -> Self {
319 match content {
320 AnyMessageLikeEventContent::Reaction(c) => {
321 Self::HandleAggregation {
323 related_event: c.relates_to.event_id.clone(),
324 kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
325 }
326 }
327
328 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
329 relates_to: Some(Relation::Replacement(re)),
330 ..
331 }) => Self::HandleAggregation {
332 related_event: re.event_id.clone(),
333 kind: HandleAggregationKind::Edit { replacement: re },
334 },
335
336 AnyMessageLikeEventContent::UnstablePollStart(
337 UnstablePollStartEventContent::Replacement(re),
338 ) => Self::HandleAggregation {
339 related_event: re.relates_to.event_id.clone(),
340 kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
341 },
342
343 AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
344 related_event: c.relates_to.event_id,
345 kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
346 },
347
348 AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
349 related_event: c.relates_to.event_id,
350 kind: HandleAggregationKind::PollEnd,
351 },
352
353 AnyMessageLikeEventContent::CallInvite(_) => {
354 Self::add_item(TimelineItemContent::CallInvite)
355 }
356
357 AnyMessageLikeEventContent::RtcNotification(_) => {
358 Self::add_item(TimelineItemContent::RtcNotification)
359 }
360
361 AnyMessageLikeEventContent::Sticker(content) => {
362 Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
363 kind: MsgLikeKind::Sticker(Sticker { content }),
364 reactions: Default::default(),
365 thread_root,
366 in_reply_to,
367 thread_summary,
368 }))
369 }
370
371 AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
372 c,
373 )) => {
374 let poll_state = PollState::new(c.poll_start, c.text);
375
376 Self::AddItem {
377 content: TimelineItemContent::MsgLike(MsgLikeContent {
378 kind: MsgLikeKind::Poll(poll_state),
379 reactions: Default::default(),
380 thread_root,
381 in_reply_to,
382 thread_summary,
383 }),
384 }
385 }
386
387 AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
388 content: TimelineItemContent::message(
389 msg.msgtype,
390 msg.mentions,
391 Default::default(),
392 thread_root,
393 in_reply_to,
394 thread_summary,
395 ),
396 },
397
398 event => {
399 let other = OtherMessageLike { event_type: event.event_type() };
400
401 Self::AddItem {
402 content: TimelineItemContent::MsgLike(MsgLikeContent {
403 kind: MsgLikeKind::Other(other),
404 reactions: Default::default(),
405 thread_root,
406 in_reply_to,
407 thread_summary,
408 }),
409 }
410 }
411 }
412 }
413
414 pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
415 let error = Arc::new(error);
416 match event {
417 FailedToParseEvent::State { event_type, state_key } => {
418 Self::add_item(TimelineItemContent::FailedToParseState {
419 event_type,
420 state_key,
421 error,
422 })
423 }
424 FailedToParseEvent::MsgLike(event_type) => {
425 Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
426 }
427 }
428 }
429}
430
431#[derive(Debug)]
432pub(super) enum FailedToParseEvent {
433 MsgLike(MessageLikeEventType),
434 State { event_type: StateEventType, state_key: String },
435}
436
437#[derive(Clone, Copy, Debug)]
439pub(super) enum TimelineItemPosition {
440 Start {
443 origin: RemoteEventOrigin,
445 },
446
447 End {
450 origin: RemoteEventOrigin,
452 },
453
454 At {
456 event_index: usize,
458
459 origin: RemoteEventOrigin,
461 },
462
463 UpdateAt {
468 timeline_item_index: usize,
470 },
471}
472
473pub(super) type RemovedItem = bool;
475
476pub(super) struct TimelineEventHandler<'a, 'o> {
483 items: &'a mut ObservableItemsTransaction<'o>,
484 meta: &'a mut TimelineMetadata,
485 ctx: TimelineEventContext,
486}
487
488impl<'a, 'o> TimelineEventHandler<'a, 'o> {
489 pub(super) fn new<P: RoomDataProvider>(
490 state: &'a mut TimelineStateTransaction<'o, P>,
491 ctx: TimelineEventContext,
492 ) -> Self {
493 let TimelineStateTransaction { items, meta, .. } = state;
494 Self { items, meta, ctx }
495 }
496
497 #[instrument(skip_all, fields(txn_id, event_id, position))]
511 pub(super) async fn handle_event(
512 mut self,
513 date_divider_adjuster: &mut DateDividerAdjuster,
514 timeline_action: TimelineAction,
515 ) -> bool {
516 let span = tracing::Span::current();
517
518 date_divider_adjuster.mark_used();
519
520 match &self.ctx.flow {
521 Flow::Local { txn_id, .. } => {
522 span.record("txn_id", debug(txn_id));
523 debug!("Handling local event");
524 }
525
526 Flow::Remote { event_id, txn_id, position, .. } => {
527 span.record("event_id", debug(event_id));
528 span.record("position", debug(position));
529 if let Some(txn_id) = txn_id {
530 span.record("txn_id", debug(txn_id));
531 }
532 trace!("Handling remote event");
533 }
534 }
535
536 let mut added_item = false;
537
538 match timeline_action {
539 TimelineAction::AddItem { content } => {
540 if self.ctx.should_add_new_items {
541 self.add_item(content);
542 added_item = true;
543 }
544 }
545
546 TimelineAction::HandleAggregation { related_event, kind } => match kind {
547 HandleAggregationKind::Reaction { key } => {
548 self.handle_reaction(related_event, key);
549 }
550 HandleAggregationKind::Redaction => {
551 self.handle_redaction(related_event);
552 }
553 HandleAggregationKind::Edit { replacement } => {
554 self.handle_edit(
555 replacement.event_id.clone(),
556 PendingEditKind::RoomMessage(replacement),
557 );
558 }
559 HandleAggregationKind::PollResponse { answers } => {
560 self.handle_poll_response(related_event, answers);
561 }
562 HandleAggregationKind::PollEdit { replacement } => {
563 self.handle_edit(
564 replacement.event_id.clone(),
565 PendingEditKind::Poll(replacement),
566 );
567 }
568 HandleAggregationKind::PollEnd => {
569 self.handle_poll_end(related_event);
570 }
571 },
572 }
573
574 added_item
575 }
576
577 #[instrument(skip(self, edit_kind))]
578 fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
579 let target = TimelineEventItemId::EventId(edited_event_id.clone());
580
581 let encryption_info =
582 as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
583 let aggregation = Aggregation::new(
584 self.ctx.flow.timeline_item_id(),
585 AggregationKind::Edit(PendingEdit {
586 kind: edit_kind,
587 edit_json: self.ctx.flow.raw_event().cloned(),
588 encryption_info,
589 bundled_item_owner: None,
590 }),
591 );
592
593 self.meta.aggregations.add(target.clone(), aggregation.clone());
594
595 if let Some(new_item) = find_item_and_apply_aggregation(
596 &self.meta.aggregations,
597 self.items,
598 &target,
599 aggregation,
600 &self.meta.room_version_rules,
601 ) {
602 Self::maybe_update_responses(
604 self.meta,
605 self.items,
606 &edited_event_id,
607 EmbeddedEvent::from_timeline_item(&new_item),
608 );
609 }
610 }
611
612 #[instrument(skip(self))]
617 fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
618 let target = TimelineEventItemId::EventId(relates_to);
619
620 let reaction_status = match &self.ctx.flow {
622 Flow::Local { send_handle, .. } => {
623 ReactionStatus::LocalToRemote(send_handle.clone())
625 }
626 Flow::Remote { event_id, .. } => {
627 ReactionStatus::RemoteToRemote(event_id.clone())
629 }
630 };
631
632 let aggregation = Aggregation::new(
633 self.ctx.flow.timeline_item_id(),
634 AggregationKind::Reaction {
635 key: reaction_key,
636 sender: self.ctx.sender.clone(),
637 timestamp: self.ctx.timestamp,
638 reaction_status,
639 },
640 );
641
642 self.meta.aggregations.add(target.clone(), aggregation.clone());
643 find_item_and_apply_aggregation(
644 &self.meta.aggregations,
645 self.items,
646 &target,
647 aggregation,
648 &self.meta.room_version_rules,
649 );
650 }
651
652 fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
653 let target = TimelineEventItemId::EventId(poll_event_id);
654 let aggregation = Aggregation::new(
655 self.ctx.flow.timeline_item_id(),
656 AggregationKind::PollResponse {
657 sender: self.ctx.sender.clone(),
658 timestamp: self.ctx.timestamp,
659 answers,
660 },
661 );
662 self.meta.aggregations.add(target.clone(), aggregation.clone());
663 find_item_and_apply_aggregation(
664 &self.meta.aggregations,
665 self.items,
666 &target,
667 aggregation,
668 &self.meta.room_version_rules,
669 );
670 }
671
672 fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
673 let target = TimelineEventItemId::EventId(poll_event_id);
674 let aggregation = Aggregation::new(
675 self.ctx.flow.timeline_item_id(),
676 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
677 );
678 self.meta.aggregations.add(target.clone(), aggregation.clone());
679 find_item_and_apply_aggregation(
680 &self.meta.aggregations,
681 self.items,
682 &target,
683 aggregation,
684 &self.meta.room_version_rules,
685 );
686 }
687
688 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
694 fn handle_redaction(&mut self, redacted: OwnedEventId) {
695 if self.handle_aggregation_redaction(redacted.clone()) {
700 return;
703 }
704
705 let target = TimelineEventItemId::EventId(redacted.clone());
706 let aggregation =
707 Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
708 self.meta.aggregations.add(target.clone(), aggregation.clone());
709
710 find_item_and_apply_aggregation(
711 &self.meta.aggregations,
712 self.items,
713 &target,
714 aggregation,
715 &self.meta.room_version_rules,
716 );
717
718 let embedded_event = EmbeddedEvent {
721 content: TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
722 sender: self.ctx.sender.clone(),
723 sender_profile: TimelineDetails::from_initial_value(self.ctx.sender_profile.clone()),
724 timestamp: self.ctx.timestamp,
725 identifier: TimelineEventItemId::EventId(redacted.clone()),
726 };
727
728 Self::maybe_update_responses(self.meta, self.items, &redacted, embedded_event);
729 }
730
731 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
736 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
737 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
738
739 match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
740 Ok(val) => val,
741 Err(err) => {
743 warn!("error while attempting to remove aggregation: {err}");
744 true
746 }
747 }
748 }
749
750 fn add_item(&mut self, content: TimelineItemContent) {
763 let sender = self.ctx.sender.to_owned();
764 let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
765 let timestamp = self.ctx.timestamp;
766
767 let kind: EventTimelineItemKind = match &self.ctx.flow {
768 Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
769 send_state: EventSendState::NotSentYet { progress: None },
770 transaction_id: txn_id.to_owned(),
771 send_handle: send_handle.clone(),
772 }
773 .into(),
774
775 Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
776 let origin = match *position {
777 TimelineItemPosition::Start { origin }
778 | TimelineItemPosition::End { origin }
779 | TimelineItemPosition::At { origin, .. } => origin,
780
781 TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
783 .as_event()
784 .and_then(|ev| Some(ev.as_remote()?.origin))
785 .unwrap_or_else(|| {
786 error!("Tried to update a local event");
787 RemoteEventOrigin::Unknown
788 }),
789 };
790
791 RemoteEventTimelineItem {
792 event_id: event_id.clone(),
793 transaction_id: txn_id.clone(),
794 read_receipts: self.ctx.read_receipts.clone(),
795 is_own: self.ctx.sender == self.meta.own_user_id,
796 is_highlighted: self.ctx.is_highlighted,
797 encryption_info: encryption_info.clone(),
798 original_json: Some(raw_event.clone()),
799 latest_edit_json: None,
800 origin,
801 }
802 .into()
803 }
804 };
805
806 let is_room_encrypted = self.meta.is_room_encrypted;
807
808 let item = EventTimelineItem::new(
809 sender,
810 sender_profile,
811 timestamp,
812 content,
813 kind,
814 is_room_encrypted,
815 );
816
817 let mut cowed = Cow::Owned(item);
819 if let Err(err) = self.meta.aggregations.apply_all(
820 &self.ctx.flow.timeline_item_id(),
821 &mut cowed,
822 self.items,
823 &self.meta.room_version_rules,
824 ) {
825 warn!("discarding aggregations: {err}");
826 }
827 let item = cowed.into_owned();
828
829 match &self.ctx.flow {
830 Flow::Local { .. } => {
831 trace!("Adding new local timeline item");
832
833 let item = self.meta.new_timeline_item(item);
834
835 self.items.push_local(item);
836 }
837
838 Flow::Remote {
839 position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
840 } => {
841 let item = Self::recycle_local_or_create_item(
842 self.items,
843 self.meta,
844 item,
845 event_id,
846 txn_id.as_deref(),
847 );
848
849 trace!("Adding new remote timeline item at the start");
850
851 self.items.push_front(item, Some(0));
852 }
853
854 Flow::Remote {
855 position: TimelineItemPosition::At { event_index, .. },
856 event_id,
857 txn_id,
858 ..
859 } => {
860 let item = Self::recycle_local_or_create_item(
861 self.items,
862 self.meta,
863 item,
864 event_id,
865 txn_id.as_deref(),
866 );
867
868 let all_remote_events = self.items.all_remote_events();
869 let event_index = *event_index;
870
871 let timeline_item_index = all_remote_events
873 .range(0..=event_index)
874 .rev()
875 .find_map(|event_meta| event_meta.timeline_item_index)
876 .map(|timeline_item_index| timeline_item_index + 1);
878
879 let timeline_item_index = timeline_item_index.or_else(|| {
882 all_remote_events
883 .range(event_index + 1..)
884 .find_map(|event_meta| event_meta.timeline_item_index)
885 });
886
887 let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
890 self.items
891 .iter_remotes_region()
892 .rev()
893 .find_map(|(timeline_item_index, timeline_item)| {
894 timeline_item.as_event().map(|_| timeline_item_index + 1)
895 })
896 .unwrap_or_else(|| {
897 self.items.first_remotes_region_index()
900 })
901 });
902
903 trace!(
904 ?event_index,
905 ?timeline_item_index,
906 "Adding new remote timeline at specific event index"
907 );
908
909 self.items.insert(timeline_item_index, item, Some(event_index));
910 }
911
912 Flow::Remote {
913 position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
914 } => {
915 let item = Self::recycle_local_or_create_item(
916 self.items,
917 self.meta,
918 item,
919 event_id,
920 txn_id.as_deref(),
921 );
922
923 let timeline_item_index = self
925 .items
926 .iter_remotes_region()
927 .rev()
928 .find_map(|(timeline_item_index, timeline_item)| {
929 timeline_item.as_event().map(|_| timeline_item_index + 1)
930 })
931 .unwrap_or_else(|| {
932 self.items.first_remotes_region_index()
935 });
936
937 let event_index = self
938 .items
939 .all_remote_events()
940 .last_index()
941 .or_else(|| {
945 error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
946
947 Some(0)
948 });
949
950 if timeline_item_index == self.items.len() {
959 trace!("Adding new remote timeline item at the back");
960 self.items.push_back(item, event_index);
961 } else if timeline_item_index == 0 {
962 trace!("Adding new remote timeline item at the front");
963 self.items.push_front(item, event_index);
964 } else {
965 trace!(
966 timeline_item_index,
967 "Adding new remote timeline item at specific index"
968 );
969 self.items.insert(timeline_item_index, item, event_index);
970 }
971 }
972
973 Flow::Remote {
974 event_id: decrypted_event_id,
975 position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
976 ..
977 } => {
978 trace!("Updating timeline item at position {idx}");
979
980 Self::maybe_update_responses(
982 self.meta,
983 self.items,
984 decrypted_event_id,
985 EmbeddedEvent::from_timeline_item(&item),
986 );
987
988 let internal_id = self.items[*idx].internal_id.clone();
989 self.items.replace(*idx, TimelineItem::new(item, internal_id));
990 }
991 }
992
993 if !self.meta.has_up_to_date_read_marker_item {
995 self.meta.update_read_marker(self.items);
996 }
997 }
998
999 fn recycle_local_or_create_item(
1005 items: &mut ObservableItemsTransaction<'_>,
1006 meta: &mut TimelineMetadata,
1007 mut new_item: EventTimelineItem,
1008 event_id: &EventId,
1009 transaction_id: Option<&TransactionId>,
1010 ) -> Arc<TimelineItem> {
1011 if let Some((local_timeline_item_index, local_timeline_item)) = items
1013 .iter_locals_region()
1015 .rev()
1017 .find_map(|(nth, timeline_item)| {
1018 let event_timeline_item = timeline_item.as_event()?;
1019
1020 if Some(event_id) == event_timeline_item.event_id()
1021 || (transaction_id.is_some()
1022 && transaction_id == event_timeline_item.transaction_id())
1023 {
1024 Some((nth, event_timeline_item))
1026 } else {
1027 None
1030 }
1031 })
1032 {
1033 trace!(
1034 ?event_id,
1035 ?transaction_id,
1036 ?local_timeline_item_index,
1037 "Removing local timeline item"
1038 );
1039
1040 transfer_details(&mut new_item, local_timeline_item);
1041
1042 let recycled = items.remove(local_timeline_item_index);
1044 TimelineItem::new(new_item, recycled.internal_id.clone())
1045 } else {
1046 meta.new_timeline_item(new_item)
1048 }
1049 }
1050
1051 fn maybe_update_responses(
1054 meta: &mut TimelineMetadata,
1055 items: &mut ObservableItemsTransaction<'_>,
1056 target_event_id: &EventId,
1057 new_embedded_event: EmbeddedEvent,
1058 ) {
1059 let Some(replies) = meta.replies.get(target_event_id) else {
1060 trace!("item has no replies");
1061 return;
1062 };
1063
1064 for reply_id in replies {
1065 let Some(timeline_item_index) = items
1066 .get_remote_event_by_event_id(reply_id)
1067 .and_then(|meta| meta.timeline_item_index)
1068 else {
1069 warn!(%reply_id, "event not known as an item in the timeline");
1070 continue;
1071 };
1072
1073 let Some(item) = items.get(timeline_item_index) else {
1074 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1075 continue;
1076 };
1077
1078 let Some(event_item) = item.as_event() else { continue };
1079 let Some(msglike) = event_item.content.as_msglike() else { continue };
1080 let Some(message) = msglike.as_message() else { continue };
1081 let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1082
1083 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1084 let in_reply_to = InReplyToDetails {
1085 event_id: in_reply_to.event_id.clone(),
1086 event: TimelineDetails::Ready(Box::new(new_embedded_event.clone())),
1087 };
1088
1089 let new_reply_content = TimelineItemContent::MsgLike(
1090 msglike
1091 .with_in_reply_to(in_reply_to)
1092 .with_kind(MsgLikeKind::Message(message.clone())),
1093 );
1094 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1095 items.replace(timeline_item_index, new_reply_item);
1096 }
1097 }
1098}
1099
1100fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1108 let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1109 return;
1110 };
1111 let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1112 return;
1113 };
1114
1115 let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1116 let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1117
1118 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1119 in_reply_to.event = old_in_reply_to.event.clone();
1120 }
1121}