1use std::{ops::ControlFlow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 crypto::types::events::UtdCause,
21 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22 ring_buffer::RingBuffer,
23 send_queue::SendHandle,
24};
25use ruma::{
26 events::{
27 poll::{
28 unstable_end::UnstablePollEndEventContent,
29 unstable_response::UnstablePollResponseEventContent,
30 unstable_start::{
31 NewUnstablePollStartEventContent, NewUnstablePollStartEventContentWithoutRelation,
32 UnstablePollStartEventContent,
33 },
34 },
35 reaction::ReactionEventContent,
36 receipt::Receipt,
37 relation::Replacement,
38 room::{
39 encrypted::RoomEncryptedEventContent,
40 member::RoomMemberEventContent,
41 message::{Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation},
42 },
43 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
44 AnySyncTimelineEvent, BundledMessageLikeRelations, EventContent, FullStateEventContent,
45 MessageLikeEventType, StateEventType, SyncStateEvent,
46 },
47 serde::Raw,
48 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
49 TransactionId,
50};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53use super::{
54 algorithms::{rfind_event_by_id, rfind_event_by_item_id},
55 controller::{
56 find_item_and_apply_aggregation, Aggregation, AggregationKind, ApplyAggregationResult,
57 ObservableItemsTransaction, PendingEdit, PendingEditKind, TimelineMetadata,
58 TimelineStateTransaction,
59 },
60 date_dividers::DateDividerAdjuster,
61 event_item::{
62 extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content,
63 AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
64 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
65 TimelineEventItemId,
66 },
67 traits::RoomDataProvider,
68 EventTimelineItem, InReplyToDetails, OtherState, ReactionStatus, RepliedToEvent, Sticker,
69 TimelineDetails, TimelineItem, TimelineItemContent,
70};
71use crate::events::SyncTimelineEventWithoutContent;
72
/// Where the event being handled comes from, together with the data that is
/// specific to that origin.
pub(super) enum Flow {
    /// The event is a local echo, identified only by a transaction ID.
    Local {
        /// Transaction ID of the local echo; it becomes the identifier of the
        /// resulting timeline item.
        txn_id: OwnedTransactionId,

        /// Optional send handle, copied onto the local timeline item (or onto
        /// the local reaction status) that is created for this event.
        send_handle: Option<SendHandle>,
    },

    /// The event is a remote event, identified by an event ID.
    Remote {
        /// Event ID of the remote event; it becomes the identifier of the
        /// resulting timeline item.
        event_id: OwnedEventId,
        /// Transaction ID attached to the event, if any. It is stored on the
        /// remote item and used to look for a matching local echo to replace.
        txn_id: Option<OwnedTransactionId>,
        /// Raw JSON of the event, stored as the item's original JSON (or as
        /// the latest edit JSON when the event is an edit).
        raw_event: Raw<AnySyncTimelineEvent>,
        /// Where the resulting item must be placed in the timeline.
        position: TimelineItemPosition,
        /// Encryption information attached to the event, if any.
        encryption_info: Option<EncryptionInfo>,
    },
}
100
101impl Flow {
102 pub(crate) fn event_id(&self) -> Option<&EventId> {
104 as_variant!(self, Flow::Remote { event_id, .. } => event_id)
105 }
106
107 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
109 match self {
110 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
111 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
112 }
113 }
114
115 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
117 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
118 }
119}
120
/// Metadata about the event being handled, independent of its content.
pub(super) struct TimelineEventContext {
    /// Sender of the event. Also used to reject edits sent by another user
    /// than the author of the edited item.
    pub(super) sender: OwnedUserId,
    /// Profile of the sender, if already known; used as the initial value of
    /// the item's sender profile details.
    pub(super) sender_profile: Option<Profile>,
    /// Timestamp attached to the event; also used as the timestamp of
    /// reactions and poll responses, and as the end date of polls.
    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
    /// Whether the event is flagged as the current user's own; copied onto
    /// remote items.
    pub(super) is_own_event: bool,
    /// Read receipts to attach to the resulting remote item.
    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
    /// Whether the resulting remote item must be flagged as highlighted.
    pub(super) is_highlighted: bool,
    /// Origin-specific data (local echo vs. remote event).
    pub(super) flow: Flow,

    /// When `false`, events that would create a new timeline item are
    /// skipped, while events that only update existing items (reactions,
    /// edits, redactions, poll responses and poll ends) are still processed.
    pub(super) should_add_new_items: bool,
}
138
/// The kind of event the timeline is handling, along with the content that
/// is relevant to build or update timeline items.
#[derive(Clone, Debug)]
pub(super) enum TimelineEventKind {
    /// A message-like event with its original (non-redacted) content.
    Message {
        /// The content of the event.
        content: AnyMessageLikeEventContent,
        /// The relations bundled with the event (e.g. a bundled edit).
        relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
    },

    /// An encrypted event for which unable-to-decrypt information was
    /// provided.
    UnableToDecrypt { content: RoomEncryptedEventContent, utd_cause: UtdCause },

    /// A message-like event without original content (i.e. redacted), or a
    /// redaction event that doesn't expose the event it redacts.
    RedactedMessage { event_type: MessageLikeEventType },

    /// A redaction event targeting the `redacts` event.
    Redaction { redacts: OwnedEventId },

    /// An `m.room.member` state event, either original or redacted.
    RoomMember {
        /// The state key of the event, i.e. the user the membership is about.
        user_id: OwnedUserId,
        /// The (possibly redacted) content of the event.
        content: FullStateEventContent<RoomMemberEventContent>,
        /// The sender of the event.
        sender: OwnedUserId,
    },

    /// Any state event other than `m.room.member`.
    OtherState { state_key: String, content: AnyOtherFullStateEventContent },

    /// A message-like event whose content failed to deserialize.
    FailedToParseMessageLike { event_type: MessageLikeEventType, error: Arc<serde_json::Error> },

    /// A state event whose content failed to deserialize.
    FailedToParseState {
        /// The type of the state event.
        event_type: StateEventType,
        /// The state key of the event.
        state_key: String,
        /// The deserialization error.
        error: Arc<serde_json::Error>,
    },
}
182
183impl TimelineEventKind {
184 pub async fn from_event<P: RoomDataProvider>(
196 event: AnySyncTimelineEvent,
197 raw_event: &Raw<AnySyncTimelineEvent>,
198 room_data_provider: &P,
199 unable_to_decrypt_info: Option<UnableToDecryptInfo>,
200 ) -> Self {
201 let room_version = room_data_provider.room_version();
202 match event {
203 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
204 if let Some(redacts) = ev.redacts(&room_version).map(ToOwned::to_owned) {
205 Self::Redaction { redacts }
206 } else {
207 Self::RedactedMessage { event_type: ev.event_type() }
208 }
209 }
210 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
211 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
212 if let Some(unable_to_decrypt_info) = unable_to_decrypt_info {
214 let utd_cause = UtdCause::determine(
215 raw_event,
216 room_data_provider.crypto_context_info().await,
217 &unable_to_decrypt_info,
218 );
219 Self::UnableToDecrypt { content, utd_cause }
220 } else {
221 Self::Message {
227 content: AnyMessageLikeEventContent::RoomEncrypted(content),
228 relations: ev.relations(),
229 }
230 }
231 }
232 Some(content) => Self::Message { content, relations: ev.relations() },
233 None => Self::RedactedMessage { event_type: ev.event_type() },
234 },
235 AnySyncTimelineEvent::State(ev) => match ev {
236 AnySyncStateEvent::RoomMember(ev) => match ev {
237 SyncStateEvent::Original(ev) => Self::RoomMember {
238 user_id: ev.state_key,
239 content: FullStateEventContent::Original {
240 content: ev.content,
241 prev_content: ev.unsigned.prev_content,
242 },
243 sender: ev.sender,
244 },
245 SyncStateEvent::Redacted(ev) => Self::RoomMember {
246 user_id: ev.state_key,
247 content: FullStateEventContent::Redacted(ev.content),
248 sender: ev.sender,
249 },
250 },
251 ev => Self::OtherState {
252 state_key: ev.state_key().to_owned(),
253 content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
254 },
255 },
256 }
257 }
258
259 pub(super) fn failed_to_parse(
260 event: SyncTimelineEventWithoutContent,
261 error: serde_json::Error,
262 ) -> Self {
263 let error = Arc::new(error);
264 match event {
265 SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => {
266 Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
267 }
268 SyncTimelineEventWithoutContent::RedactedMessageLike(ev) => {
269 Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
270 }
271 SyncTimelineEventWithoutContent::OriginalState(ev) => Self::FailedToParseState {
272 event_type: ev.content.event_type,
273 state_key: ev.state_key,
274 error,
275 },
276 SyncTimelineEventWithoutContent::RedactedState(ev) => Self::FailedToParseState {
277 event_type: ev.content.event_type,
278 state_key: ev.state_key,
279 error,
280 },
281 }
282 }
283}
284
/// Where the item built for a remote event must be placed in the timeline.
#[derive(Clone, Copy, Debug)]
pub(super) enum TimelineItemPosition {
    /// The new item is pushed at the front of the timeline.
    Start {
        /// The origin of the event, stored on the resulting remote item.
        origin: RemoteEventOrigin,
    },

    /// The new item is inserted after the last item that is not a local
    /// echo.
    End {
        /// The origin of the event, stored on the resulting remote item.
        origin: RemoteEventOrigin,
    },

    /// The new item is inserted at a timeline index derived from
    /// `event_index`.
    At {
        /// Index of the event among all remote events.
        event_index: usize,

        /// The origin of the event, stored on the resulting remote item.
        origin: RemoteEventOrigin,
    },

    /// An existing item is replaced in place (or removed if the event
    /// doesn't produce an item). The log messages in this file associate
    /// this position with decryption retries.
    UpdateAt {
        /// Index of the timeline item to replace.
        timeline_item_index: usize,
    },
}
320
/// Summary of what handling a single event did to the timeline items.
#[derive(Default)]
pub(super) struct HandleEventResult {
    /// Set when an item was built for the event, i.e. `add_item` ran. Note
    /// that this includes replacing an existing item in place for
    /// [`TimelineItemPosition::UpdateAt`].
    pub(super) item_added: bool,

    /// Set when the item at a [`TimelineItemPosition::UpdateAt`] position
    /// was removed because the event didn't produce an item.
    pub(super) item_removed: bool,

    /// Number of existing items replaced as a consequence of this event
    /// (edits, aggregations, redactions).
    pub(super) items_updated: u16,
}
337
/// Short-lived handler applying a single event to the timeline state.
///
/// It borrows the items and metadata of a state transaction, owns the
/// context of the event being handled, and accumulates the outcome in
/// `result`.
pub(super) struct TimelineEventHandler<'a, 'o> {
    /// The timeline items being modified.
    items: &'a mut ObservableItemsTransaction<'o>,
    /// The timeline metadata (pending edits, aggregations, replies, …).
    meta: &'a mut TimelineMetadata,
    /// Context of the event being handled.
    ctx: TimelineEventContext,
    /// Outcome accumulated while handling the event.
    result: HandleEventResult,
}
350
351impl<'a, 'o> TimelineEventHandler<'a, 'o> {
352 pub(super) fn new(
353 state: &'a mut TimelineStateTransaction<'o>,
354 ctx: TimelineEventContext,
355 ) -> Self {
356 let TimelineStateTransaction { items, meta, .. } = state;
357 Self { items, meta, ctx, result: HandleEventResult::default() }
358 }
359
    /// Handles a single event, consuming the handler, and returns what
    /// happened to the timeline items.
    ///
    /// Events that only affect existing items (reactions, edits, redactions,
    /// poll responses and poll ends) are always processed. Events that would
    /// create a new item are only processed when
    /// `TimelineEventContext::should_add_new_items` is set.
    ///
    /// `date_divider_adjuster` is marked as used as soon as this method
    /// runs.
    #[instrument(skip_all, fields(txn_id, event_id, position))]
    pub(super) async fn handle_event(
        mut self,
        date_divider_adjuster: &mut DateDividerAdjuster,
        event_kind: TimelineEventKind,
    ) -> HandleEventResult {
        let span = tracing::Span::current();

        date_divider_adjuster.mark_used();

        // Fill in the span fields declared in the `#[instrument]` attribute.
        match &self.ctx.flow {
            Flow::Local { txn_id, .. } => {
                span.record("txn_id", debug(txn_id));
                debug!("Handling local event");
            }

            Flow::Remote { event_id, txn_id, position, .. } => {
                span.record("event_id", debug(event_id));
                span.record("position", debug(position));
                if let Some(txn_id) = txn_id {
                    span.record("txn_id", debug(txn_id));
                }
                trace!("Handling remote event");
            }
        };

        let should_add = self.ctx.should_add_new_items;

        match event_kind {
            TimelineEventKind::Message { content, relations } => match content {
                AnyMessageLikeEventContent::Reaction(c) => {
                    self.handle_reaction(c);
                }

                // A message with a replacement relation is an edit: it
                // updates an existing item instead of adding a new one.
                AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
                    relates_to: Some(Relation::Replacement(re)),
                    ..
                }) => {
                    self.handle_room_message_edit(re);
                }

                AnyMessageLikeEventContent::RoomMessage(c) => {
                    if should_add {
                        self.handle_room_message(c, relations);
                    }
                }

                AnyMessageLikeEventContent::Sticker(content) => {
                    if should_add {
                        self.add_item(
                            TimelineItemContent::Sticker(Sticker {
                                content,
                                reactions: Default::default(),
                            }),
                            None,
                        );
                    }
                }

                AnyMessageLikeEventContent::UnstablePollStart(
                    UnstablePollStartEventContent::Replacement(c),
                ) => self.handle_poll_edit(c.relates_to),

                AnyMessageLikeEventContent::UnstablePollStart(
                    UnstablePollStartEventContent::New(c),
                ) => {
                    if should_add {
                        self.handle_poll_start(c, relations)
                    }
                }

                AnyMessageLikeEventContent::UnstablePollResponse(c) => self.handle_poll_response(c),

                AnyMessageLikeEventContent::UnstablePollEnd(c) => self.handle_poll_end(c),

                AnyMessageLikeEventContent::CallInvite(_) => {
                    if should_add {
                        self.add_item(TimelineItemContent::CallInvite, None);
                    }
                }

                AnyMessageLikeEventContent::CallNotify(_) => {
                    if should_add {
                        self.add_item(TimelineItemContent::CallNotify, None)
                    }
                }

                _ => {
                    debug!(
                        "Ignoring message-like event of type `{}`, not supported (yet)",
                        content.event_type()
                    );
                }
            },

            TimelineEventKind::UnableToDecrypt { content, utd_cause } => {
                if should_add {
                    self.add_item(TimelineItemContent::unable_to_decrypt(content, utd_cause), None);
                }

                // Notify the unable-to-decrypt hook, if any. Note that this
                // happens for every remote UTD, whether or not an item was
                // added above.
                if let Some(hook) = self.meta.unable_to_decrypt_hook.as_ref() {
                    if let Some(event_id) = &self.ctx.flow.event_id() {
                        hook.on_utd(event_id, utd_cause, self.ctx.timestamp, &self.ctx.sender)
                            .await;
                    }
                }
            }

            TimelineEventKind::RedactedMessage { event_type } => {
                // Redacted reactions never get an item of their own.
                if event_type != MessageLikeEventType::Reaction && should_add {
                    self.add_item(TimelineItemContent::RedactedMessage, None);
                }
            }

            TimelineEventKind::Redaction { redacts } => {
                self.handle_redaction(redacts);
            }

            TimelineEventKind::RoomMember { user_id, content, sender } => {
                if should_add {
                    self.add_item(TimelineItemContent::room_member(user_id, content, sender), None);
                }
            }

            TimelineEventKind::OtherState { state_key, content } => {
                if should_add {
                    self.add_item(
                        TimelineItemContent::OtherState(OtherState { state_key, content }),
                        None,
                    );
                }
            }

            TimelineEventKind::FailedToParseMessageLike { event_type, error } => {
                if should_add {
                    self.add_item(
                        TimelineItemContent::FailedToParseMessageLike { event_type, error },
                        None,
                    );
                }
            }

            TimelineEventKind::FailedToParseState { event_type, state_key, error } => {
                if should_add {
                    self.add_item(
                        TimelineItemContent::FailedToParseState { event_type, state_key, error },
                        None,
                    );
                }
            }
        }

        if !self.result.item_added {
            trace!("No new item added");

            // The event was meant to replace an existing item but didn't
            // produce one: the existing item is removed instead.
            if let Flow::Remote {
                position: TimelineItemPosition::UpdateAt { timeline_item_index },
                ..
            } = self.ctx.flow
            {
                trace!("Removing UTD that was successfully retried");
                self.items.remove(timeline_item_index);

                self.result.item_removed = true;
            }

        }

        self.result
    }
545
546 #[instrument(skip_all)]
548 fn handle_room_message(
549 &mut self,
550 msg: RoomMessageEventContent,
551 relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
552 ) {
553 let pending_edit = self
558 .ctx
559 .flow
560 .event_id()
561 .and_then(|event_id| {
562 Self::maybe_unstash_pending_edit(&mut self.meta.pending_edits, event_id)
563 })
564 .and_then(|edit| match edit.kind {
565 PendingEditKind::RoomMessage(replacement) => {
566 Some((Some(edit.event_json), replacement.new_content))
567 }
568 _ => None,
569 });
570
571 let (edit_json, edit_content) = extract_room_msg_edit_content(relations)
572 .map(|content| {
573 let edit_json = self.ctx.flow.raw_event().and_then(extract_bundled_edit_event_json);
574 (edit_json, content)
575 })
576 .or(pending_edit)
577 .unzip();
578
579 let edit_json = edit_json.flatten();
580
581 if let Some(event_id) = self.ctx.flow.event_id() {
584 let replied_to_event_id =
585 msg.relates_to.as_ref().and_then(|relates_to| match relates_to {
586 Relation::Reply { in_reply_to } => Some(in_reply_to.event_id.clone()),
587 Relation::Thread(thread) => {
588 thread.in_reply_to.as_ref().map(|in_reply_to| in_reply_to.event_id.clone())
589 }
590 _ => None,
591 });
592 if let Some(replied_to_event_id) = replied_to_event_id {
593 self.meta
595 .replies
596 .entry(replied_to_event_id)
597 .or_default()
598 .insert(event_id.to_owned());
599 }
600 }
601
602 self.add_item(
603 TimelineItemContent::message(msg, edit_content, self.items, Default::default()),
604 edit_json,
605 );
606 }
607
608 #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
609 fn handle_room_message_edit(
610 &mut self,
611 replacement: Replacement<RoomMessageEventContentWithoutRelation>,
612 ) {
613 if let Some((item_pos, item)) = rfind_event_by_id(self.items, &replacement.event_id) {
614 let edit_json = self.ctx.flow.raw_event().cloned();
615 if let Some(new_item) = self.apply_msg_edit(&item, replacement.new_content, edit_json) {
616 trace!("Applied edit");
617
618 let internal_id = item.internal_id.to_owned();
619
620 Self::maybe_update_responses(
622 self.meta,
623 self.items,
624 &replacement.event_id,
625 &new_item,
626 );
627
628 self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
630 self.result.items_updated += 1;
631 }
632 } else if let Flow::Remote { position, raw_event, .. } = &self.ctx.flow {
633 let replaced_event_id = replacement.event_id.clone();
634 let replacement = PendingEdit {
635 kind: PendingEditKind::RoomMessage(replacement),
636 event_json: raw_event.clone(),
637 };
638 self.stash_pending_edit(*position, replaced_event_id, replacement);
639 } else {
640 debug!("Local message edit for a timeline item not found, discarding");
641 }
642 }
643
644 #[instrument(skip(self, replacement))]
646 fn stash_pending_edit(
647 &mut self,
648 position: TimelineItemPosition,
649 replaced_event_id: OwnedEventId,
650 replacement: PendingEdit,
651 ) {
652 match position {
653 TimelineItemPosition::Start { .. }
654 | TimelineItemPosition::At { .. }
655 | TimelineItemPosition::UpdateAt { .. } => {
656 if !self
667 .meta
668 .pending_edits
669 .iter()
670 .any(|edit| edit.edited_event() == replaced_event_id)
671 {
672 self.meta.pending_edits.push(replacement);
673 debug!("Timeline item not found, stashing edit");
674 } else {
675 debug!("Timeline item not found, but there was a previous edit for the event: discarding");
676 }
677 }
678
679 TimelineItemPosition::End { .. } => {
680 let edits = &mut self.meta.pending_edits;
684 let _ = Self::maybe_unstash_pending_edit(edits, &replaced_event_id);
685 edits.push(replacement);
686 debug!("Timeline item not found, stashing edit");
687 }
688 }
689 }
690
691 fn maybe_unstash_pending_edit(
694 edits: &mut RingBuffer<PendingEdit>,
695 event_id: &EventId,
696 ) -> Option<PendingEdit> {
697 let pos = edits.iter().position(|edit| edit.edited_event() == event_id)?;
698 trace!(edited_event = %event_id, "unstashed pending edit");
699 Some(edits.remove(pos).unwrap())
700 }
701
702 fn apply_msg_edit(
707 &self,
708 item: &EventTimelineItem,
709 new_content: RoomMessageEventContentWithoutRelation,
710 edit_json: Option<Raw<AnySyncTimelineEvent>>,
711 ) -> Option<EventTimelineItem> {
712 if self.ctx.sender != item.sender() {
713 info!(
714 original_sender = ?item.sender(), edit_sender = ?self.ctx.sender,
715 "Edit event applies to another user's timeline item, discarding"
716 );
717 return None;
718 }
719
720 let TimelineItemContent::Message(msg) = item.content() else {
721 info!(
722 "Edit of message event applies to {:?}, discarding",
723 item.content().debug_string(),
724 );
725 return None;
726 };
727
728 let mut new_msg = msg.clone();
729 new_msg.apply_edit(new_content);
730
731 let mut new_item =
732 item.with_content_and_latest_edit(TimelineItemContent::Message(new_msg), edit_json);
733
734 if let Flow::Remote { encryption_info, .. } = &self.ctx.flow {
735 new_item = new_item.with_encryption_info(encryption_info.clone());
736 }
737
738 Some(new_item)
739 }
740
741 #[instrument(skip_all, fields(relates_to_event_id = ?c.relates_to.event_id))]
746 fn handle_reaction(&mut self, c: ReactionEventContent) {
747 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
748
749 let reaction_status = match &self.ctx.flow {
751 Flow::Local { send_handle, .. } => {
752 ReactionStatus::LocalToRemote(send_handle.clone())
754 }
755 Flow::Remote { event_id, .. } => {
756 ReactionStatus::RemoteToRemote(event_id.clone())
758 }
759 };
760 let aggregation = Aggregation::new(
761 self.ctx.flow.timeline_item_id(),
762 AggregationKind::Reaction {
763 key: c.relates_to.key,
764 sender: self.ctx.sender.clone(),
765 timestamp: self.ctx.timestamp,
766 reaction_status,
767 },
768 );
769
770 self.meta.aggregations.add(target.clone(), aggregation.clone());
771 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
772 self.result.items_updated += 1;
773 }
774 }
775
776 #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
777 fn handle_poll_edit(
778 &mut self,
779 replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation>,
780 ) {
781 let Some((item_pos, item)) = rfind_event_by_id(self.items, &replacement.event_id) else {
782 if let Flow::Remote { position, raw_event, .. } = &self.ctx.flow {
783 let replaced_event_id = replacement.event_id.clone();
784 let replacement = PendingEdit {
785 kind: PendingEditKind::Poll(replacement),
786 event_json: raw_event.clone(),
787 };
788 self.stash_pending_edit(*position, replaced_event_id, replacement);
789 } else {
790 debug!("Local poll edit for a timeline item not found, discarding");
791 }
792 return;
793 };
794
795 let edit_json = self.ctx.flow.raw_event().cloned();
796
797 let Some(new_item) = self.apply_poll_edit(item.inner, replacement, edit_json) else {
798 return;
799 };
800
801 trace!("Applying poll start edit.");
802 self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned()));
803 self.result.items_updated += 1;
804 }
805
806 fn apply_poll_edit(
807 &self,
808 item: &EventTimelineItem,
809 replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation>,
810 edit_json: Option<Raw<AnySyncTimelineEvent>>,
811 ) -> Option<EventTimelineItem> {
812 if self.ctx.sender != item.sender() {
813 info!(
814 original_sender = ?item.sender(), edit_sender = ?self.ctx.sender,
815 "Edit event applies to another user's timeline item, discarding"
816 );
817 return None;
818 }
819
820 let TimelineItemContent::Poll(poll_state) = &item.content() else {
821 info!("Edit of poll event applies to {}, discarding", item.content().debug_string(),);
822 return None;
823 };
824
825 let new_content = match poll_state.edit(replacement.new_content) {
826 Some(edited_poll_state) => TimelineItemContent::Poll(edited_poll_state),
827 None => {
828 info!("Not applying edit to a poll that's already ended");
829 return None;
830 }
831 };
832
833 Some(item.with_content_and_latest_edit(new_content, edit_json))
834 }
835
836 fn handle_poll_start(
838 &mut self,
839 c: NewUnstablePollStartEventContent,
840 relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
841 ) {
842 let pending_edit = self
847 .ctx
848 .flow
849 .event_id()
850 .and_then(|event_id| {
851 Self::maybe_unstash_pending_edit(&mut self.meta.pending_edits, event_id)
852 })
853 .and_then(|edit| match edit.kind {
854 PendingEditKind::Poll(replacement) => {
855 Some((Some(edit.event_json), replacement.new_content))
856 }
857 _ => None,
858 });
859
860 let (edit_json, edit_content) = extract_poll_edit_content(relations)
861 .map(|content| {
862 let edit_json = self.ctx.flow.raw_event().and_then(extract_bundled_edit_event_json);
863 (edit_json, content)
864 })
865 .or(pending_edit)
866 .unzip();
867
868 let poll_state = PollState::new(c, edit_content, Default::default());
869
870 let edit_json = edit_json.flatten();
871
872 self.add_item(TimelineItemContent::Poll(poll_state), edit_json);
873 }
874
875 fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) {
876 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
877 let aggregation = Aggregation::new(
878 self.ctx.flow.timeline_item_id(),
879 AggregationKind::PollResponse {
880 sender: self.ctx.sender.clone(),
881 timestamp: self.ctx.timestamp,
882 answers: c.poll_response.answers,
883 },
884 );
885 self.meta.aggregations.add(target.clone(), aggregation.clone());
886 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
887 self.result.items_updated += 1;
888 }
889 }
890
891 fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) {
892 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
893 let aggregation = Aggregation::new(
894 self.ctx.flow.timeline_item_id(),
895 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
896 );
897 self.meta.aggregations.add(target.clone(), aggregation.clone());
898 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
899 self.result.items_updated += 1;
900 }
901 }
902
903 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
909 fn handle_redaction(&mut self, redacted: OwnedEventId) {
910 if self.handle_aggregation_redaction(redacted.clone()) {
915 return;
918 }
919
920 if let Some((idx, item)) = rfind_event_by_id(self.items, &redacted) {
922 if item.as_remote().is_some() {
923 if let TimelineItemContent::RedactedMessage = &item.content {
924 debug!("event item is already redacted");
925 } else {
926 let new_item = item.redact(&self.meta.room_version);
927 let internal_id = item.internal_id.to_owned();
928
929 Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
932
933 self.items.replace(idx, TimelineItem::new(new_item, internal_id));
934 self.result.items_updated += 1;
935 }
936 } else {
937 error!("inconsistent state: redaction received on a non-remote event item");
938 }
939 } else {
940 debug!("Timeline item not found, discarding redaction");
941 };
942 }
943
944 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
949 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
950 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
951
952 let Some((target, aggregation)) =
953 self.meta.aggregations.try_remove_aggregation(&aggregation_id)
954 else {
955 return false;
957 };
958
959 if let Some((item_pos, item)) = rfind_event_by_item_id(self.items, target) {
960 let mut content = item.content().clone();
961 match aggregation.unapply(&mut content) {
962 ApplyAggregationResult::UpdatedItem => {
963 trace!("removed aggregation");
964 let internal_id = item.internal_id.to_owned();
965 let new_item = item.with_content(content);
966 self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
967 self.result.items_updated += 1;
968 }
969 ApplyAggregationResult::LeftItemIntact => {}
970 ApplyAggregationResult::Error(err) => {
971 warn!("error when unapplying aggregation: {err}");
972 }
973 }
974 } else {
975 info!("missing related-to item ({target:?}) for aggregation {aggregation_id:?}");
976 }
977
978 true
980 }
981
    /// Builds a timeline item with the given `content` for the event being
    /// handled, and puts it in the timeline according to the flow:
    ///
    /// * local echoes are pushed at the back,
    /// * remote events are inserted according to their
    ///   [`TimelineItemPosition`], after removing a matching local echo if
    ///   there is one,
    /// * for [`TimelineItemPosition::UpdateAt`], the existing item is
    ///   replaced in place.
    ///
    /// Aggregations already known for this event are applied to `content`
    /// first. `edit_json` is stored as the latest edit JSON of remote
    /// items. This always sets `item_added` in the result.
    ///
    /// NOTE(review): the `UpdateAt` paths index `self.items` directly, so
    /// an out-of-range index presumably panics; confirm that callers
    /// guarantee it is valid.
    fn add_item(
        &mut self,
        mut content: TimelineItemContent,
        edit_json: Option<Raw<AnySyncTimelineEvent>>,
    ) {
        self.result.item_added = true;

        // Apply the aggregations already known for this event, if any.
        if let Err(err) =
            self.meta.aggregations.apply(&self.ctx.flow.timeline_item_id(), &mut content)
        {
            warn!("discarding aggregations: {err}");
        }

        let sender = self.ctx.sender.to_owned();
        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
        let timestamp = self.ctx.timestamp;

        let kind: EventTimelineItemKind = match &self.ctx.flow {
            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
                send_state: EventSendState::NotSentYet,
                transaction_id: txn_id.to_owned(),
                send_handle: send_handle.clone(),
            }
            .into(),

            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
                let origin = match *position {
                    TimelineItemPosition::Start { origin }
                    | TimelineItemPosition::End { origin }
                    | TimelineItemPosition::At { origin, .. } => origin,

                    // When replacing an item, keep the origin of the item
                    // being replaced.
                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
                        .as_event()
                        .and_then(|ev| Some(ev.as_remote()?.origin))
                        .unwrap_or_else(|| {
                            error!("Decryption retried on a local event");
                            RemoteEventOrigin::Unknown
                        }),
                };

                RemoteEventTimelineItem {
                    event_id: event_id.clone(),
                    transaction_id: txn_id.clone(),
                    read_receipts: self.ctx.read_receipts.clone(),
                    is_own: self.ctx.is_own_event,
                    is_highlighted: self.ctx.is_highlighted,
                    encryption_info: encryption_info.clone(),
                    original_json: Some(raw_event.clone()),
                    latest_edit_json: edit_json,
                    origin,
                }
                .into()
            }
        };

        let is_room_encrypted = self.meta.is_room_encrypted;

        let mut item = EventTimelineItem::new(
            sender,
            sender_profile,
            timestamp,
            content,
            kind,
            is_room_encrypted,
        );

        match &self.ctx.flow {
            Flow::Local { .. } => {
                trace!("Adding new local timeline item");

                let item = self.meta.new_timeline_item(item);

                self.items.push_back(item, None);
            }

            Flow::Remote {
                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
            } => {
                // Remove the local echo of this event, if any; the new item
                // then reuses its internal ID.
                let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item(
                    self.items,
                    &mut item,
                    Some(event_id),
                    txn_id.as_ref().map(AsRef::as_ref),
                );
                let item = new_timeline_item(self.meta, item, removed_duplicated_timeline_item);

                trace!("Adding new remote timeline item at the start");

                self.items.push_front(item, Some(0));
            }

            Flow::Remote {
                position: TimelineItemPosition::At { event_index, .. },
                event_id,
                txn_id,
                ..
            } => {
                // Remove the local echo of this event, if any; the new item
                // then reuses its internal ID.
                let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item(
                    self.items,
                    &mut item,
                    Some(event_id),
                    txn_id.as_ref().map(AsRef::as_ref),
                );
                let item = new_timeline_item(self.meta, item, removed_duplicated_timeline_item);

                let all_remote_events = self.items.all_remote_events();
                let event_index = *event_index;

                // Look for the closest remote event at or before
                // `event_index` that has a timeline item: the new item goes
                // right after it.
                let timeline_item_index = all_remote_events
                    .range(0..=event_index)
                    .rev()
                    .find_map(|event_meta| event_meta.timeline_item_index)
                    .map(|timeline_item_index| timeline_item_index + 1);

                // Otherwise, look for the closest remote event after
                // `event_index` that has a timeline item: the new item
                // takes its place.
                let timeline_item_index = timeline_item_index.or_else(|| {
                    all_remote_events
                        .range(event_index + 1..)
                        .find_map(|event_meta| event_meta.timeline_item_index)
                });

                // Otherwise, fall back to inserting right after the last
                // event item that is not a local echo, or at 0 if there is
                // none.
                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
                    self.items
                        .iter()
                        .enumerate()
                        .rev()
                        .find_map(|(timeline_item_index, timeline_item)| {
                            (!timeline_item.as_event()?.is_local_echo())
                                .then_some(timeline_item_index + 1)
                        })
                        .unwrap_or(0)
                });

                trace!(
                    ?event_index,
                    ?timeline_item_index,
                    "Adding new remote timeline at specific event index"
                );

                self.items.insert(timeline_item_index, item, Some(event_index));
            }

            Flow::Remote {
                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
            } => {
                // Remove the local echo of this event, if any; the new item
                // then reuses its internal ID.
                let removed_duplicated_timeline_item = Self::deduplicate_local_timeline_item(
                    self.items,
                    &mut item,
                    Some(event_id),
                    txn_id.as_ref().map(AsRef::as_ref),
                );
                let item = new_timeline_item(self.meta, item, removed_duplicated_timeline_item);

                // Insert right after the last event item that is not a
                // local echo, or at 0 if there is none.
                let timeline_item_index = self
                    .items
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(timeline_item_index, timeline_item)| {
                        (!timeline_item.as_event()?.is_local_echo())
                            .then_some(timeline_item_index + 1)
                    })
                    .unwrap_or(0);

                // The event index is the index of the last known remote
                // event; falling back to 0 is logged as an error.
                let event_index = self
                    .items
                    .all_remote_events()
                    .last_index()
                    .or_else(|| {
                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");

                        Some(0)
                    });

                // Use the most specific insertion operation: push back,
                // push front, or a generic insert.
                if timeline_item_index == self.items.len() {
                    trace!("Adding new remote timeline item at the back");
                    self.items.push_back(item, event_index);
                } else if timeline_item_index == 0 {
                    trace!("Adding new remote timeline item at the front");
                    self.items.push_front(item, event_index);
                } else {
                    trace!(
                        timeline_item_index,
                        "Adding new remote timeline item at specific index"
                    );
                    self.items.insert(timeline_item_index, item, event_index);
                }
            }

            Flow::Remote {
                event_id: decrypted_event_id,
                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
                ..
            } => {
                trace!("Updating timeline item at position {idx}");

                // Refresh the replies to the event being replaced.
                Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);

                // Replace in place, keeping the internal ID of the previous
                // item.
                let internal_id = self.items[*idx].internal_id.clone();
                self.items.replace(*idx, TimelineItem::new(item, internal_id));
            }
        }

        // Refresh the read marker if it isn't flagged as up to date.
        if !self.meta.has_up_to_date_read_marker_item {
            self.meta.update_read_marker(self.items);
        }
    }
1223
1224 fn deduplicate_local_timeline_item(
1229 items: &mut ObservableItemsTransaction<'_>,
1230 new_event_timeline_item: &mut EventTimelineItem,
1231 event_id: Option<&EventId>,
1232 transaction_id: Option<&TransactionId>,
1233 ) -> Option<Arc<TimelineItem>> {
1234 if let Some((local_timeline_item_index, local_timeline_item)) = items
1236 .iter()
1237 .enumerate()
1239 .rev()
1241 .try_fold((), |(), (nth, timeline_item)| {
1252 let Some(event_timeline_item) = timeline_item.as_event() else {
1253 return ControlFlow::Break(None);
1255 };
1256
1257 if !event_timeline_item.is_local_echo() {
1259 return ControlFlow::Break(None);
1260 }
1261
1262 if event_id == event_timeline_item.event_id()
1263 || (transaction_id.is_some()
1264 && transaction_id == event_timeline_item.transaction_id())
1265 {
1266 ControlFlow::Break(Some((nth, event_timeline_item)))
1268 } else {
1269 ControlFlow::Continue(())
1272 }
1273 })
1274 .break_value()
1275 .flatten()
1276 {
1277 trace!(
1278 ?event_id,
1279 ?transaction_id,
1280 ?local_timeline_item_index,
1281 "Removing local timeline item"
1282 );
1283
1284 transfer_details(new_event_timeline_item, local_timeline_item);
1285
1286 return Some(items.remove(local_timeline_item_index));
1288 };
1289
1290 None
1291 }
1292
1293 fn maybe_update_responses(
1296 meta: &mut TimelineMetadata,
1297 items: &mut ObservableItemsTransaction<'_>,
1298 target_event_id: &EventId,
1299 new_item: &EventTimelineItem,
1300 ) {
1301 let Some(replies) = meta.replies.get(target_event_id) else {
1302 trace!("item has no replies");
1303 return;
1304 };
1305
1306 for reply_id in replies {
1307 let Some(timeline_item_index) = items
1308 .get_remote_event_by_event_id(reply_id)
1309 .and_then(|meta| meta.timeline_item_index)
1310 else {
1311 warn!(%reply_id, "event not known as an item in the timeline");
1312 continue;
1313 };
1314
1315 let Some(item) = items.get(timeline_item_index) else {
1316 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1317 continue;
1318 };
1319
1320 let Some(event_item) = item.as_event() else { continue };
1321 let Some(message) = event_item.content.as_message() else { continue };
1322 let Some(in_reply_to) = message.in_reply_to() else { continue };
1323
1324 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1325 let in_reply_to = InReplyToDetails {
1326 event_id: in_reply_to.event_id.clone(),
1327 event: TimelineDetails::Ready(Box::new(RepliedToEvent::from_timeline_item(
1328 new_item,
1329 ))),
1330 };
1331
1332 let new_reply_content =
1333 TimelineItemContent::Message(message.with_in_reply_to(in_reply_to));
1334 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1335 items.replace(timeline_item_index, new_reply_item);
1336 }
1337 }
1338}
1339
1340fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1348 let TimelineItemContent::Message(msg) = &mut new_item.content else { return };
1349 let TimelineItemContent::Message(old_msg) = &old_item.content else { return };
1350
1351 let Some(in_reply_to) = &mut msg.in_reply_to else { return };
1352 let Some(old_in_reply_to) = &old_msg.in_reply_to else { return };
1353
1354 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1355 in_reply_to.event = old_in_reply_to.event.clone();
1356 }
1357}
1358
1359fn new_timeline_item(
1365 metadata: &mut TimelineMetadata,
1366 event_timeline_item: EventTimelineItem,
1367 replaced_timeline_item: Option<Arc<TimelineItem>>,
1368) -> Arc<TimelineItem> {
1369 match replaced_timeline_item {
1370 Some(to_replace_timeline_item) => {
1372 TimelineItem::new(event_timeline_item, to_replace_timeline_item.internal_id.clone())
1373 }
1374
1375 None => metadata.new_timeline_item(event_timeline_item),
1376 }
1377}