matrix_sdk_ui/timeline/controller/aggregations.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::{borrow::Cow, collections::HashMap, sync::Arc};
41
42use as_variant::as_variant;
43use matrix_sdk::{check_validity_of_replacement_events, deserialized_responses::EncryptionInfo};
44use ruma::{
45 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
46 events::{
47 AnySyncTimelineEvent, beacon_info::BeaconInfoEventContent,
48 poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation,
49 relation::Replacement, room::message::RoomMessageEventContentWithoutRelation,
50 },
51 room_version_rules::RoomVersionRules,
52 serde::Raw,
53};
54use tracing::{error, info, trace, warn};
55
56use super::{ObservableItemsTransaction, rfind_event_by_item_id};
57use crate::timeline::{
58 BeaconInfo, EventTimelineItem, LiveLocationState, MsgLikeContent, MsgLikeKind, PollState,
59 ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
60 event_item::beacon_info_matches,
61};
62
/// The replacement payload carried by a [`PendingEdit`], depending on the type
/// of event being edited.
#[derive(Clone)]
pub(in crate::timeline) enum PendingEditKind {
    /// A replacement for the content of a room message.
    RoomMessage(Replacement<RoomMessageEventContentWithoutRelation>),
    /// A replacement for the content of an (unstable) poll start event.
    Poll(Replacement<NewUnstablePollStartEventContentWithoutRelation>),
}
68
69impl std::fmt::Debug for PendingEditKind {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 match self {
72 Self::RoomMessage(_) => f.debug_struct("RoomMessage").finish_non_exhaustive(),
73 Self::Poll(_) => f.debug_struct("Poll").finish_non_exhaustive(),
74 }
75 }
76}
77
/// An edit of a timeline item, along with the metadata required to validate
/// it and to order it relative to the other known edits of the same item.
#[derive(Clone, Debug)]
pub(in crate::timeline) struct PendingEdit {
    /// The kind of edit this is.
    pub kind: PendingEditKind,

    /// The raw JSON for the edit.
    ///
    /// An edit that doesn't come from a local echo is only applied if this
    /// is set, as the JSON is needed to check the validity of the
    /// replacement.
    pub edit_json: Option<Raw<AnySyncTimelineEvent>>,

    /// The encryption info for this edit.
    pub encryption_info: Option<Arc<EncryptionInfo>>,

    /// If provided, this is the identifier of a remote event item that included
    /// this bundled edit.
    pub bundled_item_owner: Option<OwnedEventId>,
}
93
/// Which kind of aggregation (related event) is this?
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
    /// This is a response to a poll.
    PollResponse {
        /// Sender of the poll's response.
        sender: OwnedUserId,
        /// Timestamp at which the response has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// All the answers to the poll sent by the sender.
        answers: Vec<String>,
    },

    /// This is the marker of the end of a poll.
    PollEnd {
        /// Timestamp at which the poll ends, i.e. all the responses with a
        /// timestamp prior to this one should be taken into account
        /// (and all the responses with a timestamp after this one
        /// should be dropped).
        end_date: MilliSecondsSinceUnixEpoch,
    },

    /// This is a reaction to another event.
    Reaction {
        /// The reaction "key" displayed by the client, often an emoji.
        key: String,
        /// Sender of the reaction.
        sender: OwnedUserId,
        /// Timestamp at which the reaction has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// The send status of this reaction, with handles to abort it if we
        /// can, etc.
        reaction_status: ReactionStatus,
    },

    /// An event has been redacted.
    Redaction {
        /// Whether this aggregation results from the local echo of a redaction.
        /// Local echoes of redactions are applied reversibly whereas remote
        /// echoes of redactions are applied irreversibly.
        is_local: bool,
    },

    /// An event has been edited.
    ///
    /// Note that edits can't be applied in isolation; we need to identify what
    /// the *latest* edit is, based on the event ordering. As such, they're
    /// handled as a special case in `Aggregation::apply` and
    /// `Aggregation::unapply`, and the callers have the responsibility of
    /// considering all the edits and applying only the right one.
    Edit(PendingEdit),

    /// A location update for a live location sharing session (MSC3489).
    BeaconUpdate { location: BeaconInfo },

    /// A stop event for a live location sharing session (MSC3489).
    ///
    /// Carries the new (non-live) [`BeaconInfoEventContent`] that should
    /// replace the stored content on the target item, flipping
    /// [`LiveLocationState::is_live`] to `false`.
    ///
    /// Unlike [`Self::BeaconUpdate`], a beacon stop is not reversible.
    BeaconStop { content: BeaconInfoEventContent },
}
158
/// An aggregation is an event related to another event (for instance a
/// reaction, a poll's response, etc.).
///
/// It can be either a local or a remote echo.
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
    /// The kind of aggregation this represents.
    pub kind: AggregationKind,

    /// The own timeline identifier for an aggregation.
    ///
    /// It is a transaction id while the aggregation is still an unsent local
    /// echo, and it becomes an event id once the aggregation has been marked
    /// as sent, or when it is a remote echo (i.e. it has been received in a
    /// sync response).
    pub own_id: TimelineEventItemId,
}
175
176/// Get the poll state from a given [`TimelineItemContent`].
177fn poll_state_from_item<'a>(
178 event: &'a mut Cow<'_, EventTimelineItem>,
179) -> Result<&'a mut PollState, AggregationError> {
180 if event.content().is_poll() {
181 // It was a poll! Now return the state as mutable.
182 let state = as_variant!(
183 event.to_mut().content_mut(),
184 TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
185 )
186 .expect("it was a poll just above");
187 Ok(state)
188 } else {
189 Err(AggregationError::InvalidType {
190 expected: "a poll".to_owned(),
191 actual: event.content().debug_string().to_owned(),
192 })
193 }
194}
195
196/// Get the [`LiveLocationState`] from a given [`TimelineItemContent`], mutably.
197fn live_location_state_from_item<'a>(
198 event: &'a mut Cow<'_, EventTimelineItem>,
199) -> Result<&'a mut LiveLocationState, AggregationError> {
200 if event.content().is_live_location() {
201 // It was a live location! Now return the state as mutable.
202 let state = event
203 .to_mut()
204 .content_mut()
205 .as_live_location_state_mut()
206 .expect("it was a live location just above");
207 Ok(state)
208 } else {
209 Err(AggregationError::InvalidType {
210 expected: "a live location".to_owned(),
211 actual: event.content().debug_string().to_owned(),
212 })
213 }
214}
215
216impl Aggregation {
217 /// Create a new [`Aggregation`].
218 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
219 Self { kind, own_id }
220 }
221
222 /// Apply an aggregation in-place to a given [`TimelineItemContent`].
223 ///
224 /// In case of success, returns an enum indicating whether the applied
225 /// aggregation had an effect on the content; if it updated it, then the
226 /// caller has the responsibility to reflect that change.
227 ///
228 /// In case of error, returns an error detailing why the aggregation
229 /// couldn't be applied.
230 fn apply(
231 &self,
232 event: &mut Cow<'_, EventTimelineItem>,
233 rules: &RoomVersionRules,
234 ) -> ApplyAggregationResult {
235 match &self.kind {
236 AggregationKind::PollResponse { sender, timestamp, answers } => {
237 match poll_state_from_item(event) {
238 Ok(state) => {
239 state.add_response(sender.clone(), *timestamp, answers.clone());
240 ApplyAggregationResult::UpdatedItem
241 }
242 Err(err) => ApplyAggregationResult::Error(err),
243 }
244 }
245
246 AggregationKind::Redaction { is_local } => {
247 let is_local_redacted =
248 event.content().is_redacted() && event.unredacted_item.is_some();
249 let is_remote_redacted =
250 event.content().is_redacted() && event.unredacted_item.is_none();
251 if *is_local && is_local_redacted || !*is_local && is_remote_redacted {
252 ApplyAggregationResult::LeftItemIntact
253 } else {
254 let new_item = event.redact(&rules.redaction, *is_local);
255 *event = Cow::Owned(new_item);
256 ApplyAggregationResult::UpdatedItem
257 }
258 }
259
260 AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
261 Ok(state) => {
262 if !state.end(*end_date) {
263 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
264 }
265 ApplyAggregationResult::UpdatedItem
266 }
267 Err(err) => ApplyAggregationResult::Error(err),
268 },
269
270 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
271 let Some(reactions) = event.content().reactions() else {
272 // An item that can't hold any reactions.
273 return ApplyAggregationResult::LeftItemIntact;
274 };
275
276 let previous_reaction = reactions.get(key).and_then(|by_user| by_user.get(sender));
277
278 // If the reaction was already added to the item, we don't need to add it back.
279 //
280 // Search for a previous reaction that would be equivalent.
281
282 let is_same = previous_reaction.is_some_and(|prev| {
283 prev.timestamp == *timestamp
284 && matches!(
285 (&prev.status, reaction_status),
286 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
287 | (
288 ReactionStatus::LocalToRemote(_),
289 ReactionStatus::LocalToRemote(_),
290 )
291 | (
292 ReactionStatus::RemoteToRemote(_),
293 ReactionStatus::RemoteToRemote(_),
294 )
295 )
296 });
297
298 if is_same {
299 ApplyAggregationResult::LeftItemIntact
300 } else {
301 let reactions = event
302 .to_mut()
303 .content_mut()
304 .reactions_mut()
305 .expect("reactions was Some above");
306
307 reactions.entry(key.clone()).or_default().insert(
308 sender.clone(),
309 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
310 );
311
312 ApplyAggregationResult::UpdatedItem
313 }
314 }
315
316 AggregationKind::Edit(_) => {
317 // Let the caller handle the edit.
318 ApplyAggregationResult::Edit
319 }
320
321 AggregationKind::BeaconUpdate { location } => {
322 match live_location_state_from_item(event) {
323 Ok(state) => {
324 state.add_location(location.clone());
325 ApplyAggregationResult::UpdatedItem
326 }
327 Err(err) => ApplyAggregationResult::Error(err),
328 }
329 }
330
331 AggregationKind::BeaconStop { content } => match live_location_state_from_item(event) {
332 Ok(state) => {
333 state.stop(content.clone());
334 ApplyAggregationResult::UpdatedItem
335 }
336 Err(err) => ApplyAggregationResult::Error(err),
337 },
338 }
339 }
340
341 /// Undo an aggregation in-place to a given [`TimelineItemContent`].
342 ///
343 /// In case of success, returns an enum indicating whether unapplying the
344 /// aggregation had an effect on the content; if it updated it, then the
345 /// caller has the responsibility to reflect that change.
346 ///
347 /// In case of error, returns an error detailing why the aggregation
348 /// couldn't be unapplied.
349 fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
350 match &self.kind {
351 AggregationKind::PollResponse { sender, timestamp, .. } => {
352 let state = match poll_state_from_item(event) {
353 Ok(state) => state,
354 Err(err) => return ApplyAggregationResult::Error(err),
355 };
356 state.remove_response(sender, *timestamp);
357 ApplyAggregationResult::UpdatedItem
358 }
359
360 AggregationKind::PollEnd { .. } => {
361 // Assume we can't undo a poll end event at the moment.
362 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
363 }
364
365 AggregationKind::Redaction { is_local } => {
366 if *is_local {
367 if event.unredacted_item.is_some() {
368 // Unapply local redaction.
369 *event = Cow::Owned(event.unredact());
370 ApplyAggregationResult::UpdatedItem
371 } else {
372 // Event isn't locally redacted. Nothing to do.
373 ApplyAggregationResult::LeftItemIntact
374 }
375 } else {
376 // Remote redactions are not reversible.
377 ApplyAggregationResult::Error(AggregationError::CantUndoRedaction)
378 }
379 }
380
381 AggregationKind::Reaction { key, sender, .. } => {
382 let Some(reactions) = event.content().reactions() else {
383 // An item that can't hold any reactions.
384 return ApplyAggregationResult::LeftItemIntact;
385 };
386
387 // We only need to remove the previous reaction if it was there.
388 //
389 // Search for it.
390
391 let had_entry =
392 reactions.get(key).and_then(|by_user| by_user.get(sender)).is_some();
393
394 if had_entry {
395 let reactions = event
396 .to_mut()
397 .content_mut()
398 .reactions_mut()
399 .expect("reactions was some above");
400 let by_user = reactions.get_mut(key);
401 if let Some(by_user) = by_user {
402 by_user.swap_remove(sender);
403 // If this was the last reaction, remove the entire map for this key.
404 if by_user.is_empty() {
405 reactions.swap_remove(key);
406 }
407 }
408 ApplyAggregationResult::UpdatedItem
409 } else {
410 ApplyAggregationResult::LeftItemIntact
411 }
412 }
413
414 AggregationKind::Edit(_) => {
415 // Let the caller handle the edit.
416 ApplyAggregationResult::Edit
417 }
418
419 AggregationKind::BeaconUpdate { location } => {
420 match live_location_state_from_item(event) {
421 Ok(state) => {
422 state.remove_location(location.ts);
423 ApplyAggregationResult::UpdatedItem
424 }
425 Err(err) => ApplyAggregationResult::Error(err),
426 }
427 }
428
429 AggregationKind::BeaconStop { .. } => {
430 // Stopping a live location share is not reversible.
431 ApplyAggregationResult::Error(AggregationError::CantUndoBeaconStop)
432 }
433 }
434 }
435}
436
/// Manager for all known existing aggregations to all events in the timeline.
///
/// `related_events` and `inverted_map` are two views of the same data and are
/// meant to be kept in sync: the former goes from a target to its
/// aggregations, the latter from an aggregation back to its target.
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
    /// Mapping of a target event to its list of aggregations.
    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,

    /// Mapping of a related event identifier to its target.
    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,

    /// A pending beacon-stop aggregation received before the corresponding live
    /// `beacon_info` start item has arrived.
    ///
    /// Keyed by the sender's user ID, so at most one pending stop is kept per
    /// sender. When a live start item is eventually inserted via `add_item`,
    /// we check if the pending stop matches and promote it into
    /// [`Self::related_events`] so that [`Self::apply_all`] can apply it
    /// immediately.
    pending_beacon_stops: HashMap<OwnedUserId, Aggregation>,
}
455
456impl Aggregations {
457 /// Clear all the known aggregations from all the mappings.
458 pub fn clear(&mut self) {
459 self.related_events.clear();
460 self.inverted_map.clear();
461 self.pending_beacon_stops.clear();
462 }
463
    /// Stash a [`AggregationKind::BeaconStop`] that arrived before its target
    /// live `beacon_info` item.
    ///
    /// The stashed aggregation is later moved into [`Self::related_events`]
    /// by [`Self::promote_pending_beacon_stop`] (and thus picked up by
    /// [`Self::apply_all`]), when a matching live item from the same sender
    /// shows up.
    ///
    /// Only one pending stop is kept per sender: stashing a new one replaces
    /// the previous one, if any.
    pub fn add_pending_beacon_stop(&mut self, sender: OwnedUserId, aggregation: Aggregation) {
        self.pending_beacon_stops.insert(sender, aggregation);
    }
472
473 /// Promote a matching stashed beacon-stop aggregation for `sender` into the
474 /// regular aggregation map, now that the live start item's
475 /// `target_event_id` is known.
476 ///
477 /// The pending stop's content must match the start event's content (except
478 /// for the `live` field) for promotion to occur. If they don't match, the
479 /// pending stop is discarded because it belongs to a different session.
480 ///
481 /// Should be called from `add_item` just before `apply_all`, when inserting
482 /// a live `beacon_info` item.
483 fn promote_pending_beacon_stop(
484 &mut self,
485 sender: &OwnedUserId,
486 target_event_id: OwnedEventId,
487 start_content: &BeaconInfoEventContent,
488 ) {
489 if !start_content.live {
490 return;
491 }
492
493 let Some(stop) = self.pending_beacon_stops.remove(sender) else { return };
494
495 let AggregationKind::BeaconStop { content: stop_content } = &stop.kind else {
496 warn!("pending beacon stop has unexpected aggregation kind");
497 return;
498 };
499
500 if !beacon_info_matches(start_content, stop_content) {
501 trace!("discarding stale pending beacon stop (content mismatch)");
502 return;
503 }
504
505 let target = TimelineEventItemId::EventId(target_event_id);
506 self.add(target, stop);
507 }
508
509 /// Add a given aggregation that relates to the [`TimelineItemContent`]
510 /// identified by the given [`TimelineEventItemId`].
511 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
512 // If the aggregation is a redaction, it invalidates all the other aggregations;
513 // remove them.
514 if matches!(aggregation.kind, AggregationKind::Redaction { .. }) {
515 for agg in self.related_events.remove(&related_to).unwrap_or_default() {
516 self.inverted_map.remove(&agg.own_id);
517 }
518 }
519
520 // If there was any redaction among the current aggregation, adding a new one
521 // should be a noop.
522 if let Some(previous_aggregations) = self.related_events.get(&related_to)
523 && previous_aggregations
524 .iter()
525 .any(|agg| matches!(agg.kind, AggregationKind::Redaction { .. }))
526 {
527 return;
528 }
529
530 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
531
532 // We can have 3 different states for the same aggregation in related_events, in
533 // chronological order:
534 //
535 // 1. The local echo with a transaction ID.
536 // 2. The local echo with the event ID returned by the server after sending the
537 // event.
538 // 3. The remote echo received via sync.
539 //
540 // The transition from states 1 to 2 is handled in `mark_aggregation_as_sent()`.
541 // So here we need to handle the transition from states 2 to 3. We need to
542 // replace the local echo by the remote echo, which might have more data, like
543 // the raw JSON.
544 let related_events = self.related_events.entry(related_to).or_default();
545 if let Some(pos) = related_events.iter().position(|agg| agg.own_id == aggregation.own_id) {
546 related_events.remove(pos);
547 }
548 related_events.push(aggregation);
549 }
550
551 /// Is the given id one for a known aggregation to another event?
552 ///
553 /// If so, unapplies it by replacing the corresponding related item, if
554 /// needs be.
555 ///
556 /// Returns true if an aggregation was found. This doesn't mean
557 /// the underlying item has been updated, if it was missing from the
558 /// timeline for instance.
559 ///
560 /// May return an error if it found an aggregation, but it couldn't be
561 /// properly applied.
562 pub fn try_remove_aggregation(
563 &mut self,
564 aggregation_id: &TimelineEventItemId,
565 items: &mut ObservableItemsTransaction<'_>,
566 ) -> Result<bool, AggregationError> {
567 let Some(found) = self.inverted_map.get(aggregation_id) else { return Ok(false) };
568
569 // Find and remove the aggregation in the other mapping.
570 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
571 let removed = aggregations
572 .iter()
573 .position(|agg| agg.own_id == *aggregation_id)
574 .map(|idx| aggregations.remove(idx));
575
576 // If this was the last aggregation, remove the entry in the `related_events`
577 // mapping.
578 if aggregations.is_empty() {
579 self.related_events.remove(found);
580 }
581
582 removed
583 } else {
584 None
585 };
586
587 let Some(aggregation) = aggregation else {
588 warn!(
589 "incorrect internal state: {aggregation_id:?} was present in the inverted map, \
590 not in related-to map."
591 );
592 return Ok(false);
593 };
594
595 if let Some((item_pos, item)) = rfind_event_by_item_id(items, found) {
596 let mut cowed = Cow::Borrowed(&*item);
597 match aggregation.unapply(&mut cowed) {
598 ApplyAggregationResult::UpdatedItem => {
599 trace!("removed aggregation");
600 items.replace(
601 item_pos,
602 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
603 );
604 }
605 ApplyAggregationResult::LeftItemIntact => {}
606 ApplyAggregationResult::Error(err) => {
607 warn!("error when unapplying aggregation: {err}");
608 }
609 ApplyAggregationResult::Edit => {
610 // This edit has been removed; try to find another that still applies.
611 if let Some(aggregations) = self.related_events.get(found) {
612 if resolve_edits(aggregations, items, &mut cowed) {
613 items.replace(
614 item_pos,
615 TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
616 );
617 } else {
618 // No other edit was found, leave the item as is.
619 // TODO likely need to change the item to indicate
620 // it's been un-edited etc.
621 }
622 } else {
623 // No other edits apply.
624 }
625 }
626 }
627 } else {
628 info!("missing related-to item ({found:?}) for aggregation {aggregation_id:?}");
629 }
630
631 Ok(true)
632 }
633
634 /// Apply all the aggregations to a [`TimelineItemContent`].
635 ///
636 /// If `sender` is provided alongside a remote `item_id`, any
637 /// [`AggregationKind::BeaconStop`] events that arrived out-of-order (i.e.
638 /// before the live `beacon_info` start item) are first promoted from the
639 /// pending-stops stash into the regular aggregation map so they are picked
640 /// up here together with every other pending aggregation for this item.
641 ///
642 /// Will return an error at the first aggregation that couldn't be applied;
643 /// see [`Aggregation::apply`] which explains under which conditions it can
644 /// happen.
645 pub fn apply_all(
646 &mut self,
647 item_id: &TimelineEventItemId,
648 sender: &OwnedUserId,
649 event: &mut Cow<'_, EventTimelineItem>,
650 items: &mut ObservableItemsTransaction<'_>,
651 rules: &RoomVersionRules,
652 ) -> Result<(), AggregationError> {
653 // If a beacon-stop arrived before this live start item, it was stashed
654 // in `pending_beacon_stops` keyed by sender. Promote it into
655 // `related_events` under the now-known start event ID so the loop below
656 // applies it together with any other pending aggregations.
657 //
658 // The promotion verifies that the pending stop's content matches the
659 // start event's content to ensure we don't apply an old stop to a new
660 // session.
661 if let TimelineEventItemId::EventId(event_id) = item_id
662 && let Some(live_location) = event.content().as_live_location_state()
663 {
664 self.promote_pending_beacon_stop(sender, event_id.clone(), &live_location.beacon_info);
665 }
666
667 let Some(aggregations) = self.related_events.get(item_id) else {
668 return Ok(());
669 };
670
671 let mut has_edits = false;
672
673 for a in aggregations {
674 match a.apply(event, rules) {
675 ApplyAggregationResult::Edit => {
676 has_edits = true;
677 }
678 ApplyAggregationResult::UpdatedItem | ApplyAggregationResult::LeftItemIntact => {}
679 ApplyAggregationResult::Error(err) => return Err(err),
680 }
681 }
682
683 if has_edits {
684 resolve_edits(aggregations, items, event);
685 }
686
687 Ok(())
688 }
689
690 /// Mark a target event as being sent (i.e. it transitions from an local
691 /// transaction id to its remote event id counterpart), by updating the
692 /// internal mappings.
693 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
694 let from = TimelineEventItemId::TransactionId(txn_id);
695 let to = TimelineEventItemId::EventId(event_id);
696
697 // Update the aggregations in the `related_events` field.
698 if let Some(aggregations) = self.related_events.remove(&from) {
699 // Update the inverted mappings (from aggregation's id, to the new target id).
700 for a in &aggregations {
701 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
702 debug_assert_eq!(prev_target, from);
703 self.inverted_map.insert(a.own_id.clone(), to.clone());
704 }
705 }
706 // Update the direct mapping of target -> aggregations.
707 self.related_events.entry(to).or_default().extend(aggregations);
708 }
709 }
710
    /// Mark an aggregation event as being sent (i.e. it transitions from a
    /// local transaction id to its remote event id counterpart), by
    /// updating the internal mappings.
    ///
    /// When an aggregation has been marked as sent, it may need to be reapplied
    /// to the corresponding [`TimelineItemContent`]; this is why we're also
    /// passing the context to apply an aggregation here. This is done for
    /// redactions (which become remote) and reactions (whose status becomes
    /// [`ReactionStatus::RemoteToRemote`]).
    ///
    /// Returns `true` if `txn_id` was the identifier of a known aggregation,
    /// `false` otherwise (in which case nothing is changed).
    pub fn mark_aggregation_as_sent(
        &mut self,
        txn_id: OwnedTransactionId,
        event_id: OwnedEventId,
        items: &mut ObservableItemsTransaction<'_>,
        rules: &RoomVersionRules,
    ) -> bool {
        let from = TimelineEventItemId::TransactionId(txn_id);
        let to = TimelineEventItemId::EventId(event_id.clone());

        // The inverted mapping is re-keyed: the entry is taken out under the
        // transaction id here, and re-inserted under the event id at the end.
        let Some(target) = self.inverted_map.remove(&from) else {
            return false;
        };

        if let Some(aggregations) = self.related_events.get_mut(&target)
            && let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from)
        {
            found.own_id = to.clone();

            match &mut found.kind {
                AggregationKind::PollResponse { .. }
                | AggregationKind::PollEnd { .. }
                | AggregationKind::Edit(..)
                | AggregationKind::BeaconUpdate { .. }
                | AggregationKind::BeaconStop { .. } => {
                    // Nothing particular to do.
                }

                AggregationKind::Redaction { is_local } => {
                    // Mark the redaction as being remote and apply it (irreversibly).
                    *is_local = false;

                    // Clone the updated aggregation, so that `self` isn't
                    // mutably borrowed anymore when it's passed below.
                    let found = found.clone();
                    find_item_and_apply_aggregation(self, items, &target, found, rules);
                }

                AggregationKind::Reaction { reaction_status, .. } => {
                    // Mark the reaction as becoming remote, and reapply it onto
                    // the target item.
                    *reaction_status = ReactionStatus::RemoteToRemote(event_id);

                    // Clone the updated aggregation, so that `self` isn't
                    // mutably borrowed anymore when it's passed below.
                    let found = found.clone();
                    find_item_and_apply_aggregation(self, items, &target, found, rules);
                }
            }
        }

        self.inverted_map.insert(to, target);
        true
    }
768
    /// Returns the id of the event this aggregation relates to, if it's a known
    /// aggregation.
    ///
    /// This is a plain lookup in the inverted mapping (aggregation id to
    /// target id); `None` means `item` isn't the id of a known aggregation.
    pub fn is_aggregation_of(&self, item: &TimelineEventItemId) -> Option<&TimelineEventItemId> {
        self.inverted_map.get(item)
    }
774}
775
/// Look at all the edits of a given event, and apply the most recent one, if
/// found.
///
/// Among `aggregations`, only the [`AggregationKind::Edit`] ones are
/// considered. A local echo (identified by a transaction id) always wins, and
/// ends the search; otherwise, remote edits are compared by their position in
/// `items`, the highest position being the most recent.
///
/// The selected edit is then applied onto `event` with `edit_item`.
///
/// Returns true if an edit was found and applied, false otherwise.
fn resolve_edits(
    aggregations: &[Aggregation],
    items: &ObservableItemsTransaction<'_>,
    event: &mut Cow<'_, EventTimelineItem>,
) -> bool {
    // A tuple of the best edit, if we have found one and a boolean indicating if
    // the edit is coming from a local echo. If it's from a local echo, we can't
    // validate it as we don't have a raw JSON, but this isn't that important as
    // we're sure we won't send ourselves invalid edits.
    let mut best_edit: Option<(PendingEdit, bool)> = None;
    // Position in `items` of the best edit found so far; stays `None` as long
    // as no candidate could be located in the timeline.
    let mut best_edit_pos = None;

    for a in aggregations {
        if let AggregationKind::Edit(pending_edit) = &a.kind {
            match &a.own_id {
                TimelineEventItemId::TransactionId(_) => {
                    // A local echo is always the most recent edit: use this one.
                    best_edit = Some((pending_edit.clone(), true));
                    break;
                }

                TimelineEventItemId::EventId(event_id) => {
                    if let Some(best_edit_pos) = &mut best_edit_pos {
                        // Find the position of the timeline owning the edit: either the bundled
                        // item owner if this was a bundled edit, or the edit event itself.
                        let pos = items.position_by_event_id(
                            pending_edit.bundled_item_owner.as_ref().unwrap_or(event_id),
                        );

                        if let Some(pos) = pos {
                            // If the edit is more recent (higher index) than the previous best
                            // edit we knew about, use this one.
                            if pos > *best_edit_pos {
                                best_edit = Some((pending_edit.clone(), false));
                                *best_edit_pos = pos;
                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found better edit");
                            }
                        } else {
                            trace!(edit_id = ?a.own_id, "couldn't find timeline meta for edit event");

                            // The edit event isn't in the timeline, so it might be a bundled
                            // edit. In this case, record it as the best edit if and only if
                            // there wasn't any other.
                            if best_edit.is_none() {
                                best_edit = Some((pending_edit.clone(), false));
                                trace!(?best_edit_pos, edit_id = ?a.own_id, "found bundled edit");
                            }
                        }
                    } else {
                        // There wasn't any best edit yet, so record this one as being it, with
                        // its position.
                        //
                        // NOTE(review): this branch is also taken when an earlier
                        // candidate was recorded without a known position; that
                        // candidate is then replaced unconditionally — confirm this
                        // is intended.
                        //
                        // NOTE(review): unlike the lookup above, this one uses the edit's
                        // own event id and doesn't consider `bundled_item_owner` —
                        // confirm this is intended.
                        best_edit = Some((pending_edit.clone(), false));
                        best_edit_pos = items.position_by_event_id(event_id);
                        trace!(?best_edit_pos, edit_id = ?a.own_id, "first best edit");
                    }
                }
            }
        }
    }

    if let Some((edit, is_local_echo)) = best_edit {
        edit_item(event, edit, is_local_echo)
    } else {
        false
    }
}
846
847/// Apply the selected edit to the given EventTimelineItem.
848///
849/// Returns true if the edit was applied, false otherwise (because the edit and
850/// original timeline item types didn't match, for instance).
851fn edit_item(
852 item: &mut Cow<'_, EventTimelineItem>,
853 edit: PendingEdit,
854 is_local_echo: bool,
855) -> bool {
856 // We can receive edits from a local echo, i.e. the edit wasn't yet received
857 // from the homeserver.
858 //
859 // Before we send an edit we check that the event is allowed to be edited and
860 // that the replacement content is allowed.
861 //
862 // We don't have yet a full JSON of the event, so we can't do the validation
863 // here.
864 if !is_local_echo {
865 let Some(original_json) = item.original_json() else {
866 error!("The original event does not have the JSON field set.");
867 return false;
868 };
869
870 let Some(edit_json) = &edit.edit_json else {
871 error!(
872 "The replacement event of a remotely received edit does not have the JSON field set."
873 );
874 return false;
875 };
876
877 match check_validity_of_replacement_events(
878 original_json,
879 item.encryption_info(),
880 edit_json,
881 edit.encryption_info.as_deref(),
882 ) {
883 Ok(content) => content,
884 Err(e) => {
885 warn!("Event wasn't replaced due to the replacement event being invalid: {e}");
886 return false;
887 }
888 }
889 }
890
891 let TimelineItemContent::MsgLike(content) = item.content() else {
892 info!("Edit of message event applies to {:?}, discarding", item.content().debug_string());
893 return false;
894 };
895
896 let PendingEdit { kind: edit_kind, edit_json, encryption_info, bundled_item_owner: _ } = edit;
897
898 match (edit_kind, content) {
899 (
900 PendingEditKind::RoomMessage(replacement),
901 MsgLikeContent { kind: MsgLikeKind::Message(msg), .. },
902 ) => {
903 // First combination: it's a message edit for a message. Good.
904 let mut new_msg = msg.clone();
905 new_msg.apply_edit(replacement.new_content);
906
907 let new_item = item.with_content_and_latest_edit(
908 TimelineItemContent::MsgLike(content.with_kind(MsgLikeKind::Message(new_msg))),
909 edit_json,
910 );
911 *item = Cow::Owned(new_item);
912 }
913
914 (
915 PendingEditKind::Poll(replacement),
916 MsgLikeContent { kind: MsgLikeKind::Poll(poll_state), .. },
917 ) => {
918 // Second combination: it's a poll edit for a poll. Good.
919 if let Some(new_poll_state) = poll_state.edit(replacement.new_content) {
920 let new_item = item.with_content_and_latest_edit(
921 TimelineItemContent::MsgLike(
922 content.with_kind(MsgLikeKind::Poll(new_poll_state)),
923 ),
924 edit_json,
925 );
926 *item = Cow::Owned(new_item);
927 } else {
928 // The poll has ended, so we can't edit it anymore.
929 return false;
930 }
931 }
932
933 (edit_kind, _) => {
934 // Invalid combination.
935 info!(
936 content = item.content().debug_string(),
937 edit = format!("{:?}", edit_kind),
938 "Mismatch between edit type and content type",
939 );
940 return false;
941 }
942 }
943
944 if let Some(encryption_info) = encryption_info {
945 *item = Cow::Owned(item.with_encryption_info(Some(encryption_info)));
946 }
947
948 true
949}
950
951/// Find an item identified by the target identifier, and apply the aggregation
952/// onto it.
953///
954/// Returns the updated [`EventTimelineItem`] if the aggregation was applied, or
955/// `None` otherwise.
956pub(crate) fn find_item_and_apply_aggregation(
957 aggregations: &Aggregations,
958 items: &mut ObservableItemsTransaction<'_>,
959 target: &TimelineEventItemId,
960 aggregation: Aggregation,
961 rules: &RoomVersionRules,
962) -> Option<EventTimelineItem> {
963 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
964 trace!("couldn't find aggregation's target {target:?}");
965 return None;
966 };
967
968 let mut cowed = Cow::Borrowed(&*event_item);
969 match aggregation.apply(&mut cowed, rules) {
970 ApplyAggregationResult::UpdatedItem => {
971 trace!("applied aggregation");
972 let new_event_item = cowed.into_owned();
973 let new_item =
974 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
975 items.replace(idx, new_item);
976 Some(new_event_item)
977 }
978 ApplyAggregationResult::Edit => {
979 if let Some(aggregations) = aggregations.related_events.get(target)
980 && resolve_edits(aggregations, items, &mut cowed)
981 {
982 let new_event_item = cowed.into_owned();
983 let new_item =
984 TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
985 items.replace(idx, new_item);
986 return Some(new_event_item);
987 }
988 None
989 }
990 ApplyAggregationResult::LeftItemIntact => {
991 trace!("applying the aggregation had no effect");
992 None
993 }
994 ApplyAggregationResult::Error(err) => {
995 warn!("error when applying aggregation: {err}");
996 None
997 }
998 }
999}
1000
/// The result of applying (or unapplying) an aggregation onto a timeline item.
///
/// The variant tells the caller whether the `Cow<EventTimelineItem>` it passed
/// now holds an updated item, whether extra work is needed (edits), or
/// whether nothing changed.
enum ApplyAggregationResult {
    /// The passed `Cow<EventTimelineItem>` has been cloned and updated.
    UpdatedItem,

    /// An edit must be included in the edit set and resolved later, using the
    /// relative position of the edits.
    ///
    /// In `find_item_and_apply_aggregation`, this leads to a call to
    /// `resolve_edits` over the aggregations related to the target.
    Edit,

    /// The item hasn't been modified after applying the aggregation, because it
    /// was likely already applied prior to this.
    LeftItemIntact,

    /// An error happened while applying the aggregation.
    ///
    /// The wrapped [`AggregationError`] describes what went wrong.
    Error(AggregationError),
}
1017
/// An error that can happen when applying or unapplying an aggregation.
///
/// The `Display` implementation is derived with `thiserror`, from the
/// `#[error(...)]` attribute of each variant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum AggregationError {
    /// A poll end was applied to a poll that had already been ended.
    #[error("trying to end a poll twice")]
    PollAlreadyEnded,

    /// Unapplying a poll end is not supported.
    #[error("a poll end can't be unapplied")]
    CantUndoPollEnd,

    /// Unapplying a redaction is not supported.
    #[error("a redaction can't be unapplied")]
    CantUndoRedaction,

    /// Unapplying a beacon stop is not supported.
    #[error("a beacon stop can't be unapplied")]
    CantUndoBeaconStop,

    /// The aggregation targets an item whose type doesn't match what the
    /// aggregation expects.
    ///
    /// `expected` and `actual` are textual descriptions of the expected and
    /// encountered types, and are both included in the error message.
    #[error(
        "trying to apply an aggregation of one type to an invalid target: \
         expected {expected}, actual {actual}"
    )]
    InvalidType { expected: String, actual: String },
}