matrix_sdk_ui/timeline/
event_handler.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{borrow::Cow, sync::Arc};
16
17use as_variant::as_variant;
18use indexmap::IndexMap;
19use matrix_sdk::{
20    deserialized_responses::{EncryptionInfo, UnableToDecryptInfo},
21    send_queue::SendHandle,
22};
23use matrix_sdk_base::crypto::types::events::UtdCause;
24use ruma::{
25    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
26    TransactionId,
27    events::{
28        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncStateEvent,
29        AnySyncTimelineEvent, FullStateEventContent, MessageLikeEventContent, MessageLikeEventType,
30        StateEventType, SyncStateEvent,
31        poll::unstable_start::{
32            NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartEventContent,
33        },
34        receipt::Receipt,
35        relation::Replacement,
36        room::message::{
37            Relation, RoomMessageEventContent, RoomMessageEventContentWithoutRelation,
38        },
39    },
40    serde::Raw,
41};
42use tracing::{debug, error, field::debug, instrument, trace, warn};
43
44use super::{
45    EmbeddedEvent, EncryptedMessage, EventTimelineItem, InReplyToDetails, MsgLikeContent,
46    MsgLikeKind, OtherState, ReactionStatus, Sticker, ThreadSummary, TimelineDetails, TimelineItem,
47    TimelineItemContent,
48    controller::{
49        Aggregation, AggregationKind, ObservableItemsTransaction, PendingEditKind,
50        TimelineMetadata, TimelineStateTransaction, find_item_and_apply_aggregation,
51    },
52    date_dividers::DateDividerAdjuster,
53    event_item::{
54        AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind,
55        LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem,
56        TimelineEventItemId,
57    },
58    traits::RoomDataProvider,
59};
60use crate::{
61    timeline::{controller::aggregations::PendingEdit, event_item::OtherMessageLike},
62    unable_to_decrypt_hook::UtdHookManager,
63};
64
/// When adding an event, useful information related to the source of the event.
///
/// The two variants map onto the two kinds of [`TimelineEventItemId`]: a local
/// event is identified by its transaction id, a remote one by its event id
/// (see [`Flow::timeline_item_id`]).
pub(super) enum Flow {
    /// The event was locally created.
    Local {
        /// The transaction id we've used in requests associated to this event.
        txn_id: OwnedTransactionId,

        /// A handle to manipulate this event.
        ///
        /// This is optional: a local flow can exist without a send handle.
        send_handle: Option<SendHandle>,
    },

    /// The event has been received from a remote source (sync, pagination,
    /// etc.). This can be a "remote echo".
    Remote {
        /// The event identifier as returned by the server.
        event_id: OwnedEventId,
        /// The transaction id we might have used, if we're the sender of the
        /// event.
        txn_id: Option<OwnedTransactionId>,
        /// The raw serialized JSON event.
        ///
        /// This is what [`Flow::raw_event`] hands back.
        raw_event: Raw<AnySyncTimelineEvent>,
        /// Where should this be added in the timeline.
        position: TimelineItemPosition,
        /// Information about the encryption for this event, if any.
        encryption_info: Option<Arc<EncryptionInfo>>,
    },
}
92
93impl Flow {
94    /// Returns the [`TimelineEventItemId`] associated to this future item.
95    pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId {
96        match self {
97            Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()),
98            Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()),
99        }
100    }
101
102    /// If the flow is remote, returns the associated full raw event.
103    pub(crate) fn raw_event(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
104        as_variant!(self, Flow::Remote { raw_event, .. } => raw_event)
105    }
106}
107
/// Contextual data about the single event being handled by a
/// [`TimelineEventHandler`].
pub(super) struct TimelineEventContext {
    /// The sender of the event.
    ///
    /// Compared against our own user id to decide whether a remote item is
    /// our own, and recorded as the sender of aggregations (reactions, poll
    /// responses).
    pub(super) sender: OwnedUserId,
    /// The sender's profile, if available; used as the initial value of the
    /// sender profile details of items created from this event.
    pub(super) sender_profile: Option<Profile>,
    /// The event's `origin_server_ts` field (or creation time for local echo).
    pub(super) timestamp: MilliSecondsSinceUnixEpoch,
    /// The read receipts copied into the remote timeline item, keyed by user
    /// id.
    pub(super) read_receipts: IndexMap<OwnedUserId, Receipt>,
    /// Highlight flag copied as-is into the remote timeline item.
    pub(super) is_highlighted: bool,
    /// Where the event comes from (local echo or remote source).
    pub(super) flow: Flow,

    /// If the event represents a new item, should it be added to the timeline?
    ///
    /// This controls whether a new timeline item *may* be added. If the
    /// update is about an existing timeline item (redaction, edit, reaction,
    /// etc.), it's always handled, regardless of this flag.
    pub(super) should_add_new_items: bool,
}
124
/// Which kind of aggregation (i.e. modification of a related event) are we
/// going to handle?
///
/// The identifier of the related event itself is not stored here; it lives in
/// [`TimelineAction::HandleAggregation`], next to this kind.
#[derive(Clone, Debug)]
pub(super) enum HandleAggregationKind {
    /// Adding a reaction to the related event.
    ///
    /// `key` is the key taken from the reaction's relation.
    Reaction { key: String },

    /// Redacting (removing) the related event.
    Redaction,

    /// Editing (replacing) the related event with another one.
    Edit { replacement: Replacement<RoomMessageEventContentWithoutRelation> },

    /// Responding to the related poll event.
    ///
    /// `answers` are the answers carried by the poll response.
    PollResponse { answers: Vec<String> },

    /// Editing a related poll event's description.
    PollEdit { replacement: Replacement<NewUnstablePollStartEventContentWithoutRelation> },

    /// Ending a related poll.
    PollEnd,
}
147
148impl HandleAggregationKind {
149    /// Returns a small string describing this aggregation, for debug purposes.
150    pub fn debug_string(&self) -> &'static str {
151        match self {
152            HandleAggregationKind::Reaction { .. } => "a reaction",
153            HandleAggregationKind::Redaction => "a redaction",
154            HandleAggregationKind::Edit { .. } => "an edit",
155            HandleAggregationKind::PollResponse { .. } => "a poll response",
156            HandleAggregationKind::PollEdit { .. } => "a poll edit",
157            HandleAggregationKind::PollEnd => "a poll end",
158        }
159    }
160}
161
/// An action that we want to cause on the timeline.
///
/// An action either adds a brand new item, or modifies an existing (or
/// yet-to-be-seen) item through an aggregation.
#[derive(Clone, Debug)]
// NOTE(review): presumably silenced because `AddItem` embeds a whole
// `TimelineItemContent`, which is much larger than the other variant —
// confirm before boxing it.
#[allow(clippy::large_enum_variant)]
pub(super) enum TimelineAction {
    /// Add a new timeline item.
    ///
    /// This enqueues adding a new item to the timeline (i.e. push to the items
    /// array in its state). The item may be filtered out, and thus not
    /// added later.
    AddItem {
        /// The content of the item we want to add.
        content: TimelineItemContent,
    },

    /// Handle an aggregation to another event.
    ///
    /// The event the aggregation is related to might not be included in the
    /// timeline, in which case it will be stashed somewhere, until we see
    /// the related event.
    HandleAggregation {
        /// Which other event does this aggregation apply to?
        related_event: OwnedEventId,
        /// What kind of aggregation are we handling here?
        kind: HandleAggregationKind,
    },
}
188
189impl TimelineAction {
190    /// Create a new [`TimelineEventKind::AddItem`].
191    fn add_item(content: TimelineItemContent) -> Self {
192        Self::AddItem { content }
193    }
194
195    /// Create a new [`TimelineAction`] from a given remote event.
196    ///
197    /// The return value may be `None` if the event was a redacted reaction.
198    #[allow(clippy::too_many_arguments)]
199    pub async fn from_event<P: RoomDataProvider>(
200        event: AnySyncTimelineEvent,
201        raw_event: &Raw<AnySyncTimelineEvent>,
202        room_data_provider: &P,
203        unable_to_decrypt: Option<(UnableToDecryptInfo, Option<&Arc<UtdHookManager>>)>,
204        in_reply_to: Option<InReplyToDetails>,
205        thread_root: Option<OwnedEventId>,
206        thread_summary: Option<ThreadSummary>,
207    ) -> Option<Self> {
208        let redaction_rules = room_data_provider.room_version_rules().redaction;
209
210        let redacted_message_or_none = |event_type: MessageLikeEventType| {
211            (event_type != MessageLikeEventType::Reaction)
212                .then_some(TimelineItemContent::MsgLike(MsgLikeContent::redacted()))
213        };
214
215        Some(match event {
216            AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
217                if let Some(redacts) = ev.redacts(&redaction_rules).map(ToOwned::to_owned) {
218                    Self::HandleAggregation {
219                        related_event: redacts,
220                        kind: HandleAggregationKind::Redaction,
221                    }
222                } else {
223                    Self::add_item(redacted_message_or_none(ev.event_type())?)
224                }
225            }
226
227            AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
228                Some(AnyMessageLikeEventContent::RoomEncrypted(content)) => {
229                    // An event which is still encrypted.
230                    if let Some((unable_to_decrypt_info, unable_to_decrypt_hook_manager)) =
231                        unable_to_decrypt
232                    {
233                        let utd_cause = UtdCause::determine(
234                            raw_event,
235                            room_data_provider.crypto_context_info().await,
236                            &unable_to_decrypt_info,
237                        );
238
239                        // Let the hook know that we ran into an unable-to-decrypt that is added to
240                        // the timeline.
241                        if let Some(hook) = unable_to_decrypt_hook_manager {
242                            hook.on_utd(
243                                ev.event_id(),
244                                utd_cause,
245                                ev.origin_server_ts(),
246                                ev.sender(),
247                            )
248                            .await;
249                        }
250
251                        Self::add_item(TimelineItemContent::MsgLike(
252                            MsgLikeContent::unable_to_decrypt(EncryptedMessage::from_content(
253                                content, utd_cause,
254                            )),
255                        ))
256                    } else {
257                        // If we get here, it means that some part of the code has created a
258                        // `TimelineEvent` containing an `m.room.encrypted` event without
259                        // decrypting it. Possibly this means that encryption has not been
260                        // configured. We treat it the same as any other message-like event.
261                        Self::from_content(
262                            AnyMessageLikeEventContent::RoomEncrypted(content),
263                            in_reply_to,
264                            thread_root,
265                            thread_summary,
266                        )
267                    }
268                }
269
270                Some(content) => {
271                    Self::from_content(content, in_reply_to, thread_root, thread_summary)
272                }
273
274                None => Self::add_item(redacted_message_or_none(ev.event_type())?),
275            },
276
277            AnySyncTimelineEvent::State(ev) => match ev {
278                AnySyncStateEvent::RoomMember(ev) => match ev {
279                    SyncStateEvent::Original(ev) => {
280                        Self::add_item(TimelineItemContent::room_member(
281                            ev.state_key,
282                            FullStateEventContent::Original {
283                                content: ev.content,
284                                prev_content: ev.unsigned.prev_content,
285                            },
286                            ev.sender,
287                        ))
288                    }
289                    SyncStateEvent::Redacted(ev) => {
290                        Self::add_item(TimelineItemContent::room_member(
291                            ev.state_key,
292                            FullStateEventContent::Redacted(ev.content),
293                            ev.sender,
294                        ))
295                    }
296                },
297                ev => Self::add_item(TimelineItemContent::OtherState(OtherState {
298                    state_key: ev.state_key().to_owned(),
299                    content: AnyOtherFullStateEventContent::with_event_content(ev.content()),
300                })),
301            },
302        })
303    }
304
305    /// Create a new [`TimelineAction`] from a given event's content.
306    ///
307    /// This is applicable to both remote event (as this is called from
308    /// [`TimelineAction::from_event`]) or local events (for which we only have
309    /// the content).
310    ///
311    /// The return value may be `None` if handling the event (be it a new item
312    /// or an aggregation) is not supported for this event type.
313    pub(super) fn from_content(
314        content: AnyMessageLikeEventContent,
315        in_reply_to: Option<InReplyToDetails>,
316        thread_root: Option<OwnedEventId>,
317        thread_summary: Option<ThreadSummary>,
318    ) -> Self {
319        match content {
320            AnyMessageLikeEventContent::Reaction(c) => {
321                // This is a reaction to a message.
322                Self::HandleAggregation {
323                    related_event: c.relates_to.event_id.clone(),
324                    kind: HandleAggregationKind::Reaction { key: c.relates_to.key },
325                }
326            }
327
328            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
329                relates_to: Some(Relation::Replacement(re)),
330                ..
331            }) => Self::HandleAggregation {
332                related_event: re.event_id.clone(),
333                kind: HandleAggregationKind::Edit { replacement: re },
334            },
335
336            AnyMessageLikeEventContent::UnstablePollStart(
337                UnstablePollStartEventContent::Replacement(re),
338            ) => Self::HandleAggregation {
339                related_event: re.relates_to.event_id.clone(),
340                kind: HandleAggregationKind::PollEdit { replacement: re.relates_to },
341            },
342
343            AnyMessageLikeEventContent::UnstablePollResponse(c) => Self::HandleAggregation {
344                related_event: c.relates_to.event_id,
345                kind: HandleAggregationKind::PollResponse { answers: c.poll_response.answers },
346            },
347
348            AnyMessageLikeEventContent::UnstablePollEnd(c) => Self::HandleAggregation {
349                related_event: c.relates_to.event_id,
350                kind: HandleAggregationKind::PollEnd,
351            },
352
353            AnyMessageLikeEventContent::CallInvite(_) => {
354                Self::add_item(TimelineItemContent::CallInvite)
355            }
356
357            AnyMessageLikeEventContent::RtcNotification(_) => {
358                Self::add_item(TimelineItemContent::RtcNotification)
359            }
360
361            AnyMessageLikeEventContent::Sticker(content) => {
362                Self::add_item(TimelineItemContent::MsgLike(MsgLikeContent {
363                    kind: MsgLikeKind::Sticker(Sticker { content }),
364                    reactions: Default::default(),
365                    thread_root,
366                    in_reply_to,
367                    thread_summary,
368                }))
369            }
370
371            AnyMessageLikeEventContent::UnstablePollStart(UnstablePollStartEventContent::New(
372                c,
373            )) => {
374                let poll_state = PollState::new(c.poll_start, c.text);
375
376                Self::AddItem {
377                    content: TimelineItemContent::MsgLike(MsgLikeContent {
378                        kind: MsgLikeKind::Poll(poll_state),
379                        reactions: Default::default(),
380                        thread_root,
381                        in_reply_to,
382                        thread_summary,
383                    }),
384                }
385            }
386
387            AnyMessageLikeEventContent::RoomMessage(msg) => Self::AddItem {
388                content: TimelineItemContent::message(
389                    msg.msgtype,
390                    msg.mentions,
391                    Default::default(),
392                    thread_root,
393                    in_reply_to,
394                    thread_summary,
395                ),
396            },
397
398            event => {
399                let other = OtherMessageLike { event_type: event.event_type() };
400
401                Self::AddItem {
402                    content: TimelineItemContent::MsgLike(MsgLikeContent {
403                        kind: MsgLikeKind::Other(other),
404                        reactions: Default::default(),
405                        thread_root,
406                        in_reply_to,
407                        thread_summary,
408                    }),
409                }
410            }
411        }
412    }
413
414    pub(super) fn failed_to_parse(event: FailedToParseEvent, error: serde_json::Error) -> Self {
415        let error = Arc::new(error);
416        match event {
417            FailedToParseEvent::State { event_type, state_key } => {
418                Self::add_item(TimelineItemContent::FailedToParseState {
419                    event_type,
420                    state_key,
421                    error,
422                })
423            }
424            FailedToParseEvent::MsgLike(event_type) => {
425                Self::add_item(TimelineItemContent::FailedToParseMessageLike { event_type, error })
426            }
427        }
428    }
429}
430
/// The little we know about an event that could not be deserialized.
///
/// Consumed by [`TimelineAction::failed_to_parse`] to build the matching
/// "failed to parse" timeline item content.
#[derive(Debug)]
pub(super) enum FailedToParseEvent {
    /// A message-like event, of which only the event type is known.
    MsgLike(MessageLikeEventType),
    /// A state event, of which only the event type and the state key are
    /// known.
    State { event_type: StateEventType, state_key: String },
}
436
/// The position at which to perform an update of the timeline with events.
///
/// Every variant that inserts new items carries the [`RemoteEventOrigin`] of
/// those items; [`TimelineItemPosition::UpdateAt`] does not, as the origin of
/// the item being updated is reused instead.
#[derive(Clone, Copy, Debug)]
pub(super) enum TimelineItemPosition {
    /// One or more items are prepended to the timeline (i.e. they're the
    /// oldest).
    Start {
        /// The origin of the new item(s).
        origin: RemoteEventOrigin,
    },

    /// One or more items are appended to the timeline (i.e. they're the most
    /// recent).
    End {
        /// The origin of the new item(s).
        origin: RemoteEventOrigin,
    },

    /// One item is inserted to the timeline.
    At {
        /// Where to insert the remote event.
        event_index: usize,

        /// The origin of the new item.
        origin: RemoteEventOrigin,
    },

    /// A single item is updated.
    ///
    /// This can happen for instance after a UTD has been successfully
    /// decrypted, or when it's been redacted at the source.
    UpdateAt {
        /// The index of the **timeline item**.
        timeline_item_index: usize,
    },
}
472
/// Whether an item was removed or not.
///
/// Plain alias over `bool`, only there to make signatures self-describing
/// (presumably `true` means an item was removed — users of this alias are
/// outside of this excerpt).
pub(super) type RemovedItem = bool;
475
/// Data necessary to update the timeline, given a single event to handle.
///
/// Bundles together a few things that are needed throughout the different
/// stages of handling an event (figuring out whether it should update an
/// existing timeline item, transforming that item or creating a new one,
/// updating the reactive Vec).
///
/// The handler borrows its state from a [`TimelineStateTransaction`] for
/// `'a`, and owns the context of the event being handled.
pub(super) struct TimelineEventHandler<'a, 'o> {
    /// The in-flight transaction over the timeline items.
    items: &'a mut ObservableItemsTransaction<'o>,
    /// The timeline metadata (aggregations, room version rules, own user id,
    /// …).
    meta: &'a mut TimelineMetadata,
    /// Context of the event being handled.
    ctx: TimelineEventContext,
}
487
488impl<'a, 'o> TimelineEventHandler<'a, 'o> {
489    pub(super) fn new<P: RoomDataProvider>(
490        state: &'a mut TimelineStateTransaction<'o, P>,
491        ctx: TimelineEventContext,
492    ) -> Self {
493        let TimelineStateTransaction { items, meta, .. } = state;
494        Self { items, meta, ctx }
495    }
496
497    /// Handle an event.
498    ///
499    /// Returns if an item was added to the timeline due to the new timeline
500    /// action. Items might not be added to the timeline for various reasons,
501    /// some common ones are if the item:
502    ///     - Contains an unsupported event type.
503    ///     - Is an edit or a redaction.
504    ///     - Contains a local echo turning into a remote echo.
505    ///     - Contains a message that is already in the timeline but was now
506    ///       decrypted.
507    ///
508    /// `raw_event` is only needed to determine the cause of any UTDs,
509    /// so if we know this is not a UTD it can be None.
510    #[instrument(skip_all, fields(txn_id, event_id, position))]
511    pub(super) async fn handle_event(
512        mut self,
513        date_divider_adjuster: &mut DateDividerAdjuster,
514        timeline_action: TimelineAction,
515    ) -> bool {
516        let span = tracing::Span::current();
517
518        date_divider_adjuster.mark_used();
519
520        match &self.ctx.flow {
521            Flow::Local { txn_id, .. } => {
522                span.record("txn_id", debug(txn_id));
523                debug!("Handling local event");
524            }
525
526            Flow::Remote { event_id, txn_id, position, .. } => {
527                span.record("event_id", debug(event_id));
528                span.record("position", debug(position));
529                if let Some(txn_id) = txn_id {
530                    span.record("txn_id", debug(txn_id));
531                }
532                trace!("Handling remote event");
533            }
534        }
535
536        let mut added_item = false;
537
538        match timeline_action {
539            TimelineAction::AddItem { content } => {
540                if self.ctx.should_add_new_items {
541                    self.add_item(content);
542                    added_item = true;
543                }
544            }
545
546            TimelineAction::HandleAggregation { related_event, kind } => match kind {
547                HandleAggregationKind::Reaction { key } => {
548                    self.handle_reaction(related_event, key);
549                }
550                HandleAggregationKind::Redaction => {
551                    self.handle_redaction(related_event);
552                }
553                HandleAggregationKind::Edit { replacement } => {
554                    self.handle_edit(
555                        replacement.event_id.clone(),
556                        PendingEditKind::RoomMessage(replacement),
557                    );
558                }
559                HandleAggregationKind::PollResponse { answers } => {
560                    self.handle_poll_response(related_event, answers);
561                }
562                HandleAggregationKind::PollEdit { replacement } => {
563                    self.handle_edit(
564                        replacement.event_id.clone(),
565                        PendingEditKind::Poll(replacement),
566                    );
567                }
568                HandleAggregationKind::PollEnd => {
569                    self.handle_poll_end(related_event);
570                }
571            },
572        }
573
574        added_item
575    }
576
577    #[instrument(skip(self, edit_kind))]
578    fn handle_edit(&mut self, edited_event_id: OwnedEventId, edit_kind: PendingEditKind) {
579        let target = TimelineEventItemId::EventId(edited_event_id.clone());
580
581        let encryption_info =
582            as_variant!(&self.ctx.flow, Flow::Remote { encryption_info, .. } => encryption_info.clone()).flatten();
583        let aggregation = Aggregation::new(
584            self.ctx.flow.timeline_item_id(),
585            AggregationKind::Edit(PendingEdit {
586                kind: edit_kind,
587                edit_json: self.ctx.flow.raw_event().cloned(),
588                encryption_info,
589                bundled_item_owner: None,
590            }),
591        );
592
593        self.meta.aggregations.add(target.clone(), aggregation.clone());
594
595        if let Some(new_item) = find_item_and_apply_aggregation(
596            &self.meta.aggregations,
597            self.items,
598            &target,
599            aggregation,
600            &self.meta.room_version_rules,
601        ) {
602            // Update all events that replied to this message with the edited content.
603            Self::maybe_update_responses(
604                self.meta,
605                self.items,
606                &edited_event_id,
607                EmbeddedEvent::from_timeline_item(&new_item),
608            );
609        }
610    }
611
612    /// Apply a reaction to a *remote* event.
613    ///
614    /// Reactions to local events are applied in
615    /// [`crate::timeline::TimelineController::handle_local_echo`].
616    #[instrument(skip(self))]
617    fn handle_reaction(&mut self, relates_to: OwnedEventId, reaction_key: String) {
618        let target = TimelineEventItemId::EventId(relates_to);
619
620        // Add the aggregation to the manager.
621        let reaction_status = match &self.ctx.flow {
622            Flow::Local { send_handle, .. } => {
623                // This is a local echo for a reaction to a remote event.
624                ReactionStatus::LocalToRemote(send_handle.clone())
625            }
626            Flow::Remote { event_id, .. } => {
627                // This is the remote echo for a reaction to a remote event.
628                ReactionStatus::RemoteToRemote(event_id.clone())
629            }
630        };
631
632        let aggregation = Aggregation::new(
633            self.ctx.flow.timeline_item_id(),
634            AggregationKind::Reaction {
635                key: reaction_key,
636                sender: self.ctx.sender.clone(),
637                timestamp: self.ctx.timestamp,
638                reaction_status,
639            },
640        );
641
642        self.meta.aggregations.add(target.clone(), aggregation.clone());
643        find_item_and_apply_aggregation(
644            &self.meta.aggregations,
645            self.items,
646            &target,
647            aggregation,
648            &self.meta.room_version_rules,
649        );
650    }
651
652    fn handle_poll_response(&mut self, poll_event_id: OwnedEventId, answers: Vec<String>) {
653        let target = TimelineEventItemId::EventId(poll_event_id);
654        let aggregation = Aggregation::new(
655            self.ctx.flow.timeline_item_id(),
656            AggregationKind::PollResponse {
657                sender: self.ctx.sender.clone(),
658                timestamp: self.ctx.timestamp,
659                answers,
660            },
661        );
662        self.meta.aggregations.add(target.clone(), aggregation.clone());
663        find_item_and_apply_aggregation(
664            &self.meta.aggregations,
665            self.items,
666            &target,
667            aggregation,
668            &self.meta.room_version_rules,
669        );
670    }
671
672    fn handle_poll_end(&mut self, poll_event_id: OwnedEventId) {
673        let target = TimelineEventItemId::EventId(poll_event_id);
674        let aggregation = Aggregation::new(
675            self.ctx.flow.timeline_item_id(),
676            AggregationKind::PollEnd { end_date: self.ctx.timestamp },
677        );
678        self.meta.aggregations.add(target.clone(), aggregation.clone());
679        find_item_and_apply_aggregation(
680            &self.meta.aggregations,
681            self.items,
682            &target,
683            aggregation,
684            &self.meta.room_version_rules,
685        );
686    }
687
688    /// Looks for the redacted event in all the timeline event items, and
689    /// redacts it.
690    ///
691    /// This assumes the redacted event was present in the timeline in the first
692    /// place; it will warn if the redacted event has not been found.
693    #[instrument(skip_all, fields(redacts_event_id = ?redacted))]
694    fn handle_redaction(&mut self, redacted: OwnedEventId) {
695        // TODO: Apply local redaction of PollResponse and PollEnd events.
696        // https://github.com/matrix-org/matrix-rust-sdk/pull/2381#issuecomment-1689647825
697
698        // If it's an aggregation that's being redacted, handle it here.
699        if self.handle_aggregation_redaction(redacted.clone()) {
700            // When we have raw timeline items, we should not return here anymore, as we
701            // might need to redact the raw item as well.
702            return;
703        }
704
705        let target = TimelineEventItemId::EventId(redacted.clone());
706        let aggregation =
707            Aggregation::new(self.ctx.flow.timeline_item_id(), AggregationKind::Redaction);
708        self.meta.aggregations.add(target.clone(), aggregation.clone());
709
710        find_item_and_apply_aggregation(
711            &self.meta.aggregations,
712            self.items,
713            &target,
714            aggregation,
715            &self.meta.room_version_rules,
716        );
717
718        // Even if the redacted event wasn't in the timeline, we can always update
719        // responses with a placeholder "redacted" embedded item.
720        let embedded_event = EmbeddedEvent {
721            content: TimelineItemContent::MsgLike(MsgLikeContent::redacted()),
722            sender: self.ctx.sender.clone(),
723            sender_profile: TimelineDetails::from_initial_value(self.ctx.sender_profile.clone()),
724            timestamp: self.ctx.timestamp,
725            identifier: TimelineEventItemId::EventId(redacted.clone()),
726        };
727
728        Self::maybe_update_responses(self.meta, self.items, &redacted, embedded_event);
729    }
730
731    /// Attempts to redact an aggregation (e.g. a reaction, a poll response,
732    /// etc.).
733    ///
734    /// Returns true if it's succeeded.
735    #[instrument(skip_all, fields(redacts = ?aggregation_id))]
736    fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool {
737        let aggregation_id = TimelineEventItemId::EventId(aggregation_id);
738
739        match self.meta.aggregations.try_remove_aggregation(&aggregation_id, self.items) {
740            Ok(val) => val,
741            // This wasn't a known aggregation that was redacted.
742            Err(err) => {
743                warn!("error while attempting to remove aggregation: {err}");
744                // It could find an aggregation but didn't properly unapply it.
745                true
746            }
747        }
748    }
749
750    /// Add a new event item in the timeline.
751    ///
752    /// # Safety
753    ///
754    /// This method is not marked as unsafe **but** it manipulates
755    /// [`ObservableItemsTransaction::all_remote_events`]. 2 rules **must** be
756    /// respected:
757    ///
758    /// 1. the remote event of the item being added **must** be present in
759    ///    `all_remote_events`,
760    /// 2. the lastly added or updated remote event must be associated to the
761    ///    timeline item being added here.
762    fn add_item(&mut self, content: TimelineItemContent) {
763        let sender = self.ctx.sender.to_owned();
764        let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone());
765        let timestamp = self.ctx.timestamp;
766
767        let kind: EventTimelineItemKind = match &self.ctx.flow {
768            Flow::Local { txn_id, send_handle } => LocalEventTimelineItem {
769                send_state: EventSendState::NotSentYet { progress: None },
770                transaction_id: txn_id.to_owned(),
771                send_handle: send_handle.clone(),
772            }
773            .into(),
774
775            Flow::Remote { event_id, raw_event, position, txn_id, encryption_info, .. } => {
776                let origin = match *position {
777                    TimelineItemPosition::Start { origin }
778                    | TimelineItemPosition::End { origin }
779                    | TimelineItemPosition::At { origin, .. } => origin,
780
781                    // For updates, reuse the origin of the encrypted event.
782                    TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self.items[idx]
783                        .as_event()
784                        .and_then(|ev| Some(ev.as_remote()?.origin))
785                        .unwrap_or_else(|| {
786                            error!("Tried to update a local event");
787                            RemoteEventOrigin::Unknown
788                        }),
789                };
790
791                RemoteEventTimelineItem {
792                    event_id: event_id.clone(),
793                    transaction_id: txn_id.clone(),
794                    read_receipts: self.ctx.read_receipts.clone(),
795                    is_own: self.ctx.sender == self.meta.own_user_id,
796                    is_highlighted: self.ctx.is_highlighted,
797                    encryption_info: encryption_info.clone(),
798                    original_json: Some(raw_event.clone()),
799                    latest_edit_json: None,
800                    origin,
801                }
802                .into()
803            }
804        };
805
806        let is_room_encrypted = self.meta.is_room_encrypted;
807
808        let item = EventTimelineItem::new(
809            sender,
810            sender_profile,
811            timestamp,
812            content,
813            kind,
814            is_room_encrypted,
815        );
816
817        // Apply any pending or stashed aggregations.
818        let mut cowed = Cow::Owned(item);
819        if let Err(err) = self.meta.aggregations.apply_all(
820            &self.ctx.flow.timeline_item_id(),
821            &mut cowed,
822            self.items,
823            &self.meta.room_version_rules,
824        ) {
825            warn!("discarding aggregations: {err}");
826        }
827        let item = cowed.into_owned();
828
829        match &self.ctx.flow {
830            Flow::Local { .. } => {
831                trace!("Adding new local timeline item");
832
833                let item = self.meta.new_timeline_item(item);
834
835                self.items.push_local(item);
836            }
837
838            Flow::Remote {
839                position: TimelineItemPosition::Start { .. }, event_id, txn_id, ..
840            } => {
841                let item = Self::recycle_local_or_create_item(
842                    self.items,
843                    self.meta,
844                    item,
845                    event_id,
846                    txn_id.as_deref(),
847                );
848
849                trace!("Adding new remote timeline item at the start");
850
851                self.items.push_front(item, Some(0));
852            }
853
854            Flow::Remote {
855                position: TimelineItemPosition::At { event_index, .. },
856                event_id,
857                txn_id,
858                ..
859            } => {
860                let item = Self::recycle_local_or_create_item(
861                    self.items,
862                    self.meta,
863                    item,
864                    event_id,
865                    txn_id.as_deref(),
866                );
867
868                let all_remote_events = self.items.all_remote_events();
869                let event_index = *event_index;
870
871                // Look for the closest `timeline_item_index` at the left of `event_index`.
872                let timeline_item_index = all_remote_events
873                    .range(0..=event_index)
874                    .rev()
875                    .find_map(|event_meta| event_meta.timeline_item_index)
876                    // The new `timeline_item_index` is the previous + 1.
877                    .map(|timeline_item_index| timeline_item_index + 1);
878
879                // No index? Look for the closest `timeline_item_index` at the right of
880                // `event_index`.
881                let timeline_item_index = timeline_item_index.or_else(|| {
882                    all_remote_events
883                        .range(event_index + 1..)
884                        .find_map(|event_meta| event_meta.timeline_item_index)
885                });
886
887                // Still no index? Well, it means there is no existing `timeline_item_index`
888                // so we are inserting at the last non-local item position as a fallback.
889                let timeline_item_index = timeline_item_index.unwrap_or_else(|| {
890                    self.items
891                        .iter_remotes_region()
892                        .rev()
893                        .find_map(|(timeline_item_index, timeline_item)| {
894                            timeline_item.as_event().map(|_| timeline_item_index + 1)
895                        })
896                        .unwrap_or_else(|| {
897                            // There is no remote timeline item, so we could insert at the start of
898                            // the remotes region.
899                            self.items.first_remotes_region_index()
900                        })
901                });
902
903                trace!(
904                    ?event_index,
905                    ?timeline_item_index,
906                    "Adding new remote timeline at specific event index"
907                );
908
909                self.items.insert(timeline_item_index, item, Some(event_index));
910            }
911
912            Flow::Remote {
913                position: TimelineItemPosition::End { .. }, event_id, txn_id, ..
914            } => {
915                let item = Self::recycle_local_or_create_item(
916                    self.items,
917                    self.meta,
918                    item,
919                    event_id,
920                    txn_id.as_deref(),
921                );
922
923                // Let's find the latest remote event and insert after it
924                let timeline_item_index = self
925                    .items
926                    .iter_remotes_region()
927                    .rev()
928                    .find_map(|(timeline_item_index, timeline_item)| {
929                        timeline_item.as_event().map(|_| timeline_item_index + 1)
930                    })
931                    .unwrap_or_else(|| {
932                        // There is no remote timeline item, so we could insert at the start of
933                        // the remotes region.
934                        self.items.first_remotes_region_index()
935                    });
936
937                let event_index = self
938                    .items
939                    .all_remote_events()
940                    .last_index()
941                    // The last remote event is necessarily associated to this
942                    // timeline item, see the contract of this method. Let's fallback to a similar
943                    // value as `timeline_item_index` instead of panicking.
944                    .or_else(|| {
945                        error!(?event_id, "Failed to read the last event index from `AllRemoteEvents`: at least one event must be present");
946
947                        Some(0)
948                    });
949
950                // Try to keep precise insertion semantics here, in this exact order:
951                //
952                // * _push back_ when the new item is inserted after all items (the assumption
953                // being that this is the hot path, because most of the time new events
954                // come from the sync),
955                // * _push front_ when the new item is inserted at index 0,
956                // * _insert_ otherwise.
957
958                if timeline_item_index == self.items.len() {
959                    trace!("Adding new remote timeline item at the back");
960                    self.items.push_back(item, event_index);
961                } else if timeline_item_index == 0 {
962                    trace!("Adding new remote timeline item at the front");
963                    self.items.push_front(item, event_index);
964                } else {
965                    trace!(
966                        timeline_item_index,
967                        "Adding new remote timeline item at specific index"
968                    );
969                    self.items.insert(timeline_item_index, item, event_index);
970                }
971            }
972
973            Flow::Remote {
974                event_id: decrypted_event_id,
975                position: TimelineItemPosition::UpdateAt { timeline_item_index: idx },
976                ..
977            } => {
978                trace!("Updating timeline item at position {idx}");
979
980                // Update all events that replied to this previously encrypted message.
981                Self::maybe_update_responses(
982                    self.meta,
983                    self.items,
984                    decrypted_event_id,
985                    EmbeddedEvent::from_timeline_item(&item),
986                );
987
988                let internal_id = self.items[*idx].internal_id.clone();
989                self.items.replace(*idx, TimelineItem::new(item, internal_id));
990            }
991        }
992
993        // If we don't have a read marker item, look if we need to add one now.
994        if !self.meta.has_up_to_date_read_marker_item {
995            self.meta.update_read_marker(self.items);
996        }
997    }
998
999    /// Try to recycle a local timeline item for the same event, or create a new
1000    /// timeline item for it.
1001    ///
1002    /// Note: this method doesn't take `&mut self` to avoid a borrow checker
1003    /// conflict with `TimelineEventHandler::add_item`.
1004    fn recycle_local_or_create_item(
1005        items: &mut ObservableItemsTransaction<'_>,
1006        meta: &mut TimelineMetadata,
1007        mut new_item: EventTimelineItem,
1008        event_id: &EventId,
1009        transaction_id: Option<&TransactionId>,
1010    ) -> Arc<TimelineItem> {
1011        // Detect a local timeline item that matches `event_id` or `transaction_id`.
1012        if let Some((local_timeline_item_index, local_timeline_item)) = items
1013            // Iterate the locals region.
1014            .iter_locals_region()
1015            // Iterate from the end to the start.
1016            .rev()
1017            .find_map(|(nth, timeline_item)| {
1018                let event_timeline_item = timeline_item.as_event()?;
1019
1020                if Some(event_id) == event_timeline_item.event_id()
1021                    || (transaction_id.is_some()
1022                        && transaction_id == event_timeline_item.transaction_id())
1023                {
1024                    // A duplicate local event timeline item has been found!
1025                    Some((nth, event_timeline_item))
1026                } else {
1027                    // This local event timeline is not the one we are looking for. Continue our
1028                    // search.
1029                    None
1030                }
1031            })
1032        {
1033            trace!(
1034                ?event_id,
1035                ?transaction_id,
1036                ?local_timeline_item_index,
1037                "Removing local timeline item"
1038            );
1039
1040            transfer_details(&mut new_item, local_timeline_item);
1041
1042            // Remove the local timeline item.
1043            let recycled = items.remove(local_timeline_item_index);
1044            TimelineItem::new(new_item, recycled.internal_id.clone())
1045        } else {
1046            // We haven't found a matching local item to recycle; create a new item.
1047            meta.new_timeline_item(new_item)
1048        }
1049    }
1050
1051    /// After updating the timeline item `new_item` which id is
1052    /// `target_event_id`, update other items that are responses to this item.
1053    fn maybe_update_responses(
1054        meta: &mut TimelineMetadata,
1055        items: &mut ObservableItemsTransaction<'_>,
1056        target_event_id: &EventId,
1057        new_embedded_event: EmbeddedEvent,
1058    ) {
1059        let Some(replies) = meta.replies.get(target_event_id) else {
1060            trace!("item has no replies");
1061            return;
1062        };
1063
1064        for reply_id in replies {
1065            let Some(timeline_item_index) = items
1066                .get_remote_event_by_event_id(reply_id)
1067                .and_then(|meta| meta.timeline_item_index)
1068            else {
1069                warn!(%reply_id, "event not known as an item in the timeline");
1070                continue;
1071            };
1072
1073            let Some(item) = items.get(timeline_item_index) else {
1074                warn!(%reply_id, timeline_item_index, "mapping from event id to timeline item likely incorrect");
1075                continue;
1076            };
1077
1078            let Some(event_item) = item.as_event() else { continue };
1079            let Some(msglike) = event_item.content.as_msglike() else { continue };
1080            let Some(message) = msglike.as_message() else { continue };
1081            let Some(in_reply_to) = msglike.in_reply_to.as_ref() else { continue };
1082
1083            trace!(reply_event_id = ?event_item.identifier(), "Updating response to updated event");
1084            let in_reply_to = InReplyToDetails {
1085                event_id: in_reply_to.event_id.clone(),
1086                event: TimelineDetails::Ready(Box::new(new_embedded_event.clone())),
1087            };
1088
1089            let new_reply_content = TimelineItemContent::MsgLike(
1090                msglike
1091                    .with_in_reply_to(in_reply_to)
1092                    .with_kind(MsgLikeKind::Message(message.clone())),
1093            );
1094            let new_reply_item = item.with_kind(event_item.with_content(new_reply_content));
1095            items.replace(timeline_item_index, new_reply_item);
1096        }
1097    }
1098}
1099
1100/// Transfer `TimelineDetails` that weren't available on the original
1101/// item and have been fetched separately (only `reply_to` for
1102/// now) from `old_item` to `item`, given two items for an event
1103/// that was re-received.
1104///
1105/// `old_item` *should* always be a local timeline item usually, but it
1106/// can be a remote timeline item.
1107fn transfer_details(new_item: &mut EventTimelineItem, old_item: &EventTimelineItem) {
1108    let TimelineItemContent::MsgLike(new_msglike) = &mut new_item.content else {
1109        return;
1110    };
1111    let TimelineItemContent::MsgLike(old_msglike) = &old_item.content else {
1112        return;
1113    };
1114
1115    let Some(in_reply_to) = &mut new_msglike.in_reply_to else { return };
1116    let Some(old_in_reply_to) = &old_msglike.in_reply_to else { return };
1117
1118    if matches!(&in_reply_to.event, TimelineDetails::Unavailable) {
1119        in_reply_to.event = old_in_reply_to.event.clone();
1120    }
1121}