matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::collections::HashMap;
41
42use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId};
43use tracing::{trace, warn};
44
45use super::{rfind_event_by_item_id, ObservableItemsTransaction};
46use crate::timeline::{
47    PollState, ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
48};
49
/// Which kind of aggregation (related event) is this?
///
/// Each variant carries the data needed to apply the aggregation onto the
/// content of the event it relates to.
#[derive(Clone, Debug)]
pub(crate) enum AggregationKind {
    /// This is a response to a poll.
    PollResponse {
        /// Sender of the poll's response.
        sender: OwnedUserId,
        /// Timestamp at which the response has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// All the answers to the poll sent by the sender.
        answers: Vec<String>,
    },

    /// This is the marker of the end of a poll.
    PollEnd {
        /// Timestamp at which the poll ends, i.e. all the responses with a
        /// timestamp prior to this one should be taken into account
        /// (and all the responses with a timestamp after this one
        /// should be dropped).
        end_date: MilliSecondsSinceUnixEpoch,
    },

    /// This is a reaction to another event.
    Reaction {
        /// The reaction "key" displayed by the client, often an emoji.
        key: String,
        /// Sender of the reaction.
        sender: OwnedUserId,
        /// Timestamp at which the reaction has been sent.
        timestamp: MilliSecondsSinceUnixEpoch,
        /// The send status of this reaction, with handles to abort it if we
        /// can, etc.
        reaction_status: ReactionStatus,
    },
}
85
/// An aggregation is an event related to another event (for instance a
/// reaction, a poll's response, etc.).
///
/// It can be either a local or a remote echo.
#[derive(Clone, Debug)]
pub(crate) struct Aggregation {
    /// The kind of aggregation this represents.
    pub kind: AggregationKind,

    /// The own timeline identifier for an aggregation.
    ///
    /// It will be a transaction id when the aggregation is still a local echo,
    /// and it will transition into an event id when the aggregation is a
    /// remote echo (i.e. has been received in a sync response).
    pub own_id: TimelineEventItemId,
}
102
103/// Get the poll state from a given [`TimelineItemContent`].
104fn poll_state_from_item(
105    content: &mut TimelineItemContent,
106) -> Result<&mut PollState, AggregationError> {
107    match content {
108        TimelineItemContent::Poll(poll_state) => Ok(poll_state),
109        _ => Err(AggregationError::InvalidType {
110            expected: "a poll".to_owned(),
111            actual: content.debug_string().to_owned(),
112        }),
113    }
114}
115
116impl Aggregation {
117    /// Create a new [`Aggregation`].
118    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
119        Self { kind, own_id }
120    }
121
122    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
123    ///
124    /// In case of success, returns an enum indicating whether the applied
125    /// aggregation had an effect on the content; if it updated it, then the
126    /// caller has the responsibility to reflect that change.
127    ///
128    /// In case of error, returns an error detailing why the aggregation
129    /// couldn't be applied.
130    pub fn apply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
131        match &self.kind {
132            AggregationKind::PollResponse { sender, timestamp, answers } => {
133                let state = match poll_state_from_item(content) {
134                    Ok(state) => state,
135                    Err(err) => return ApplyAggregationResult::Error(err),
136                };
137                state.add_response(sender.clone(), *timestamp, answers.clone());
138                ApplyAggregationResult::UpdatedItem
139            }
140
141            AggregationKind::PollEnd { end_date } => {
142                let poll_state = match poll_state_from_item(content) {
143                    Ok(state) => state,
144                    Err(err) => return ApplyAggregationResult::Error(err),
145                };
146                if !poll_state.end(*end_date) {
147                    return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
148                }
149                ApplyAggregationResult::UpdatedItem
150            }
151
152            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
153                let Some(reactions) = content.reactions_mut() else {
154                    // These items don't hold reactions.
155                    return ApplyAggregationResult::LeftItemIntact;
156                };
157
158                let previous_reaction = reactions.entry(key.clone()).or_default().insert(
159                    sender.clone(),
160                    ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
161                );
162
163                let is_same = previous_reaction.is_some_and(|prev| {
164                    prev.timestamp == *timestamp
165                        && matches!(
166                            (prev.status, reaction_status),
167                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
168                                | (
169                                    ReactionStatus::LocalToRemote(_),
170                                    ReactionStatus::LocalToRemote(_),
171                                )
172                                | (
173                                    ReactionStatus::RemoteToRemote(_),
174                                    ReactionStatus::RemoteToRemote(_),
175                                )
176                        )
177                });
178
179                if is_same {
180                    ApplyAggregationResult::LeftItemIntact
181                } else {
182                    ApplyAggregationResult::UpdatedItem
183                }
184            }
185        }
186    }
187
188    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
189    ///
190    /// In case of success, returns an enum indicating whether unapplying the
191    /// aggregation had an effect on the content; if it updated it, then the
192    /// caller has the responsibility to reflect that change.
193    ///
194    /// In case of error, returns an error detailing why the aggregation
195    /// couldn't be unapplied.
196    pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
197        match &self.kind {
198            AggregationKind::PollResponse { sender, timestamp, .. } => {
199                let state = match poll_state_from_item(content) {
200                    Ok(state) => state,
201                    Err(err) => return ApplyAggregationResult::Error(err),
202                };
203                state.remove_response(sender, *timestamp);
204                ApplyAggregationResult::UpdatedItem
205            }
206
207            AggregationKind::PollEnd { .. } => {
208                // Assume we can't undo a poll end event at the moment.
209                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
210            }
211
212            AggregationKind::Reaction { key, sender, .. } => {
213                let Some(reactions) = content.reactions_mut() else {
214                    // An item that doesn't hold any reactions.
215                    return ApplyAggregationResult::LeftItemIntact;
216                };
217
218                let by_user = reactions.get_mut(key);
219                let previous_entry = if let Some(by_user) = by_user {
220                    let prev = by_user.swap_remove(sender);
221                    // If this was the last reaction, remove the entire map for this key.
222                    if by_user.is_empty() {
223                        reactions.swap_remove(key);
224                    }
225                    prev
226                } else {
227                    None
228                };
229
230                if previous_entry.is_some() {
231                    ApplyAggregationResult::UpdatedItem
232                } else {
233                    ApplyAggregationResult::LeftItemIntact
234                }
235            }
236        }
237    }
238}
239
/// Manager for all known existing aggregations to all events in the timeline.
///
/// Two mappings are maintained together: [`Aggregations::add`] records each
/// aggregation both under its target (in `related_events`) and under its own
/// identifier (in `inverted_map`).
#[derive(Clone, Debug, Default)]
pub(crate) struct Aggregations {
    /// Mapping of a target event to its list of aggregations.
    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,

    /// Mapping of a related event identifier to its target.
    ///
    /// Keys are the aggregations' own identifiers (`Aggregation::own_id`),
    /// values are the identifiers of the events they relate to.
    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
}
249
250impl Aggregations {
251    /// Clear all the known aggregations from all the mappings.
252    pub fn clear(&mut self) {
253        self.related_events.clear();
254        self.inverted_map.clear();
255    }
256
257    /// Add a given aggregation that relates to the [`TimelineItemContent`]
258    /// identified by the given [`TimelineEventItemId`].
259    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
260        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
261        self.related_events.entry(related_to).or_default().push(aggregation);
262    }
263
264    /// Is the given id one for a known aggregation to another event?
265    ///
266    /// If so, returns the target event identifier as well as the aggregation.
267    /// The aggregation must be unapplied on the corresponding timeline
268    /// item.
269    #[must_use]
270    pub fn try_remove_aggregation(
271        &mut self,
272        aggregation_id: &TimelineEventItemId,
273    ) -> Option<(&TimelineEventItemId, Aggregation)> {
274        let found = self.inverted_map.get(aggregation_id)?;
275
276        // Find and remove the aggregation in the other mapping.
277        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
278            let removed = aggregations
279                .iter()
280                .position(|agg| agg.own_id == *aggregation_id)
281                .map(|idx| aggregations.remove(idx));
282
283            // If this was the last aggregation, remove the entry in the `related_events`
284            // mapping.
285            if aggregations.is_empty() {
286                self.related_events.remove(found);
287            }
288
289            removed
290        } else {
291            None
292        };
293
294        if aggregation.is_none() {
295            warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
296        }
297
298        Some((found, aggregation?))
299    }
300
301    /// Apply all the aggregations to a [`TimelineItemContent`].
302    ///
303    /// Will return an error at the first aggregation that couldn't be applied;
304    /// see [`Aggregation::apply`] which explains under which conditions it can
305    /// happen.
306    pub fn apply(
307        &self,
308        item_id: &TimelineEventItemId,
309        content: &mut TimelineItemContent,
310    ) -> Result<(), AggregationError> {
311        let Some(aggregations) = self.related_events.get(item_id) else {
312            return Ok(());
313        };
314        for a in aggregations {
315            if let ApplyAggregationResult::Error(err) = a.apply(content) {
316                return Err(err);
317            }
318        }
319        Ok(())
320    }
321
322    /// Mark a target event as being sent (i.e. it transitions from an local
323    /// transaction id to its remote event id counterpart), by updating the
324    /// internal mappings.
325    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
326        let from = TimelineEventItemId::TransactionId(txn_id);
327        let to = TimelineEventItemId::EventId(event_id);
328
329        // Update the aggregations in the `related_events` field.
330        if let Some(aggregations) = self.related_events.remove(&from) {
331            // Update the inverted mappings (from aggregation's id, to the new target id).
332            for a in &aggregations {
333                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
334                    debug_assert_eq!(prev_target, from);
335                    self.inverted_map.insert(a.own_id.clone(), to.clone());
336                }
337            }
338            // Update the direct mapping of target -> aggregations.
339            self.related_events.entry(to).or_default().extend(aggregations);
340        }
341    }
342
343    /// Mark an aggregation event as being sent (i.e. it transitions from an
344    /// local transaction id to its remote event id counterpart), by
345    /// updating the internal mappings.
346    ///
347    /// When an aggregation has been marked as sent, it may need to be reapplied
348    /// to the corresponding [`TimelineItemContent`]; in this case, a
349    /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update`
350    /// will be returned, and must be applied.
351    pub fn mark_aggregation_as_sent(
352        &mut self,
353        txn_id: OwnedTransactionId,
354        event_id: OwnedEventId,
355    ) -> MarkAggregationSentResult {
356        let from = TimelineEventItemId::TransactionId(txn_id);
357        let to = TimelineEventItemId::EventId(event_id.clone());
358
359        let Some(target) = self.inverted_map.remove(&from) else {
360            return MarkAggregationSentResult::NotFound;
361        };
362
363        let mut target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { update: None };
364
365        if let Some(aggregations) = self.related_events.get_mut(&target) {
366            if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
367                found.own_id = to.clone();
368
369                match &mut found.kind {
370                    AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } => {
371                        // Nothing particular to do.
372                    }
373                    AggregationKind::Reaction { reaction_status, .. } => {
374                        // Mark the reaction as becoming remote, and signal that update to the
375                        // caller.
376                        *reaction_status = ReactionStatus::RemoteToRemote(event_id);
377                        target_and_new_aggregation = MarkAggregationSentResult::MarkedSent {
378                            update: Some((target.clone(), found.clone())),
379                        };
380                    }
381                }
382            }
383        }
384
385        self.inverted_map.insert(to, target);
386
387        target_and_new_aggregation
388    }
389}
390
391/// Find an item identified by the target identifier, and apply the aggregation
392/// onto it.
393///
394/// Returns whether the aggregation has been applied or not (so as to increment
395/// a number of updated result, for instance).
396pub(crate) fn find_item_and_apply_aggregation(
397    items: &mut ObservableItemsTransaction<'_>,
398    target: &TimelineEventItemId,
399    aggregation: Aggregation,
400) -> bool {
401    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
402        trace!("couldn't find aggregation's target {target:?}");
403        return false;
404    };
405
406    let mut new_content = event_item.content().clone();
407
408    match aggregation.apply(&mut new_content) {
409        ApplyAggregationResult::UpdatedItem => {
410            trace!("applied aggregation");
411            let new_item = event_item.with_content(new_content);
412            items.replace(idx, TimelineItem::new(new_item, event_item.internal_id.to_owned()));
413            true
414        }
415        ApplyAggregationResult::LeftItemIntact => {
416            trace!("applying the aggregation had no effect");
417            false
418        }
419        ApplyAggregationResult::Error(err) => {
420            warn!("error when applying aggregation: {err}");
421            false
422        }
423    }
424}
425
/// The result of marking an aggregation as sent.
///
/// Returned by [`Aggregations::mark_aggregation_as_sent`].
pub(crate) enum MarkAggregationSentResult {
    /// The aggregation has been found, and marked as sent.
    ///
    /// Optionally, it can include an [`Aggregation`] `update` to the matching
    /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`].
    ///
    /// When `update` is set, the aggregation must be reapplied onto that
    /// item; this is the case when a reaction transitioned to its remote
    /// status.
    MarkedSent { update: Option<(TimelineEventItemId, Aggregation)> },
    /// The aggregation was unknown to the aggregations manager, aka not found.
    NotFound,
}
436
/// The result of applying (or unapplying) an aggregation onto a timeline item.
///
/// Returned by [`Aggregation::apply`] and [`Aggregation::unapply`].
pub(crate) enum ApplyAggregationResult {
    /// The item has been updated after applying the aggregation.
    ///
    /// The caller has the responsibility to reflect that change.
    UpdatedItem,

    /// The item hasn't been modified after applying the aggregation, because it
    /// was likely already applied prior to this.
    ///
    /// This is also returned for reactions targeting an item that doesn't
    /// hold reactions.
    LeftItemIntact,

    /// An error happened while applying the aggregation.
    Error(AggregationError),
}
449
/// Reasons why an aggregation couldn't be applied onto (or unapplied from) a
/// timeline item's content.
#[derive(Debug, thiserror::Error)]
pub(crate) enum AggregationError {
    /// A poll end aggregation was applied onto a poll that couldn't be ended
    /// (again).
    #[error("trying to end a poll twice")]
    PollAlreadyEnded,

    /// Unapplying a poll end aggregation isn't supported.
    #[error("a poll end can't be unapplied")]
    CantUndoPollEnd,

    /// The aggregation targets an item whose content has the wrong type (e.g.
    /// a poll response targeting something that isn't a poll).
    #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
    InvalidType { expected: String, actual: String },
}
460}