matrix_sdk_ui/timeline/controller/
aggregations.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::collections::HashMap;
41
42use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId};
43use tracing::{trace, warn};
44
45use super::{rfind_event_by_item_id, ObservableItemsTransaction};
46use crate::timeline::{
47    MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus, TimelineEventItemId,
48    TimelineItem, TimelineItemContent,
49};
50
51/// Which kind of aggregation (related event) is this?
52#[derive(Clone, Debug)]
53pub(crate) enum AggregationKind {
54    /// This is a response to a poll.
55    PollResponse {
56        /// Sender of the poll's response.
57        sender: OwnedUserId,
58        /// Timestamp at which the response has beens ent.
59        timestamp: MilliSecondsSinceUnixEpoch,
60        /// All the answers to the poll sent by the sender.
61        answers: Vec<String>,
62    },
63
64    /// This is the marker of the end of a poll.
65    PollEnd {
66        /// Timestamp at which the poll ends, i.e. all the responses with a
67        /// timestamp prior to this one should be taken into account
68        /// (and all the responses with a timestamp after this one
69        /// should be dropped).
70        end_date: MilliSecondsSinceUnixEpoch,
71    },
72
73    /// This is a reaction to another event.
74    Reaction {
75        /// The reaction "key" displayed by the client, often an emoji.
76        key: String,
77        /// Sender of the reaction.
78        sender: OwnedUserId,
79        /// Timestamp at which the reaction has been sent.
80        timestamp: MilliSecondsSinceUnixEpoch,
81        /// The send status of the reaction this is, with handles to abort it if
82        /// we can, etc.
83        reaction_status: ReactionStatus,
84    },
85}
86
87/// An aggregation is an event related to another event (for instance a
88/// reaction, a poll's response, etc.).
89///
90/// It can be either a local or a remote echo.
91#[derive(Clone, Debug)]
92pub(crate) struct Aggregation {
93    /// The kind of aggregation this represents.
94    pub kind: AggregationKind,
95
96    /// The own timeline identifier for an aggregation.
97    ///
98    /// It will be a transaction id when the aggregation is still a local echo,
99    /// and it will transition into an event id when the aggregation is a
100    /// remote echo (i.e. has been received in a sync response):
101    pub own_id: TimelineEventItemId,
102}
103
104/// Get the poll state from a given [`TimelineItemContent`].
105fn poll_state_from_item(
106    content: &mut TimelineItemContent,
107) -> Result<&mut PollState, AggregationError> {
108    match content {
109        TimelineItemContent::MsgLike(MsgLikeContent {
110            kind: MsgLikeKind::Poll(poll_state),
111            ..
112        }) => Ok(poll_state),
113        _ => Err(AggregationError::InvalidType {
114            expected: "a poll".to_owned(),
115            actual: content.debug_string().to_owned(),
116        }),
117    }
118}
119
120impl Aggregation {
121    /// Create a new [`Aggregation`].
122    pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
123        Self { kind, own_id }
124    }
125
126    /// Apply an aggregation in-place to a given [`TimelineItemContent`].
127    ///
128    /// In case of success, returns an enum indicating whether the applied
129    /// aggregation had an effect on the content; if it updated it, then the
130    /// caller has the responsibility to reflect that change.
131    ///
132    /// In case of error, returns an error detailing why the aggregation
133    /// couldn't be applied.
134    pub fn apply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
135        match &self.kind {
136            AggregationKind::PollResponse { sender, timestamp, answers } => {
137                let state = match poll_state_from_item(content) {
138                    Ok(state) => state,
139                    Err(err) => return ApplyAggregationResult::Error(err),
140                };
141                state.add_response(sender.clone(), *timestamp, answers.clone());
142                ApplyAggregationResult::UpdatedItem
143            }
144
145            AggregationKind::PollEnd { end_date } => {
146                let poll_state = match poll_state_from_item(content) {
147                    Ok(state) => state,
148                    Err(err) => return ApplyAggregationResult::Error(err),
149                };
150                if !poll_state.end(*end_date) {
151                    return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
152                }
153                ApplyAggregationResult::UpdatedItem
154            }
155
156            AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
157                let Some(reactions) = content.reactions_mut() else {
158                    // These items don't hold reactions.
159                    return ApplyAggregationResult::LeftItemIntact;
160                };
161
162                let previous_reaction = reactions.entry(key.clone()).or_default().insert(
163                    sender.clone(),
164                    ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
165                );
166
167                let is_same = previous_reaction.is_some_and(|prev| {
168                    prev.timestamp == *timestamp
169                        && matches!(
170                            (prev.status, reaction_status),
171                            (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
172                                | (
173                                    ReactionStatus::LocalToRemote(_),
174                                    ReactionStatus::LocalToRemote(_),
175                                )
176                                | (
177                                    ReactionStatus::RemoteToRemote(_),
178                                    ReactionStatus::RemoteToRemote(_),
179                                )
180                        )
181                });
182
183                if is_same {
184                    ApplyAggregationResult::LeftItemIntact
185                } else {
186                    ApplyAggregationResult::UpdatedItem
187                }
188            }
189        }
190    }
191
192    /// Undo an aggregation in-place to a given [`TimelineItemContent`].
193    ///
194    /// In case of success, returns an enum indicating whether unapplying the
195    /// aggregation had an effect on the content; if it updated it, then the
196    /// caller has the responsibility to reflect that change.
197    ///
198    /// In case of error, returns an error detailing why the aggregation
199    /// couldn't be unapplied.
200    pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
201        match &self.kind {
202            AggregationKind::PollResponse { sender, timestamp, .. } => {
203                let state = match poll_state_from_item(content) {
204                    Ok(state) => state,
205                    Err(err) => return ApplyAggregationResult::Error(err),
206                };
207                state.remove_response(sender, *timestamp);
208                ApplyAggregationResult::UpdatedItem
209            }
210
211            AggregationKind::PollEnd { .. } => {
212                // Assume we can't undo a poll end event at the moment.
213                ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
214            }
215
216            AggregationKind::Reaction { key, sender, .. } => {
217                let Some(reactions) = content.reactions_mut() else {
218                    // An item that doesn't hold any reactions.
219                    return ApplyAggregationResult::LeftItemIntact;
220                };
221
222                let by_user = reactions.get_mut(key);
223                let previous_entry = if let Some(by_user) = by_user {
224                    let prev = by_user.swap_remove(sender);
225                    // If this was the last reaction, remove the entire map for this key.
226                    if by_user.is_empty() {
227                        reactions.swap_remove(key);
228                    }
229                    prev
230                } else {
231                    None
232                };
233
234                if previous_entry.is_some() {
235                    ApplyAggregationResult::UpdatedItem
236                } else {
237                    ApplyAggregationResult::LeftItemIntact
238                }
239            }
240        }
241    }
242}
243
244/// Manager for all known existing aggregations to all events in the timeline.
245#[derive(Clone, Debug, Default)]
246pub(crate) struct Aggregations {
247    /// Mapping of a target event to its list of aggregations.
248    related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
249
250    /// Mapping of a related event identifier to its target.
251    inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
252}
253
254impl Aggregations {
255    /// Clear all the known aggregations from all the mappings.
256    pub fn clear(&mut self) {
257        self.related_events.clear();
258        self.inverted_map.clear();
259    }
260
261    /// Add a given aggregation that relates to the [`TimelineItemContent`]
262    /// identified by the given [`TimelineEventItemId`].
263    pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
264        self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
265        self.related_events.entry(related_to).or_default().push(aggregation);
266    }
267
268    /// Is the given id one for a known aggregation to another event?
269    ///
270    /// If so, returns the target event identifier as well as the aggregation.
271    /// The aggregation must be unapplied on the corresponding timeline
272    /// item.
273    #[must_use]
274    pub fn try_remove_aggregation(
275        &mut self,
276        aggregation_id: &TimelineEventItemId,
277    ) -> Option<(&TimelineEventItemId, Aggregation)> {
278        let found = self.inverted_map.get(aggregation_id)?;
279
280        // Find and remove the aggregation in the other mapping.
281        let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
282            let removed = aggregations
283                .iter()
284                .position(|agg| agg.own_id == *aggregation_id)
285                .map(|idx| aggregations.remove(idx));
286
287            // If this was the last aggregation, remove the entry in the `related_events`
288            // mapping.
289            if aggregations.is_empty() {
290                self.related_events.remove(found);
291            }
292
293            removed
294        } else {
295            None
296        };
297
298        if aggregation.is_none() {
299            warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
300        }
301
302        Some((found, aggregation?))
303    }
304
305    /// Apply all the aggregations to a [`TimelineItemContent`].
306    ///
307    /// Will return an error at the first aggregation that couldn't be applied;
308    /// see [`Aggregation::apply`] which explains under which conditions it can
309    /// happen.
310    pub fn apply(
311        &self,
312        item_id: &TimelineEventItemId,
313        content: &mut TimelineItemContent,
314    ) -> Result<(), AggregationError> {
315        let Some(aggregations) = self.related_events.get(item_id) else {
316            return Ok(());
317        };
318        for a in aggregations {
319            if let ApplyAggregationResult::Error(err) = a.apply(content) {
320                return Err(err);
321            }
322        }
323        Ok(())
324    }
325
326    /// Mark a target event as being sent (i.e. it transitions from an local
327    /// transaction id to its remote event id counterpart), by updating the
328    /// internal mappings.
329    pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
330        let from = TimelineEventItemId::TransactionId(txn_id);
331        let to = TimelineEventItemId::EventId(event_id);
332
333        // Update the aggregations in the `related_events` field.
334        if let Some(aggregations) = self.related_events.remove(&from) {
335            // Update the inverted mappings (from aggregation's id, to the new target id).
336            for a in &aggregations {
337                if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
338                    debug_assert_eq!(prev_target, from);
339                    self.inverted_map.insert(a.own_id.clone(), to.clone());
340                }
341            }
342            // Update the direct mapping of target -> aggregations.
343            self.related_events.entry(to).or_default().extend(aggregations);
344        }
345    }
346
347    /// Mark an aggregation event as being sent (i.e. it transitions from an
348    /// local transaction id to its remote event id counterpart), by
349    /// updating the internal mappings.
350    ///
351    /// When an aggregation has been marked as sent, it may need to be reapplied
352    /// to the corresponding [`TimelineItemContent`]; in this case, a
353    /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update`
354    /// will be returned, and must be applied.
355    pub fn mark_aggregation_as_sent(
356        &mut self,
357        txn_id: OwnedTransactionId,
358        event_id: OwnedEventId,
359    ) -> MarkAggregationSentResult {
360        let from = TimelineEventItemId::TransactionId(txn_id);
361        let to = TimelineEventItemId::EventId(event_id.clone());
362
363        let Some(target) = self.inverted_map.remove(&from) else {
364            return MarkAggregationSentResult::NotFound;
365        };
366
367        let mut target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { update: None };
368
369        if let Some(aggregations) = self.related_events.get_mut(&target) {
370            if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
371                found.own_id = to.clone();
372
373                match &mut found.kind {
374                    AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } => {
375                        // Nothing particular to do.
376                    }
377                    AggregationKind::Reaction { reaction_status, .. } => {
378                        // Mark the reaction as becoming remote, and signal that update to the
379                        // caller.
380                        *reaction_status = ReactionStatus::RemoteToRemote(event_id);
381                        target_and_new_aggregation = MarkAggregationSentResult::MarkedSent {
382                            update: Some((target.clone(), found.clone())),
383                        };
384                    }
385                }
386            }
387        }
388
389        self.inverted_map.insert(to, target);
390
391        target_and_new_aggregation
392    }
393}
394
395/// Find an item identified by the target identifier, and apply the aggregation
396/// onto it.
397///
398/// Returns whether the aggregation has been applied or not (so as to increment
399/// a number of updated result, for instance).
400pub(crate) fn find_item_and_apply_aggregation(
401    items: &mut ObservableItemsTransaction<'_>,
402    target: &TimelineEventItemId,
403    aggregation: Aggregation,
404) -> bool {
405    let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
406        trace!("couldn't find aggregation's target {target:?}");
407        return false;
408    };
409
410    let mut new_content = event_item.content().clone();
411
412    match aggregation.apply(&mut new_content) {
413        ApplyAggregationResult::UpdatedItem => {
414            trace!("applied aggregation");
415            let new_item = event_item.with_content(new_content);
416            items.replace(idx, TimelineItem::new(new_item, event_item.internal_id.to_owned()));
417            true
418        }
419        ApplyAggregationResult::LeftItemIntact => {
420            trace!("applying the aggregation had no effect");
421            false
422        }
423        ApplyAggregationResult::Error(err) => {
424            warn!("error when applying aggregation: {err}");
425            false
426        }
427    }
428}
429
430/// The result of marking an aggregation as sent.
431pub(crate) enum MarkAggregationSentResult {
432    /// The aggregation has been found, and marked as sent.
433    ///
434    /// Optionally, it can include an [`Aggregation`] `update` to the matching
435    /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`].
436    MarkedSent { update: Option<(TimelineEventItemId, Aggregation)> },
437    /// The aggregation was unknown to the aggregations manager, aka not found.
438    NotFound,
439}
440
441/// The result of applying (or unapplying) an aggregation onto a timeline item.
442pub(crate) enum ApplyAggregationResult {
443    /// The item has been updated after applying the aggregation.
444    UpdatedItem,
445
446    /// The item hasn't been modified after applying the aggregation, because it
447    /// was likely already applied prior to this.
448    LeftItemIntact,
449
450    /// An error happened while applying the aggregation.
451    Error(AggregationError),
452}
453
454#[derive(Debug, thiserror::Error)]
455pub(crate) enum AggregationError {
456    #[error("trying to end a poll twice")]
457    PollAlreadyEnded,
458
459    #[error("a poll end can't be unapplied")]
460    CantUndoPollEnd,
461
462    #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
463    InvalidType { expected: String, actual: String },
464}