1use std::{ops::ControlFlow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20 crypto::types::events::UtdCause,
21 deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
22 ring_buffer::RingBuffer,
23 send_queue::SendHandle,
24};
25use ruma::{
26 events::{
27 poll::{
28 unstable_end::UnstablePollEndEventContent,
29 unstable_response::UnstablePollResponseEventContent,
30 unstable_start::{
31 NewUnstablePollStartEventContent, NewUnstablePollStartEventContentWithoutRelation,
32 UnstablePollStartEventContent,
33 },
34 },
35 reaction::ReactionEventContent,
36 receipt::Receipt,
37 relation::Replacement,
38 room::{
39 encrypted::RoomEncryptedEventContent,
40 member::RoomMemberEventContent,
41 message::{Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation},
42 },
43 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
44 AnySyncTimelineEvent, BundledMessageLikeRelations, EventContent, FullStateEventContent,
45 MessageLikeEventType, StateEventType, SyncStateEvent,
46 },
47 serde::Raw,
48 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
49 TransactionId,
50};
51use tracing::{debug, error, field::debug, info, instrument, trace, warn};
52
53use super::{
54 algorithms::{rfind_event_by_id, rfind_event_by_item_id},
55 controller::{
56 find_item_and_apply_aggregation, Aggregation, AggregationKind, ApplyAggregationResult,
57 ObservableItemsTransaction, PendingEdit, PendingEditKind, TimelineMetadata,
58 TimelineStateTransaction,
59 },
60 date_dividers::DateDividerAdjuster,
61 event_item::{
62 extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content,
63 AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
64 LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
65 TimelineEventItemId,
66 },
67 traits::RoomDataProvider,
68 EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent, MsgLikeKind, OtherState,
69 ReactionStatus, RepliedToEvent, Sticker, TimelineDetails, TimelineItem, TimelineItemContent,
70};
71use crate::events::SyncTimelineEventWithoutContent;
72
/// Where the event being handled comes from.
pub(super) enum Flow {
    /// The event has no event ID yet and is identified by a transaction ID
    /// only; items created for it start in the `NotSentYet` send state.
    Local {
        /// Transaction ID identifying the local item.
        txn_id: OwnedTransactionId,

        /// Optional handle stored on the local item (and on local reactions).
        send_handle: Option<SendHandle>,
    },

    /// The event has an event ID and its raw JSON is available.
    Remote {
        /// ID of the event.
        event_id: OwnedEventId,
        /// Transaction ID, if known; used to match a pending local item.
        txn_id: Option<OwnedTransactionId>,
        /// Raw JSON of the event; stored on the created item.
        raw_event: Raw<AnySyncTimelineEvent>,
        /// Where the resulting timeline item must be placed.
        position: TimelineItemPosition,
        /// Encryption information copied onto the created or edited item.
        encryption_info: Option<EncryptionInfo>,
    },
}
100
101impl Flow {
102 pub(crate) fn event_id(&self) -> Option<&EventId> {
104 as_variant!(self, Flow::Remote { event_id, .. } => event_id)
105 }
106
107 pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
109 match self {
110 Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
111 Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
112 }
113 }
114
115 pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
117 as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
118 }
119}
120
/// Metadata about the event being handled, independent of its content.
pub(super) struct TimelineEventContext {
    /// Sender of the event; also used to reject edits from other users.
    pub(super) sender: OwnedUserId,
    /// Profile of the sender, if already known.
    pub(super) sender_profile: Option<Profile>,
    /// Timestamp attached to the created item and to aggregations.
    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
    /// Copied to the `is_own` field of remote items.
    pub(super) is_own_event: bool,
    /// Read receipts copied onto remote items.
    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
    /// Copied to the `is_highlighted` field of remote items.
    pub(super) is_highlighted: bool,
    /// Origin of the event (local or remote) and its placement.
    pub(super) flow: Flow,

    /// When `false`, the handler does not create new timeline items;
    /// reactions, edits, poll responses/ends and redactions are still
    /// processed.
    pub(super) should_add_new_items: bool,
}
138
/// The content of an event, classified for handling by the timeline.
#[derive(Clone, Debug)]
pub(super) enum TimelineEventKind {
    /// A message-like event with its original (non-redacted) content.
    Message {
        /// The deserialized content.
        content: AnyMessageLikeEventContent,
        /// Relations bundled with the event.
        relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
    },

    /// An encrypted event for which unable-to-decrypt information was
    /// provided.
    UnableToDecrypt { content: RoomEncryptedEventContent, utd_cause: UtdCause },

    /// A message-like event without original content.
    RedactedMessage { event_type: MessageLikeEventType },

    /// A redaction event targeting `redacts`.
    Redaction { redacts: OwnedEventId },

    /// A room member state event (original or redacted).
    RoomMember {
        /// The state key of the event.
        user_id: OwnedUserId,
        content: FullStateEventContent<RoomMemberEventContent>,
        sender: OwnedUserId,
    },

    /// Any other state event.
    OtherState { state_key: String, content: AnyOtherFullStateEventContent },

    /// A message-like event whose content could not be deserialized.
    FailedToParseMessageLike { event_type: MessageLikeEventType, error: Arc<serde_json::Error> },

    /// A state event whose content could not be deserialized.
    FailedToParseState {
        event_type: StateEventType,
        state_key: String,
        error: Arc<serde_json::Error>,
    },
}
182
183impl TimelineEventKind {
184 pub async fn from_event<P: RoomDataProvider>(
196 event: AnySyncTimelineEvent,
197 raw_event: &Raw<AnySyncTimelineEvent>,
198 room_data_provider: &P,
199 unable_to_decrypt_info: Option<UnableToDecryptInfo>,
200 ) -> Self {
201 let room_version = room_data_provider.room_version();
202 match event {
203 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
204 if let Some(redacts) = ev.redacts(&room_version).map(ToOwned::to_owned) {
205 Self::Redaction { redacts }
206 } else {
207 Self::RedactedMessage { event_type: ev.event_type() }
208 }
209 }
210 AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
211 Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
212 if let Some(unable_to_decrypt_info) = unable_to_decrypt_info {
214 let utd_cause = UtdCause::determine(
215 raw_event,
216 room_data_provider.crypto_context_info().await,
217 &unable_to_decrypt_info,
218 );
219 Self::UnableToDecrypt { content, utd_cause }
220 } else {
221 Self::Message {
227 content: AnyMessageLikeEventContent::RoomEncrypted(content),
228 relations: ev.relations(),
229 }
230 }
231 }
232 Some(content) => Self::Message { content, relations: ev.relations() },
233 None => Self::RedactedMessage { event_type: ev.event_type() },
234 },
235 AnySyncTimelineEvent::State(ev) => match ev {
236 AnySyncStateEvent::RoomMember(ev) => match ev {
237 SyncStateEvent::Original(ev) => Self::RoomMember {
238 user_id: ev.state_key,
239 content: FullStateEventContent::Original {
240 content: ev.content,
241 prev_content: ev.unsigned.prev_content,
242 },
243 sender: ev.sender,
244 },
245 SyncStateEvent::Redacted(ev) => Self::RoomMember {
246 user_id: ev.state_key,
247 content: FullStateEventContent::Redacted(ev.content),
248 sender: ev.sender,
249 },
250 },
251 ev => Self::OtherState {
252 state_key: ev.state_key().to_owned(),
253 content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
254 },
255 },
256 }
257 }
258
259 pub(super) fn failed_to_parse(
260 event: SyncTimelineEventWithoutContent,
261 error: serde_json::Error,
262 ) -> Self {
263 let error = Arc::new(error);
264 match event {
265 SyncTimelineEventWithoutContent::OriginalMessageLike(ev) => {
266 Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
267 }
268 SyncTimelineEventWithoutContent::RedactedMessageLike(ev) => {
269 Self::FailedToParseMessageLike { event_type: ev.content.event_type, error }
270 }
271 SyncTimelineEventWithoutContent::OriginalState(ev) => Self::FailedToParseState {
272 event_type: ev.content.event_type,
273 state_key: ev.state_key,
274 error,
275 },
276 SyncTimelineEventWithoutContent::RedactedState(ev) => Self::FailedToParseState {
277 event_type: ev.content.event_type,
278 state_key: ev.state_key,
279 error,
280 },
281 }
282 }
283}
284
/// Where the timeline item for a remote event must be placed.
#[derive(Clone, Copy, Debug)]
pub(super) enum TimelineItemPosition {
    /// Push the new item at the front of the timeline.
    Start {
        /// Origin recorded on the created remote item.
        origin: RemoteEventOrigin,
    },

    /// Insert the new item after the last event item that is not a local
    /// echo.
    End {
        /// Origin recorded on the created remote item.
        origin: RemoteEventOrigin,
    },

    /// Insert the new item at the timeline position derived from the given
    /// index into the list of all remote events.
    At {
        /// Index of the event in the list of all remote events.
        event_index: usize,

        /// Origin recorded on the created remote item.
        origin: RemoteEventOrigin,
    },

    /// Replace the existing timeline item at the given index; if handling
    /// the event adds no item, the existing item is removed instead.
    UpdateAt {
        /// Index of the timeline item to replace.
        timeline_item_index: usize,
    },
}
320
/// Summary of what handling one event changed in the timeline.
#[derive(Default)]
pub(super) struct HandleEventResult {
    /// Set once `add_item` has been called for the event (this includes
    /// replacing an item in the `UpdateAt` case).
    pub(super) item_added: bool,

    /// Set when an existing item was removed because an `UpdateAt` event did
    /// not produce any item.
    pub(super) item_removed: bool,

    /// Number of existing items that were replaced by an updated version
    /// (edits, aggregations, redactions).
    pub(super) items_updated: u16,
}
337
/// Applies a single event to the timeline items and metadata of an ongoing
/// state transaction, and records what changed.
pub(super) struct TimelineEventHandler<'a, 'o> {
    /// The timeline items being modified.
    items: &'a mut ObservableItemsTransaction<'o>,
    /// Timeline metadata (aggregations, pending edits, replies, ...).
    meta: &'a mut TimelineMetadata,
    /// Metadata about the event being handled.
    ctx: TimelineEventContext,
    /// Accumulated result, returned by `handle_event`.
    result: HandleEventResult,
}
350
351impl<'a, 'o> TimelineEventHandler<'a, 'o> {
352 pub(super) fn new(
353 state: &'a mut TimelineStateTransaction<'o>,
354 ctx: TimelineEventContext,
355 ) -> Self {
356 let TimelineStateTransaction { items, meta, .. } = state;
357 Self { items, meta, ctx, result: HandleEventResult::default() }
358 }
359
/// Handles one event and returns what changed in the timeline.
///
/// Consumes the handler. Reactions, edits, poll responses/ends and
/// redactions are always processed; content that would create a new
/// timeline item is only added when `should_add_new_items` is set in the
/// context.
///
/// * `date_divider_adjuster` - marked as used on every call.
/// * `event_kind` - the classified content of the event.
#[instrument(skip_all, fields(txn_id, event_id, position))]
pub(super) async fn handle_event(
    mut self,
    date_divider_adjuster: &mut DateDividerAdjuster,
    event_kind: TimelineEventKind,
) -> HandleEventResult {
    let span = tracing::Span::current();

    date_divider_adjuster.mark_used();

    // Fill in the span fields declared in the `instrument` attribute.
    match &self.ctx.flow {
        Flow::Local { txn_id, .. } => {
            span.record("txn_id", debug(txn_id));
            debug!("Handling local event");
        }

        Flow::Remote { event_id, txn_id, position, .. } => {
            span.record("event_id", debug(event_id));
            span.record("position", debug(position));
            if let Some(txn_id) = txn_id {
                span.record("txn_id", debug(txn_id));
            }
            trace!("Handling remote event");
        }
    };

    let should_add = self.ctx.should_add_new_items;

    match event_kind {
        TimelineEventKind::Message { content, relations } => match content {
            AnyMessageLikeEventContent::Reaction(c) => {
                self.handle_reaction(c);
            }

            // A room message carrying a replacement relation is an edit. This
            // arm must stay before the generic room message arm.
            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
                relates_to: Some(Relation::Replacement(re)),
                ..
            }) => {
                self.handle_room_message_edit(re);
            }

            AnyMessageLikeEventContent::RoomMessage(c) => {
                if should_add {
                    self.handle_room_message(c, relations);
                }
            }

            AnyMessageLikeEventContent::Sticker(content) => {
                if should_add {
                    self.add_item(
                        TimelineItemContent::MsgLike(MsgLikeContent {
                            kind: MsgLikeKind::Sticker(Sticker { content }),
                            reactions: Default::default(),
                            thread_root: None,
                            in_reply_to: None,
                        }),
                        None,
                    );
                }
            }

            AnyMessageLikeEventContent::UnstablePollStart(
                UnstablePollStartEventContent::Replacement(c),
            ) => self.handle_poll_edit(c.relates_to),

            AnyMessageLikeEventContent::UnstablePollStart(
                UnstablePollStartEventContent::New(c),
            ) => {
                if should_add {
                    self.handle_poll_start(c, relations)
                }
            }

            AnyMessageLikeEventContent::UnstablePollResponse(c) => self.handle_poll_response(c),

            AnyMessageLikeEventContent::UnstablePollEnd(c) => self.handle_poll_end(c),

            AnyMessageLikeEventContent::CallInvite(_) => {
                if should_add {
                    self.add_item(TimelineItemContent::CallInvite, None);
                }
            }

            AnyMessageLikeEventContent::CallNotify(_) => {
                if should_add {
                    self.add_item(TimelineItemContent::CallNotify, None)
                }
            }

            // Every other message-like content type is only logged.
            _ => {
                debug!(
                    "Ignoring message-like event of type `{}`, not supported (yet)",
                    content.event_type()
                );
            }
        },

        TimelineEventKind::UnableToDecrypt { content, utd_cause } => {
            if should_add {
                self.add_item(
                    TimelineItemContent::MsgLike(MsgLikeContent::unable_to_decrypt(
                        EncryptedMessage::from_content(content, utd_cause),
                    )),
                    None,
                );
            }

            // The hook is notified whether or not an item was added, but only
            // for events that have an event ID (i.e. remote events).
            if let Some(hook) = self.meta.unable_to_decrypt_hook.as_ref() {
                if let Some(event_id) = &self.ctx.flow.event_id() {
                    hook.on_utd(event_id, utd_cause, self.ctx.timestamp, &self.ctx.sender)
                        .await;
                }
            }
        }

        TimelineEventKind::RedactedMessage { event_type } => {
            // Redacted reactions never get a timeline item.
            if event_type != MessageLikeEventType::Reaction && should_add {
                self.add_item(TimelineItemContent::MsgLike(MsgLikeContent::redacted()), None);
            }
        }

        TimelineEventKind::Redaction { redacts } => {
            self.handle_redaction(redacts);
        }

        TimelineEventKind::RoomMember { user_id, content, sender } => {
            if should_add {
                self.add_item(TimelineItemContent::room_member(user_id, content, sender), None);
            }
        }

        TimelineEventKind::OtherState { state_key, content } => {
            if should_add {
                self.add_item(
                    TimelineItemContent::OtherState(OtherState { state_key, content }),
                    None,
                );
            }
        }

        TimelineEventKind::FailedToParseMessageLike { event_type, error } => {
            if should_add {
                self.add_item(
                    TimelineItemContent::FailedToParseMessageLike { event_type, error },
                    None,
                );
            }
        }

        TimelineEventKind::FailedToParseState { event_type, state_key, error } => {
            if should_add {
                self.add_item(
                    TimelineItemContent::FailedToParseState { event_type, state_key, error },
                    None,
                );
            }
        }
    }

    if !self.result.item_added {
        trace!("No new item added");

        // The event was meant to replace an existing item but produced no
        // item of its own: drop the existing item.
        if let Flow::Remote {
            position: TimelineItemPosition::UpdateAt { timeline_item_index },
            ..
        } = self.ctx.flow
        {
            trace!("Removing UTD that was successfully retried");
            self.items.remove(timeline_item_index);

            self.result.item_removed = true;
        }

    }

    self.result
}
552
553 #[instrument(skip_all)]
555 fn handle_room_message(
556 &mut self,
557 msg: RoomMessageEventContent,
558 relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
559 ) {
560 let pending_edit = self
565 .ctx
566 .flow
567 .event_id()
568 .and_then(|event_id| {
569 Self::maybe_unstash_pending_edit(&mut self.meta.pending_edits, event_id)
570 })
571 .and_then(|edit| match edit.kind {
572 PendingEditKind::RoomMessage(replacement) => {
573 Some((Some(edit.event_json), replacement.new_content))
574 }
575 _ => None,
576 });
577
578 let (edit_json, edit_content) = extract_room_msg_edit_content(relations)
579 .map(|content| {
580 let edit_json = self.ctx.flow.raw_event().and_then(extract_bundled_edit_event_json);
581 (edit_json, content)
582 })
583 .or(pending_edit)
584 .unzip();
585
586 let mut replied_to_event_id = None;
587 let mut thread_root = None;
588 let in_reply_to_details = msg.relates_to.as_ref().and_then(|relation| match relation {
589 Relation::Reply { in_reply_to } => {
590 replied_to_event_id = Some(in_reply_to.event_id.clone());
591 Some(InReplyToDetails::new(in_reply_to.event_id.clone(), self.items))
592 }
593 Relation::Thread(thread) => {
594 thread_root = Some(thread.event_id.clone());
595 thread.in_reply_to.as_ref().map(|in_reply_to| {
596 replied_to_event_id = Some(in_reply_to.event_id.clone());
597 InReplyToDetails::new(in_reply_to.event_id.clone(), self.items)
598 })
599 }
600 _ => None,
601 });
602
603 if let Some(event_id) = self.ctx.flow.event_id() {
606 if let Some(replied_to_event_id) = replied_to_event_id {
607 self.meta
609 .replies
610 .entry(replied_to_event_id)
611 .or_default()
612 .insert(event_id.to_owned());
613 }
614 }
615
616 let edit_json = edit_json.flatten();
617
618 self.add_item(
619 TimelineItemContent::message(
620 msg,
621 edit_content,
622 Default::default(),
623 thread_root,
624 in_reply_to_details,
625 ),
626 edit_json,
627 );
628 }
629
630 #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
631 fn handle_room_message_edit(
632 &mut self,
633 replacement: Replacement<RoomMessageEventContentWithoutRelation>,
634 ) {
635 if let Some((item_pos, item)) = rfind_event_by_id(self.items, &replacement.event_id) {
636 let edit_json = self.ctx.flow.raw_event().cloned();
637 if let Some(new_item) = self.apply_msg_edit(&item, replacement.new_content, edit_json) {
638 trace!("Applied edit");
639
640 let internal_id = item.internal_id.to_owned();
641
642 Self::maybe_update_responses(
644 self.meta,
645 self.items,
646 &replacement.event_id,
647 &new_item,
648 );
649
650 self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
652 self.result.items_updated += 1;
653 }
654 } else if let Flow::Remote { position, raw_event, .. } = &self.ctx.flow {
655 let replaced_event_id = replacement.event_id.clone();
656 let replacement = PendingEdit {
657 kind: PendingEditKind::RoomMessage(replacement),
658 event_json: raw_event.clone(),
659 };
660 self.stash_pending_edit(*position, replaced_event_id, replacement);
661 } else {
662 debug!("Local message edit for a timeline item not found, discarding");
663 }
664 }
665
666 #[instrument(skip(self, replacement))]
668 fn stash_pending_edit(
669 &mut self,
670 position: TimelineItemPosition,
671 replaced_event_id: OwnedEventId,
672 replacement: PendingEdit,
673 ) {
674 match position {
675 TimelineItemPosition::Start { .. }
676 | TimelineItemPosition::At { .. }
677 | TimelineItemPosition::UpdateAt { .. } => {
678 if !self
689 .meta
690 .pending_edits
691 .iter()
692 .any(|edit| edit.edited_event() == replaced_event_id)
693 {
694 self.meta.pending_edits.push(replacement);
695 debug!("Timeline item not found, stashing edit");
696 } else {
697 debug!("Timeline item not found, but there was a previous edit for the event: discarding");
698 }
699 }
700
701 TimelineItemPosition::End { .. } => {
702 let edits = &mut self.meta.pending_edits;
706 let _ = Self::maybe_unstash_pending_edit(edits, &replaced_event_id);
707 edits.push(replacement);
708 debug!("Timeline item not found, stashing edit");
709 }
710 }
711 }
712
713 fn maybe_unstash_pending_edit(
716 edits: &mut RingBuffer<PendingEdit>,
717 event_id: &EventId,
718 ) -> Option<PendingEdit> {
719 let pos = edits.iter().position(|edit| edit.edited_event() == event_id)?;
720 trace!(edited_event = %event_id, "unstashed pending edit");
721 Some(edits.remove(pos).unwrap())
722 }
723
724 fn apply_msg_edit(
729 &self,
730 item: &EventTimelineItem,
731 new_content: RoomMessageEventContentWithoutRelation,
732 edit_json: Option<Raw<AnySyncTimelineEvent>>,
733 ) -> Option<EventTimelineItem> {
734 if self.ctx.sender != item.sender() {
735 info!(
736 original_sender = ?item.sender(), edit_sender = ?self.ctx.sender,
737 "Edit event applies to another user's timeline item, discarding"
738 );
739 return None;
740 }
741
742 let TimelineItemContent::MsgLike(MsgLikeContent {
743 kind: MsgLikeKind::Message(msg),
744 reactions,
745 thread_root,
746 in_reply_to,
747 }) = item.content()
748 else {
749 info!(
750 "Edit of message event applies to {:?}, discarding",
751 item.content().debug_string(),
752 );
753 return None;
754 };
755
756 let mut new_msg = msg.clone();
757 new_msg.apply_edit(new_content);
758
759 let mut new_item = item.with_content_and_latest_edit(
760 TimelineItemContent::MsgLike(MsgLikeContent {
761 kind: MsgLikeKind::Message(new_msg),
762 reactions: reactions.clone(),
763 thread_root: thread_root.clone(),
764 in_reply_to: in_reply_to.clone(),
765 }),
766 edit_json,
767 );
768
769 if let Flow::Remote { encryption_info, .. } = &self.ctx.flow {
770 new_item = new_item.with_encryption_info(encryption_info.clone());
771 }
772
773 Some(new_item)
774 }
775
776 #[instrument(skip_all, fields(relates_to_event_id = ?c.relates_to.event_id))]
781 fn handle_reaction(&mut self, c: ReactionEventContent) {
782 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
783
784 let reaction_status = match &self.ctx.flow {
786 Flow::Local { send_handle, .. } => {
787 ReactionStatus::LocalToRemote(send_handle.clone())
789 }
790 Flow::Remote { event_id, .. } => {
791 ReactionStatus::RemoteToRemote(event_id.clone())
793 }
794 };
795 let aggregation = Aggregation::new(
796 self.ctx.flow.timeline_item_id(),
797 AggregationKind::Reaction {
798 key: c.relates_to.key,
799 sender: self.ctx.sender.clone(),
800 timestamp: self.ctx.timestamp,
801 reaction_status,
802 },
803 );
804
805 self.meta.aggregations.add(target.clone(), aggregation.clone());
806 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
807 self.result.items_updated += 1;
808 }
809 }
810
811 #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))]
812 fn handle_poll_edit(
813 &mut self,
814 replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation>,
815 ) {
816 let Some((item_pos, item)) = rfind_event_by_id(self.items, &replacement.event_id) else {
817 if let Flow::Remote { position, raw_event, .. } = &self.ctx.flow {
818 let replaced_event_id = replacement.event_id.clone();
819 let replacement = PendingEdit {
820 kind: PendingEditKind::Poll(replacement),
821 event_json: raw_event.clone(),
822 };
823 self.stash_pending_edit(*position, replaced_event_id, replacement);
824 } else {
825 debug!("Local poll edit for a timeline item not found, discarding");
826 }
827 return;
828 };
829
830 let edit_json = self.ctx.flow.raw_event().cloned();
831
832 let Some(new_item) = self.apply_poll_edit(item.inner, replacement, edit_json) else {
833 return;
834 };
835
836 trace!("Applying poll start edit.");
837 self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned()));
838 self.result.items_updated += 1;
839 }
840
841 fn apply_poll_edit(
842 &self,
843 item: &EventTimelineItem,
844 replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation>,
845 edit_json: Option<Raw<AnySyncTimelineEvent>>,
846 ) -> Option<EventTimelineItem> {
847 if self.ctx.sender != item.sender() {
848 info!(
849 original_sender = ?item.sender(), edit_sender = ?self.ctx.sender,
850 "Edit event applies to another user's timeline item, discarding"
851 );
852 return None;
853 }
854
855 let TimelineItemContent::MsgLike(MsgLikeContent {
856 kind: MsgLikeKind::Poll(poll_state),
857 reactions,
858 thread_root,
859 in_reply_to,
860 }) = &item.content()
861 else {
862 info!("Edit of poll event applies to {}, discarding", item.content().debug_string(),);
863 return None;
864 };
865
866 let new_content = match poll_state.edit(replacement.new_content) {
867 Some(edited_poll_state) => TimelineItemContent::MsgLike(MsgLikeContent {
868 kind: MsgLikeKind::Poll(edited_poll_state),
869 reactions: reactions.clone(),
870 thread_root: thread_root.clone(),
871 in_reply_to: in_reply_to.clone(),
872 }),
873 None => {
874 info!("Not applying edit to a poll that's already ended");
875 return None;
876 }
877 };
878
879 Some(item.with_content_and_latest_edit(new_content, edit_json))
880 }
881
882 fn handle_poll_start(
884 &mut self,
885 c: NewUnstablePollStartEventContent,
886 relations: BundledMessageLikeRelations<AnySyncMessageLikeEvent>,
887 ) {
888 let pending_edit = self
893 .ctx
894 .flow
895 .event_id()
896 .and_then(|event_id| {
897 Self::maybe_unstash_pending_edit(&mut self.meta.pending_edits, event_id)
898 })
899 .and_then(|edit| match edit.kind {
900 PendingEditKind::Poll(replacement) => {
901 Some((Some(edit.event_json), replacement.new_content))
902 }
903 _ => None,
904 });
905
906 let (edit_json, edit_content) = extract_poll_edit_content(relations)
907 .map(|content| {
908 let edit_json = self.ctx.flow.raw_event().and_then(extract_bundled_edit_event_json);
909 (edit_json, content)
910 })
911 .or(pending_edit)
912 .unzip();
913
914 let poll_state = PollState::new(c, edit_content);
915
916 let edit_json = edit_json.flatten();
917
918 self.add_item(
919 TimelineItemContent::MsgLike(MsgLikeContent {
920 kind: MsgLikeKind::Poll(poll_state),
921 reactions: Default::default(),
922 thread_root: None,
923 in_reply_to: None,
924 }),
925 edit_json,
926 );
927 }
928
929 fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) {
930 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
931 let aggregation = Aggregation::new(
932 self.ctx.flow.timeline_item_id(),
933 AggregationKind::PollResponse {
934 sender: self.ctx.sender.clone(),
935 timestamp: self.ctx.timestamp,
936 answers: c.poll_response.answers,
937 },
938 );
939 self.meta.aggregations.add(target.clone(), aggregation.clone());
940 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
941 self.result.items_updated += 1;
942 }
943 }
944
945 fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) {
946 let target = TimelineEventItemId::EventId(c.relates_to.event_id);
947 let aggregation = Aggregation::new(
948 self.ctx.flow.timeline_item_id(),
949 AggregationKind::PollEnd { end_date: self.ctx.timestamp },
950 );
951 self.meta.aggregations.add(target.clone(), aggregation.clone());
952 if find_item_and_apply_aggregation(self.items, &target, aggregation) {
953 self.result.items_updated += 1;
954 }
955 }
956
957 #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
963 fn handle_redaction(&mut self, redacted: OwnedEventId) {
964 if self.handle_aggregation_redaction(redacted.clone()) {
969 return;
972 }
973
974 if let Some((idx, item)) = rfind_event_by_id(self.items, &redacted) {
976 if item.as_remote().is_some() {
977 if item.content.is_redacted() {
978 debug!("event item is already redacted");
979 } else {
980 let new_item = item.redact(&self.meta.room_version);
981 let internal_id = item.internal_id.to_owned();
982
983 Self::maybe_update_responses(self.meta, self.items, &redacted, &new_item);
986
987 self.items.replace(idx, TimelineItem::new(new_item, internal_id));
988 self.result.items_updated += 1;
989 }
990 } else {
991 error!("inconsistent state: redaction received on a non-remote event item");
992 }
993 } else {
994 debug!("Timeline item not found, discarding redaction");
995 };
996 }
997
998 #[instrument(skip_all, fields(redacts = ?aggregation_id))]
1003 fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
1004 let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
1005
1006 let Some((target, aggregation)) =
1007 self.meta.aggregations.try_remove_aggregation(&aggregation_id)
1008 else {
1009 return false;
1011 };
1012
1013 if let Some((item_pos, item)) = rfind_event_by_item_id(self.items, target) {
1014 let mut content = item.content().clone();
1015 match aggregation.unapply(&mut content) {
1016 ApplyAggregationResult::UpdatedItem => {
1017 trace!("removed aggregation");
1018 let internal_id = item.internal_id.to_owned();
1019 let new_item = item.with_content(content);
1020 self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
1021 self.result.items_updated += 1;
1022 }
1023 ApplyAggregationResult::LeftItemIntact => {}
1024 ApplyAggregationResult::Error(err) => {
1025 warn!("error when unapplying aggregation: {err}");
1026 }
1027 }
1028 } else {
1029 info!("missing related-to item ({target:?}) for aggregation {aggregation_id:?}");
1030 }
1031
1032 true
1034 }
1035
/// Creates a timeline item for the current event with the given content
/// and places it in the timeline according to the flow.
///
/// * `content` - content of the new item; aggregations already recorded
///   for this item are applied to it first.
/// * `edit_json` - raw JSON of the latest edit, stored on remote items.
///
/// Marks the result as `item_added`, including when an existing item is
/// replaced (`UpdateAt`).
fn add_item(
    &mut self,
    mut content: TimelineItemContent,
    edit_json: Option<Raw<AnySyncTimelineEvent>>,
) {
    self.result.item_added = true;

    // Apply the aggregations (reactions, poll responses, ...) recorded for
    // this item; a failure is only logged.
    if let Err(err) =
        self.meta.aggregations.apply(&self.ctx.flow.timeline_item_id(), &mut content)
    {
        warn!("discarding aggregations: {err}");
    }

    let sender = self.ctx.sender.to_owned();
    let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
    let timestamp = self.ctx.timestamp;

    let kind: EventTimelineItemKind = match &self.ctx.flow {
        Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
            send_state: EventSendState::NotSentYet,
            transaction_id: txn_id.to_owned(),
            send_handle: send_handle.clone(),
        }
        .into(),

        Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
            let origin = match *position {
                TimelineItemPosition::Start { origin }
                | TimelineItemPosition::End { origin }
                | TimelineItemPosition::At { origin, .. } => origin,

                // When updating an item, reuse the origin of the item being
                // replaced; fall back to `Unknown` if it is not a remote
                // event item.
                TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
                    .as_event()
                    .and_then(|ev| Some(ev.as_remote()?.origin))
                    .unwrap_or_else(|| {
                        error!("Tried to update a local event");
                        RemoteEventOrigin::Unknown
                    }),
            };

            RemoteEventTimelineItem {
                event_id: event_id.clone(),
                transaction_id: txn_id.clone(),
                read_receipts: self.ctx.read_receipts.clone(),
                is_own: self.ctx.is_own_event,
                is_highlighted: self.ctx.is_highlighted,
                encryption_info: encryption_info.clone(),
                original_json: Some(raw_event.clone()),
                latest_edit_json: edit_json,
                origin,
            }
            .into()
        }
    };

    let is_room_encrypted = self.meta.is_room_encrypted;

    let item = EventTimelineItem::new(
        sender,
        sender_profile,
        timestamp,
        content,
        kind,
        is_room_encrypted,
    );

    match &self.ctx.flow {
        Flow::Local { .. } => {
            trace!("Adding new local timeline item");

            let item = self.meta.new_timeline_item(item);

            self.items.push_back(item, None);
        }

        Flow::Remote {
            position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
        } => {
            // Reuse the internal ID of a matching local item, if any.
            let item = Self::recycle_local_or_create_item(
                self.items,
                self.meta,
                item,
                event_id,
                txn_id.as_deref(),
            );

            trace!("Adding new remote timeline item at the start");

            self.items.push_front(item, Some(0));
        }

        Flow::Remote {
            position: TimelineItemPosition::At { event_index, .. },
            event_id,
            txn_id,
            ..
        } => {
            // Reuse the internal ID of a matching local item, if any.
            let item = Self::recycle_local_or_create_item(
                self.items,
                self.meta,
                item,
                event_id,
                txn_id.as_deref(),
            );

            let all_remote_events = self.items.all_remote_events();
            let event_index = *event_index;

            // First choice: right after the timeline item of the closest
            // remote event at or before `event_index` that has one.
            let timeline_item_index = all_remote_events
                .range(0..=event_index)
                .rev()
                .find_map(|event_meta| event_meta.timeline_item_index)
                .map(|timeline_item_index| timeline_item_index + 1);

            // Second choice: at the timeline item index of the first remote
            // event after `event_index` that has one.
            let timeline_item_index = timeline_item_index.or_else(|| {
                all_remote_events
                    .range(event_index + 1..)
                    .find_map(|event_meta| event_meta.timeline_item_index)
            });

            // Last resort: after the last event item that is not a local
            // echo, or else after the timeline start item if the first item
            // is one, or else at index 0.
            let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
                self.items
                    .iter()
                    .enumerate()
                    .rev()
                    .find_map(|(timeline_item_index, timeline_item)| {
                        (!timeline_item.as_event()?.is_local_echo())
                            .then_some(timeline_item_index + 1)
                    })
                    .unwrap_or_else(|| {
                        if self.items.get(0).is_some_and(|item| item.is_timeline_start()) {
                            1
                        } else {
                            0
                        }
                    })
            });

            trace!(
                ?event_index,
                ?timeline_item_index,
                "Adding new remote timeline at specific event index"
            );

            self.items.insert(timeline_item_index, item, Some(event_index));
        }

        Flow::Remote {
            position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
        } => {
            // Reuse the internal ID of a matching local item, if any.
            let item = Self::recycle_local_or_create_item(
                self.items,
                self.meta,
                item,
                event_id,
                txn_id.as_deref(),
            );

            // Insert after the last event item that is not a local echo, or
            // at index 0 if there is none.
            let timeline_item_index = self
                .items
                .iter()
                .enumerate()
                .rev()
                .find_map(|(timeline_item_index, timeline_item)| {
                    (!timeline_item.as_event()?.is_local_echo())
                        .then_some(timeline_item_index + 1)
                })
                .unwrap_or(0);

            // The item is associated with the last index of the remote
            // events list; if that list is empty, log an error and use 0.
            let event_index = self
                .items
                .all_remote_events()
                .last_index()
                .or_else(|| {
                    error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");

                    Some(0)
                });

            if timeline_item_index == self.items.len() {
                trace!("Adding new remote timeline item at the back");
                self.items.push_back(item, event_index);
            } else if timeline_item_index == 0 {
                trace!("Adding new remote timeline item at the front");
                self.items.push_front(item, event_index);
            } else {
                trace!(
                    timeline_item_index,
                    "Adding new remote timeline item at specific index"
                );
                self.items.insert(timeline_item_index, item, event_index);
            }
        }

        Flow::Remote {
            event_id: decrypted_event_id,
            position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
            ..
        } => {
            trace!("Updating timeline item at position {idx}");

            // Refresh the replied-to details of the replies to this event.
            Self::maybe_update_responses(self.meta, self.items, decrypted_event_id, &item);

            // Keep the internal ID of the item being replaced.
            let internal_id = self.items[*idx].internal_id.clone();
            self.items.replace(*idx, TimelineItem::new(item, internal_id));
        }
    }

    if !self.meta.has_up_to_date_read_marker_item {
        self.meta.update_read_marker(self.items);
    }
}
1287
1288 fn recycle_local_or_create_item(
1294 items: &mut ObservableItemsTransaction<'_>,
1295 meta: &mut TimelineMetadata,
1296 mut new_item: EventTimelineItem,
1297 event_id: &EventId,
1298 transaction_id: Option<&TransactionId>,
1299 ) -> Arc<TimelineItem> {
1300 if let Some((local_timeline_item_index, local_timeline_item)) = items
1302 .iter()
1303 .enumerate()
1305 .rev()
1307 .try_fold((), |(), (nth, timeline_item)| {
1318 let Some(event_timeline_item) = timeline_item.as_event() else {
1319 return ControlFlow::Break(None);
1321 };
1322
1323 if !event_timeline_item.is_local_echo() {
1325 return ControlFlow::Break(None);
1326 }
1327
1328 if Some(event_id) == event_timeline_item.event_id()
1329 || (transaction_id.is_some()
1330 && transaction_id == event_timeline_item.transaction_id())
1331 {
1332 ControlFlow::Break(Some((nth, event_timeline_item)))
1334 } else {
1335 ControlFlow::Continue(())
1338 }
1339 })
1340 .break_value()
1341 .flatten()
1342 {
1343 trace!(
1344 ?event_id,
1345 ?transaction_id,
1346 ?local_timeline_item_index,
1347 "Removing local timeline item"
1348 );
1349
1350 transfer_details(&mut new_item, local_timeline_item);
1351
1352 let recycled = items.remove(local_timeline_item_index);
1354 TimelineItem::new(new_item, recycled.internal_id.clone())
1355 } else {
1356 meta.new_timeline_item(new_item)
1358 }
1359 }
1360
1361 fn maybe_update_responses(
1364 meta: &mut TimelineMetadata,
1365 items: &mut ObservableItemsTransaction<'_>,
1366 target_event_id: &EventId,
1367 new_item: &EventTimelineItem,
1368 ) {
1369 let Some(replies) = meta.replies.get(target_event_id) else {
1370 trace!("item has no replies");
1371 return;
1372 };
1373
1374 for reply_id in replies {
1375 let Some(timeline_item_index) = items
1376 .get_remote_event_by_event_id(reply_id)
1377 .and_then(|meta| meta.timeline_item_index)
1378 else {
1379 warn!(%reply_id, "event not known as an item in the timeline");
1380 continue;
1381 };
1382
1383 let Some(item) = items.get(timeline_item_index) else {
1384 warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1385 continue;
1386 };
1387
1388 let Some(event_item) = item.as_event() else { continue };
1389 let Some(msglike) = event_item.content.as_msglike() else { continue };
1390 let Some(message) = msglike.as_message() else { continue };
1391 let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1392
1393 trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1394 let in_reply_to = Some(InReplyToDetails {
1395 event_id: in_reply_to.event_id.clone(),
1396 event: TimelineDetails::Ready(Box::new(RepliedToEvent::from_timeline_item(
1397 new_item,
1398 ))),
1399 });
1400
1401 let new_reply_content = TimelineItemContent::MsgLike(MsgLikeContent {
1402 kind: MsgLikeKind::Message(message.clone()),
1403 reactions: msglike.reactions.clone(),
1404 thread_root: msglike.thread_root.clone(),
1405 in_reply_to,
1406 });
1407 let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1408 items.replace(timeline_item_index, new_reply_item);
1409 }
1410 }
1411}
1412
1413fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1421 let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1422 return;
1423 };
1424 let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1425 return;
1426 };
1427
1428 let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1429 let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1430
1431 if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1432 in_reply_to.event = old_in_reply_to.event.clone();
1433 }
1434}