Skip to main content

matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
44use ruma::{
45    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
46    events::{
47        AnySyncTimelineEvent, beacon_info::BeaconInfoEventContent,
48        poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
49        relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
50    },
51    room_version_rules::RoomVersionRules,
52    serde::Raw,
53};
54use tracing::{error, info, trace, warn};
55
56use super::{ObservableItemsTransaction, rfind_event_by_item_id};
57use crate::timeline::{
58    BeaconInfo, EventTimelineItem, LiveLocationState, MsgLikeContent, MsgLikeKind, PollState,
59    ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
60    event_item::beacon_info_matches,
61};
62
/// The typed replacement payload carried by a [`PendingEdit`].
///
/// There is one variant per kind of editable content handled by this module.
#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
    /// The edit replaces the content of a room message.
    RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
    /// The edit replaces the content of an (unstable) poll start event.
    Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}
68
69impl std::fmt::Debug for PendingEditKind {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match self {
72            Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
73            Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
74        }
75    }
76}
77
/// An edit (replacement) of another event, as carried by
/// [`AggregationKind::Edit`].
#[derive(Clone, Debug)]
pub(in crate::timeline) struct PendingEdit {
    /// The kind of edit this is.
    pub kind: PendingEditKind,

    /// The raw JSON for the edit, if available.
    pub edit_json: Option<Raw<AnySyncTimelineEvent>>,

    /// The encryption info for this edit, if available.
    pub encryption_info: Option<Arc<EncryptionInfo>>,

    /// If provided, this is the identifier of a remote event item that included
    /// this bundled edit.
    pub bundled_item_owner: Option<OwnedEventId>,
}
93
/// Which kind of aggregation (related event) is this?
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
    /// This is a response to a poll.
    PollResponse {
        /// Sender of the poll's response.
        sender: OwnedUserId,
        /// Timestamp at which the response has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// All the answers to the poll sent by the sender.
        answers: Vec<String>,
    },

    /// This is the marker of the end of a poll.
    PollEnd {
        /// Timestamp at which the poll ends, i.e. all the responses with a
        /// timestamp prior to this one should be taken into account
        /// (and all the responses with a timestamp after this one
        /// should be dropped).
        end_date: MilliSecondsSinceUnixEpoch,
    },

    /// This is a reaction to another event.
    Reaction {
        /// The reaction "key" displayed by the client, often an emoji.
        key: String,
        /// Sender of the reaction.
        sender: OwnedUserId,
        /// Timestamp at which the reaction has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// The send status of the reaction this is, with handles to abort it if
        /// we can, etc.
        reaction_status: ReactionStatus,
    },

    /// An event has been redacted.
    Redaction {
        /// Whether this aggregation results from the local echo of a redaction.
        /// Local echoes of redactions are applied reversibly whereas remote
        /// echoes of redactions are applied irreversibly.
        is_local: bool,
    },

    /// An event has been edited.
    ///
    /// Note that edits can't be applied in isolation; we need to identify what
    /// the *latest* edit is, based on the event ordering. As such, they're
    /// handled exceptionally in `Aggregation::apply` and
    /// `Aggregation::unapply`, and the callers have the responsibility of
    /// considering all the edits and applying only the right one.
    Edit(PendingEdit),

    /// A location update for a live location sharing session (MSC3489).
    BeaconUpdate {
        /// The location to add to the target's live location state. Its `ts`
        /// field is what identifies it when the update is unapplied.
        location: BeaconInfo,
    },

    /// A stop event for a live location sharing session (MSC3489).
    ///
    /// Carries the new (non-live) [`BeaconInfoEventContent`] that should
    /// replace the stored content on the target item, flipping
    /// [`LiveLocationState::is_live`] to `false`.
    ///
    /// Unlike [`Self::BeaconUpdate`], a beacon stop is not reversible.
    BeaconStop {
        /// The `beacon_info` content of the stop event.
        content: BeaconInfoEventContent,
    },
}
158
/// An aggregation is an event related to another event (for instance a
/// reaction, a poll's response, etc.).
///
/// It can be either a local or a remote echo.
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
    /// The kind of aggregation this represents.
    pub kind: AggregationKind,

    /// The own timeline identifier for an aggregation.
    ///
    /// It will be a transaction id when the aggregation is still a local echo,
    /// and it will transition into an event id when the aggregation is a
    /// remote echo (i.e. has been received in a sync response).
    pub own_id: TimelineEventItemId,
}
175
176/// Get the poll state from a given [`TimelineItemContent`].
177fn poll_state_from_item<'a>(
178    event: &'a mut Cow<'_, EventTimelineItem>,
179) -> Result<&'a mut PollState, AggregationError> {
180    if event.content().is_poll() {
181        // It was a poll! Now return the state as mutable.
182        let state = as_variant!(
183            event.to_mut().content_mut(),
184            TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
185        )
186        .expect("it was a poll just above");
187        Ok(state)
188    } else {
189        Err(AggregationError::InvalidType {
190            expected: "a poll".to_owned(),
191            actual: event.content().debug_string().to_owned(),
192        })
193    }
194}
195
196/// Get the [`LiveLocationState`] from a given [`TimelineItemContent`], mutably.
197fn live_location_state_from_item<'a>(
198    event: &'a mut Cow<'_, EventTimelineItem>,
199) -> Result<&'a mut LiveLocationState, AggregationError> {
200    if event.content().is_live_location() {
201        // It was a live location! Now return the state as mutable.
202        let state = event
203            .to_mut()
204            .content_mut()
205            .as_live_location_state_mut()
206            .expect("it was a live location just above");
207        Ok(state)
208    } else {
209        Err(AggregationError::InvalidType {
210            expected: "a live location".to_owned(),
211            actual: event.content().debug_string().to_owned(),
212        })
213    }
214}
215
216impl Aggregation {
217    /// Create a new [`Aggregation`].
218    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
219        Self { kind, own_id }
220    }
221
222    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
223    ///
224    /// In case of success, returns an enum indicating whether the applied
225    /// aggregation had an effect on the content; if it updated it, then the
226    /// caller has the responsibility to reflect that change.
227    ///
228    /// In case of error, returns an error detailing why the aggregation
229    /// couldn't be applied.
230    fn apply(
231        &self,
232        event: &mut Cow<'_, EventTimelineItem>,
233        rules: &RoomVersionRules,
234    ) -> ApplyAggregationResult {
235        match &self.kind {
236            AggregationKind::PollResponse { sender, timestamp, answers } => {
237                match poll_state_from_item(event) {
238                    Ok(state) => {
239                        state.add_response(sender.clone(), *timestamp, answers.clone());
240                        ApplyAggregationResult::UpdatedItem
241                    }
242                    Err(err) => ApplyAggregationResult::Error(err),
243                }
244            }
245
246            AggregationKind::Redaction { is_local } => {
247                let is_local_redacted =
248                    event.content().is_redacted() && event.unredacted_item.is_some();
249                let is_remote_redacted =
250                    event.content().is_redacted() && event.unredacted_item.is_none();
251                if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
252                    ApplyAggregationResult::LeftItemIntact
253                } else {
254                    let new_item = event.redact(&rules.redaction, *is_local);
255                    *event = Cow::Owned(new_item);
256                    ApplyAggregationResult::UpdatedItem
257                }
258            }
259
260            AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
261                Ok(state) => {
262                    if !state.end(*end_date) {
263                        return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
264                    }
265                    ApplyAggregationResult::UpdatedItem
266                }
267                Err(err) => ApplyAggregationResult::Error(err),
268            },
269
270            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
271                let Some(reactions) = event.content().reactions() else {
272                    // An item that can't hold any reactions.
273                    return ApplyAggregationResult::LeftItemIntact;
274                };
275
276                let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
277
278                // If the reaction was already added to the item, we don't need to add it back.
279                //
280                // Search for a previous reaction that would be equivalent.
281
282                let is_same = previous_reaction.is_some_and(|prev| {
283                    prev.timestamp == *timestamp
284                        && matches!(
285                            (&prev.status, reaction_status),
286                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
287                                | (
288                                    ReactionStatus::LocalToRemote(_),
289                                    ReactionStatus::LocalToRemote(_),
290                                )
291                                | (
292                                    ReactionStatus::RemoteToRemote(_),
293                                    ReactionStatus::RemoteToRemote(_),
294                                )
295                        )
296                });
297
298                if is_same {
299                    ApplyAggregationResult::LeftItemIntact
300                } else {
301                    let reactions = event
302                        .to_mut()
303                        .content_mut()
304                        .reactions_mut()
305                        .expect("reactions was Some above");
306
307                    reactions.entry(key.clone()).or_default().insert(
308                        sender.clone(),
309                        ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
310                    );
311
312                    ApplyAggregationResult::UpdatedItem
313                }
314            }
315
316            AggregationKind::Edit(_) => {
317                // Let the caller handle the edit.
318                ApplyAggregationResult::Edit
319            }
320
321            AggregationKind::BeaconUpdate { location } => {
322                match live_location_state_from_item(event) {
323                    Ok(state) => {
324                        state.add_location(location.clone());
325                        ApplyAggregationResult::UpdatedItem
326                    }
327                    Err(err) => ApplyAggregationResult::Error(err),
328                }
329            }
330
331            AggregationKind::BeaconStop { content } => match live_location_state_from_item(event) {
332                Ok(state) => {
333                    state.stop(content.clone());
334                    ApplyAggregationResult::UpdatedItem
335                }
336                Err(err) => ApplyAggregationResult::Error(err),
337            },
338        }
339    }
340
341    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
342    ///
343    /// In case of success, returns an enum indicating whether unapplying the
344    /// aggregation had an effect on the content; if it updated it, then the
345    /// caller has the responsibility to reflect that change.
346    ///
347    /// In case of error, returns an error detailing why the aggregation
348    /// couldn't be unapplied.
349    fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
350        match &self.kind {
351            AggregationKind::PollResponse { sender, timestamp, .. } => {
352                let state = match poll_state_from_item(event) {
353                    Ok(state) => state,
354                    Err(err) => return ApplyAggregationResult::Error(err),
355                };
356                state.remove_response(sender, *timestamp);
357                ApplyAggregationResult::UpdatedItem
358            }
359
360            AggregationKind::PollEnd { .. } => {
361                // Assume we can't undo a poll end event at the moment.
362                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
363            }
364
365            AggregationKind::Redaction { is_local } => {
366                if *is_local {
367                    if event.unredacted_item.is_some() {
368                        // Unapply local redaction.
369                        *event = Cow::Owned(event.unredact());
370                        ApplyAggregationResult::UpdatedItem
371                    } else {
372                        // Event isn't locally redacted. Nothing to do.
373                        ApplyAggregationResult::LeftItemIntact
374                    }
375                } else {
376                    // Remote redactions are not reversible.
377                    ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
378                }
379            }
380
381            AggregationKind::Reaction { key, sender, .. } => {
382                let Some(reactions) = event.content().reactions() else {
383                    // An item that can't hold any reactions.
384                    return ApplyAggregationResult::LeftItemIntact;
385                };
386
387                // We only need to remove the previous reaction if it was there.
388                //
389                // Search for it.
390
391                let had_entry =
392                    reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
393
394                if had_entry {
395                    let reactions = event
396                        .to_mut()
397                        .content_mut()
398                        .reactions_mut()
399                        .expect("reactions was some above");
400                    let by_user = reactions.get_mut(key);
401                    if let Some(by_user) = by_user {
402                        by_user.swap_remove(sender);
403                        // If this was the last reaction, remove the entire map for this key.
404                        if by_user.is_empty() {
405                            reactions.swap_remove(key);
406                        }
407                    }
408                    ApplyAggregationResult::UpdatedItem
409                } else {
410                    ApplyAggregationResult::LeftItemIntact
411                }
412            }
413
414            AggregationKind::Edit(_) => {
415                // Let the caller handle the edit.
416                ApplyAggregationResult::Edit
417            }
418
419            AggregationKind::BeaconUpdate { location } => {
420                match live_location_state_from_item(event) {
421                    Ok(state) => {
422                        state.remove_location(location.ts);
423                        ApplyAggregationResult::UpdatedItem
424                    }
425                    Err(err) => ApplyAggregationResult::Error(err),
426                }
427            }
428
429            AggregationKind::BeaconStop { .. } => {
430                // Stopping a live location share is not reversible.
431                ApplyAggregationResult::Error(AggregationError::CantUndoBeaconStop)
432            }
433        }
434    }
435}
436
/// Manager for all known existing aggregations to all events in the timeline.
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
    /// Mapping of a target event to its list of aggregations.
    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,

    /// Mapping of a related event identifier to its target.
    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,

    /// Pending beacon-stop aggregations received before the corresponding live
    /// `beacon_info` start item has arrived.
    ///
    /// Keyed by the sender's user ID, so at most one pending stop is kept per
    /// sender. When [`Self::apply_all`] runs for a remote item holding a live
    /// location state, the sender's pending stop (if any) is taken out of
    /// this map; if it matches the start content it is promoted into
    /// [`Self::related_events`] and applied along with the other
    /// aggregations, otherwise it is discarded.
    pending_beacon_stops: HashMap<OwnedUserId, Aggregation>,
}
455
456impl Aggregations {
457    /// Clear all the known aggregations from all the mappings.
458    pub fn clear(&mut self) {
459        self.related_events.clear();
460        self.inverted_map.clear();
461        self.pending_beacon_stops.clear();
462    }
463
464    /// Stash a [`AggregationKind::BeaconStop`] that arrived before its target
465    /// live `beacon_info` item. It will be promoted into
466    /// [`Self::related_events`] (and thus picked up by [`Self::apply_all`])
467    /// when the live item is inserted via
468    /// [`Self::promote_pending_beacon_stop`].
469    pub fn add_pending_beacon_stop(&mut self, sender: OwnedUserId, aggregation: Aggregation) {
470        self.pending_beacon_stops.insert(sender, aggregation);
471    }
472
473    /// Promote a matching stashed beacon-stop aggregation for `sender` into the
474    /// regular aggregation map, now that the live start item's
475    /// `target_event_id` is known.
476    ///
477    /// The pending stop's content must match the start event's content (except
478    /// for the `live` field) for promotion to occur. If they don't match, the
479    /// pending stop is discarded because it belongs to a different session.
480    ///
481    /// Should be called from `add_item` just before `apply_all`, when inserting
482    /// a live `beacon_info` item.
483    fn promote_pending_beacon_stop(
484        &mut self,
485        sender: &OwnedUserId,
486        target_event_id: OwnedEventId,
487        start_content: &BeaconInfoEventContent,
488    ) {
489        if !start_content.live {
490            return;
491        }
492
493        let Some(stop) = self.pending_beacon_stops.remove(sender) else { return };
494
495        let AggregationKind::BeaconStop { content: stop_content } = &stop.kind else {
496            warn!("pending beacon stop has unexpected aggregation kind");
497            return;
498        };
499
500        if !beacon_info_matches(start_content, stop_content) {
501            trace!("discarding stale pending beacon stop (content mismatch)");
502            return;
503        }
504
505        let target = TimelineEventItemId::EventId(target_event_id);
506        self.add(target, stop);
507    }
508
509    /// Add a given aggregation that relates to the [`TimelineItemContent`]
510    /// identified by the given [`TimelineEventItemId`].
511    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
512        // If the aggregation is a redaction, it invalidates all the other aggregations;
513        // remove them.
514        if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
515            for agg in self.related_events.remove(&related_to).unwrap_or_default() {
516                self.inverted_map.remove(&agg.own_id);
517            }
518        }
519
520        // If there was any redaction among the current aggregation, adding a new one
521        // should be a noop.
522        if let Some(previous_aggregations) = self.related_events.get(&related_to)
523            && previous_aggregations
524                .iter()
525                .any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
526        {
527            return;
528        }
529
530        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
531
532        // We can have 3 different states for the same aggregation in related_events, in
533        // chronological order:
534        //
535        // 1. The local echo with a transaction ID.
536        // 2. The local echo with the event ID returned by the server after sending the
537        //    event.
538        // 3. The remote echo received via sync.
539        //
540        // The transition from states 1 to 2 is handled in `mark_aggregation_as_sent()`.
541        // So here we need to handle the transition from states 2 to 3. We need to
542        // replace the local echo by the remote echo, which might have more data, like
543        // the raw JSON.
544        let related_events = self.related_events.entry(related_to).or_default();
545        if let Some(pos) = related_events.iter().position(|agg| agg.own_id == aggregation.own_id) {
546            related_events.remove(pos);
547        }
548        related_events.push(aggregation);
549    }
550
551    /// Is the given id one for a known aggregation to another event?
552    ///
553    /// If so, unapplies it by replacing the corresponding related item, if
554    /// needs be.
555    ///
556    /// Returns true if an aggregation was found. This doesn't mean
557    /// the underlying item has been updated, if it was missing from the
558    /// timeline for instance.
559    ///
560    /// May return an error if it found an aggregation, but it couldn't be
561    /// properly applied.
562    pub fn try_remove_aggregation(
563        &mut self,
564        aggregation_id: &TimelineEventItemId,
565        items: &mut ObservableItemsTransaction<'_>,
566    ) -> Result<bool, AggregationError> {
567        let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
568
569        // Find and remove the aggregation in the other mapping.
570        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
571            let removed = aggregations
572                .iter()
573                .position(|agg| agg.own_id == *aggregation_id)
574                .map(|idx| aggregations.remove(idx));
575
576            // If this was the last aggregation, remove the entry in the `related_events`
577            // mapping.
578            if aggregations.is_empty() {
579                self.related_events.remove(found);
580            }
581
582            removed
583        } else {
584            None
585        };
586
587        let Some(aggregation) = aggregation else {
588            warn!(
589                "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
590                 not in related-to map."
591            );
592            return Ok(false);
593        };
594
595        if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
596            let mut cowed = Cow::Borrowed(&*item);
597            match aggregation.unapply(&mut cowed) {
598                ApplyAggregationResult::UpdatedItem => {
599                    trace!("removed aggregation");
600                    items.replace(
601                        item_pos,
602                        TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
603                    );
604                }
605                ApplyAggregationResult::LeftItemIntact => {}
606                ApplyAggregationResult::Error(err) => {
607                    warn!("error when unapplying aggregation: {err}");
608                }
609                ApplyAggregationResult::Edit => {
610                    // This edit has been removed; try to find another that still applies.
611                    if let Some(aggregations) = self.related_events.get(found) {
612                        if resolve_edits(aggregations, items, &mut cowed) {
613                            items.replace(
614                                item_pos,
615                                TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
616                            );
617                        } else {
618                            // No other edit was found, leave the item as is.
619                            // TODO likely need to change the item to indicate
620                            // it's been un-edited etc.
621                        }
622                    } else {
623                        // No other edits apply.
624                    }
625                }
626            }
627        } else {
628            info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
629        }
630
631        Ok(true)
632    }
633
634    /// Apply all the aggregations to a [`TimelineItemContent`].
635    ///
636    /// If `sender` is provided alongside a remote `item_id`, any
637    /// [`AggregationKind::BeaconStop`] events that arrived out-of-order (i.e.
638    /// before the live `beacon_info` start item) are first promoted from the
639    /// pending-stops stash into the regular aggregation map so they are picked
640    /// up here together with every other pending aggregation for this item.
641    ///
642    /// Will return an error at the first aggregation that couldn't be applied;
643    /// see [`Aggregation::apply`] which explains under which conditions it can
644    /// happen.
645    pub fn apply_all(
646        &mut self,
647        item_id: &TimelineEventItemId,
648        sender: &OwnedUserId,
649        event: &mut Cow<'_, EventTimelineItem>,
650        items: &mut ObservableItemsTransaction<'_>,
651        rules: &RoomVersionRules,
652    ) -> Result<(), AggregationError> {
653        // If a beacon-stop arrived before this live start item, it was stashed
654        // in `pending_beacon_stops` keyed by sender. Promote it into
655        // `related_events` under the now-known start event ID so the loop below
656        // applies it together with any other pending aggregations.
657        //
658        // The promotion verifies that the pending stop's content matches the
659        // start event's content to ensure we don't apply an old stop to a new
660        // session.
661        if let TimelineEventItemId::EventId(event_id) = item_id
662            && let Some(live_location) = event.content().as_live_location_state()
663        {
664            self.promote_pending_beacon_stop(sender, event_id.clone(), &live_location.beacon_info);
665        }
666
667        let Some(aggregations) = self.related_events.get(item_id) else {
668            return Ok(());
669        };
670
671        let mut has_edits = false;
672
673        for a in aggregations {
674            match a.apply(event, rules) {
675                ApplyAggregationResult::Edit => {
676                    has_edits = true;
677                }
678                ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
679                ApplyAggregationResult::Error(err) => return Err(err),
680            }
681        }
682
683        if has_edits {
684            resolve_edits(aggregations, items, event);
685        }
686
687        Ok(())
688    }
689
690    /// Mark a target event as being sent (i.e. it transitions from an local
691    /// transaction id to its remote event id counterpart), by updating the
692    /// internal mappings.
693    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
694        let from = TimelineEventItemId::TransactionId(txn_id);
695        let to = TimelineEventItemId::EventId(event_id);
696
697        // Update the aggregations in the `related_events` field.
698        if let Some(aggregations) = self.related_events.remove(&from) {
699            // Update the inverted mappings (from aggregation's id, to the new target id).
700            for a in &aggregations {
701                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
702                    debug_assert_eq!(prev_target, from);
703                    self.inverted_map.insert(a.own_id.clone(), to.clone());
704                }
705            }
706            // Update the direct mapping of target -> aggregations.
707            self.related_events.entry(to).or_default().extend(aggregations);
708        }
709    }
710
711    /// Mark an aggregation event as being sent (i.e. it transitions from an
712    /// local transaction id to its remote event id counterpart), by
713    /// updating the internal mappings.
714    ///
715    /// When an aggregation has been marked as sent, it may need to be reapplied
716    /// to the corresponding [`TimelineItemContent`]; this is why we're also
717    /// passing the context to apply an aggregation here.
718    pub fn mark_aggregation_as_sent(
719        &mut self,
720        txn_id: OwnedTransactionId,
721        event_id: OwnedEventId,
722        items: &mut ObservableItemsTransaction<'_>,
723        rules: &RoomVersionRules,
724    ) -> bool {
725        let from = TimelineEventItemId::TransactionId(txn_id);
726        let to = TimelineEventItemId::EventId(event_id.clone());
727
728        let Some(target) = self.inverted_map.remove(&from) else {
729            return false;
730        };
731
732        if let Some(aggregations) = self.related_events.get_mut(&target)
733            && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
734        {
735            found.own_id = to.clone();
736
737            match &mut found.kind {
738                AggregationKind::PollResponse { .. }
739                | AggregationKind::PollEnd { .. }
740                | AggregationKind::Edit(..)
741                | AggregationKind::BeaconUpdate { .. }
742                | AggregationKind::BeaconStop { .. } => {
743                    // Nothing particular to do.
744                }
745
746                AggregationKind::Redaction { is_local } => {
747                    // Mark the redaction as being remote and apply it (irreversibly).
748                    *is_local = false;
749
750                    let found = found.clone();
751                    find_item_and_apply_aggregation(self, items, &target, found, rules);
752                }
753
754                AggregationKind::Reaction { reaction_status, .. } => {
755                    // Mark the reaction as becoming remote, and signal that update to the
756                    // caller.
757                    *reaction_status = ReactionStatus::RemoteToRemote(event_id);
758
759                    let found = found.clone();
760                    find_item_and_apply_aggregation(self, items, &target, found, rules);
761                }
762            }
763        }
764
765        self.inverted_map.insert(to, target);
766        true
767    }
768
769    /// Returns the id of the event this aggregation relates to, if it's a known
770    /// aggregation.
771    pub fn is_aggregation_of(&self, item: &TimelineEventItemId) -> Option<&TimelineEventItemId> {
772        self.inverted_map.get(item)
773    }
774}
775
776/// Look at all the edits of a given event, and apply the most recent one, if
777/// found.
778///
779/// Returns true if an edit was found and applied, false otherwise.
780fn resolve_edits(
781    aggregations: &[Aggregation],
782    items: &ObservableItemsTransaction<'_>,
783    event: &mut Cow<'_, EventTimelineItem>,
784) -> bool {
785    // A tuple of the best edit, if we have found one and a boolean indicating if
786    // the edit is coming from a local echo. If it's from a local echo, we can't
787    // validate it as we don't have a raw JSON, but this isn't that important as
788    // we're sure we won't send ourselves invalid edits.
789    let mut best_edit: Option<(PendingEdit, bool)> = None;
790    let mut best_edit_pos = None;
791
792    for a in aggregations {
793        if let AggregationKind::Edit(pending_edit) = &a.kind {
794            match &a.own_id {
795                TimelineEventItemId::TransactionId(_) => {
796                    // A local echo is always the most recent edit: use this one.
797                    best_edit = Some((pending_edit.clone(), true));
798                    break;
799                }
800
801                TimelineEventItemId::EventId(event_id) => {
802                    if let Some(best_edit_pos) = &mut best_edit_pos {
803                        // Find the position of the timeline owning the edit: either the bundled
804                        // item owner if this was a bundled edit, or the edit event itself.
805                        let pos = items.position_by_event_id(
806                            pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
807                        );
808
809                        if let Some(pos) = pos {
810                            // If the edit is more recent (higher index) than the previous best
811                            // edit we knew about, use this one.
812                            if pos > *best_edit_pos {
813                                best_edit = Some((pending_edit.clone(), false));
814                                *best_edit_pos = pos;
815                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
816                            }
817                        } else {
818                            trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");
819
820                            // The edit event isn't in the timeline, so it might be a bundled
821                            // edit. In this case, record it as the best edit if and only if
822                            // there wasn't any other.
823                            if best_edit.is_none() {
824                                best_edit = Some((pending_edit.clone(), false));
825                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
826                            }
827                        }
828                    } else {
829                        // There wasn't any best edit yet, so record this one as being it, with
830                        // its position.
831                        best_edit = Some((pending_edit.clone(), false));
832                        best_edit_pos = items.position_by_event_id(event_id);
833                        trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
834                    }
835                }
836            }
837        }
838    }
839
840    if let Some((edit, is_local_echo)) = best_edit {
841        edit_item(event, edit, is_local_echo)
842    } else {
843        false
844    }
845}
846
847/// Apply the selected edit to the given EventTimelineItem.
848///
849/// Returns true if the edit was applied, false otherwise (because the edit and
850/// original timeline item types didn't match, for instance).
851fn edit_item(
852    item: &mut Cow<'_, EventTimelineItem>,
853    edit: PendingEdit,
854    is_local_echo: bool,
855) -> bool {
856    // We can receive edits from a local echo, i.e. the edit wasn't yet received
857    // from the homeserver.
858    //
859    // Before we send an edit we check that the event is allowed to be edited and
860    // that the replacement content is allowed.
861    //
862    // We don't have yet a full JSON of the event, so we can't do the validation
863    // here.
864    if !is_local_echo {
865        let Some(original_json) = item.original_json() else {
866            error!("The original event does not have the JSON field set.");
867            return false;
868        };
869
870        let Some(edit_json) = &edit.edit_json else {
871            error!(
872                "The replacement event of a remotely received edit does not have the JSON field set."
873            );
874            return false;
875        };
876
877        match check_validity_of_replacement_events(
878            original_json,
879            item.encryption_info(),
880            edit_json,
881            edit.encryption_info.as_deref(),
882        ) {
883            Ok(content) => content,
884            Err(e) => {
885                warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
886                return false;
887            }
888        }
889    }
890
891    let TimelineItemContent::MsgLike(content) = item.content() else {
892        info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
893        return false;
894    };
895
896    let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
897
898    match (edit_kind, content) {
899        (
900            PendingEditKind::RoomMessage(replacement),
901            MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
902        ) => {
903            // First combination: it's a message edit for a message. Good.
904            let mut new_msg = msg.clone();
905            new_msg.apply_edit(replacement.new_content);
906
907            let new_item = item.with_content_and_latest_edit(
908                TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
909                edit_json,
910            );
911            *item = Cow::Owned(new_item);
912        }
913
914        (
915            PendingEditKind::Poll(replacement),
916            MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
917        ) => {
918            // Second combination: it's a poll edit for a poll. Good.
919            if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
920                let new_item = item.with_content_and_latest_edit(
921                    TimelineItemContent::MsgLike(
922                        content.with_kind(MsgLikeKind::Poll(new_poll_state)),
923                    ),
924                    edit_json,
925                );
926                *item = Cow::Owned(new_item);
927            } else {
928                // The poll has ended, so we can't edit it anymore.
929                return false;
930            }
931        }
932
933        (edit_kind, _) => {
934            // Invalid combination.
935            info!(
936                content = item.content().debug_string(),
937                edit = format!("{:?}", edit_kind),
938                "Mismatch between edit type and content type",
939            );
940            return false;
941        }
942    }
943
944    if let Some(encryption_info) = encryption_info {
945        *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
946    }
947
948    true
949}
950
951/// Find an item identified by the target identifier, and apply the aggregation
952/// onto it.
953///
954/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
955/// `None` otherwise.
956pub(crate) fn find_item_and_apply_aggregation(
957    aggregations: &Aggregations,
958    items: &mut ObservableItemsTransaction<'_>,
959    target: &TimelineEventItemId,
960    aggregation: Aggregation,
961    rules: &RoomVersionRules,
962) -> Option<EventTimelineItem> {
963    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
964        trace!("couldn't find aggregation's target {target:?}");
965        return None;
966    };
967
968    let mut cowed = Cow::Borrowed(&*event_item);
969    match aggregation.apply(&mut cowed, rules) {
970        ApplyAggregationResult::UpdatedItem => {
971            trace!("applied aggregation");
972            let new_event_item = cowed.into_owned();
973            let new_item =
974                TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
975            items.replace(idx, new_item);
976            Some(new_event_item)
977        }
978        ApplyAggregationResult::Edit => {
979            if let Some(aggregations) = aggregations.related_events.get(target)
980                && resolve_edits(aggregations, items, &mut cowed)
981            {
982                let new_event_item = cowed.into_owned();
983                let new_item =
984                    TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
985                items.replace(idx, new_item);
986                return Some(new_event_item);
987            }
988            None
989        }
990        ApplyAggregationResult::LeftItemIntact => {
991            trace!("applying the aggregation had no effect");
992            None
993        }
994        ApplyAggregationResult::Error(err) => {
995            warn!("error when applying aggregation: {err}");
996            None
997        }
998    }
999}
1000
1001/// The result of applying (or unapplying) an aggregation onto a timeline item.
1002enum ApplyAggregationResult {
1003    /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
1004    UpdatedItem,
1005
1006    /// An edit must be included in the edit set and resolved later, using the
1007    /// relative position of the edits.
1008    Edit,
1009
1010    /// The item hasn't been modified after applying the aggregation, because it
1011    /// was likely already applied prior to this.
1012    LeftItemIntact,
1013
1014    /// An error happened while applying the aggregation.
1015    Error(AggregationError),
1016}
1017
1018#[derive(Debug, thiserror::Error)]
1019pub(crate) enum AggregationError {
1020    #[error("trying to end a poll twice")]
1021    PollAlreadyEnded,
1022
1023    #[error("a poll end can't be unapplied")]
1024    CantUndoPollEnd,
1025
1026    #[error("a redaction can't be unapplied")]
1027    CantUndoRedaction,
1028
1029    #[error("a beacon stop can't be unapplied")]
1030    CantUndoBeaconStop,
1031
1032    #[error(
1033        "trying to apply an aggregation of one type to an invalid target: \
1034         expected {expected}, actual {actual}"
1035    )]
1036    InvalidType { expected: String, actual: String },
1037}