matrix_sdk_ui/timeline/controller/aggregations.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::collections::HashMap;
41
42use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId};
43use tracing::{trace, warn};
44
45use super::{rfind_event_by_item_id, ObservableItemsTransaction};
46use crate::timeline::{
47 MsgLikeContent, MsgLikeKind, PollState, ReactionInfo, ReactionStatus, TimelineEventItemId,
48 TimelineItem, TimelineItemContent,
49};
50
51/// Which kind of aggregation (related event) is this?
52#[derive(Clone, Debug)]
53pub(crate) enum AggregationKind {
54 /// This is a response to a poll.
55 PollResponse {
56 /// Sender of the poll's response.
57 sender: OwnedUserId,
58 /// Timestamp at which the response has beens ent.
59 timestamp: MilliSecondsSinceUnixEpoch,
60 /// All the answers to the poll sent by the sender.
61 answers: Vec<String>,
62 },
63
64 /// This is the marker of the end of a poll.
65 PollEnd {
66 /// Timestamp at which the poll ends, i.e. all the responses with a
67 /// timestamp prior to this one should be taken into account
68 /// (and all the responses with a timestamp after this one
69 /// should be dropped).
70 end_date: MilliSecondsSinceUnixEpoch,
71 },
72
73 /// This is a reaction to another event.
74 Reaction {
75 /// The reaction "key" displayed by the client, often an emoji.
76 key: String,
77 /// Sender of the reaction.
78 sender: OwnedUserId,
79 /// Timestamp at which the reaction has been sent.
80 timestamp: MilliSecondsSinceUnixEpoch,
81 /// The send status of the reaction this is, with handles to abort it if
82 /// we can, etc.
83 reaction_status: ReactionStatus,
84 },
85}
86
87/// An aggregation is an event related to another event (for instance a
88/// reaction, a poll's response, etc.).
89///
90/// It can be either a local or a remote echo.
91#[derive(Clone, Debug)]
92pub(crate) struct Aggregation {
93 /// The kind of aggregation this represents.
94 pub kind: AggregationKind,
95
96 /// The own timeline identifier for an aggregation.
97 ///
98 /// It will be a transaction id when the aggregation is still a local echo,
99 /// and it will transition into an event id when the aggregation is a
100 /// remote echo (i.e. has been received in a sync response):
101 pub own_id: TimelineEventItemId,
102}
103
104/// Get the poll state from a given [`TimelineItemContent`].
105fn poll_state_from_item(
106 content: &mut TimelineItemContent,
107) -> Result<&mut PollState, AggregationError> {
108 match content {
109 TimelineItemContent::MsgLike(MsgLikeContent {
110 kind: MsgLikeKind::Poll(poll_state),
111 ..
112 }) => Ok(poll_state),
113 _ => Err(AggregationError::InvalidType {
114 expected: "a poll".to_owned(),
115 actual: content.debug_string().to_owned(),
116 }),
117 }
118}
119
120impl Aggregation {
121 /// Create a new [`Aggregation`].
122 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
123 Self { kind, own_id }
124 }
125
126 /// Apply an aggregation in-place to a given [`TimelineItemContent`].
127 ///
128 /// In case of success, returns an enum indicating whether the applied
129 /// aggregation had an effect on the content; if it updated it, then the
130 /// caller has the responsibility to reflect that change.
131 ///
132 /// In case of error, returns an error detailing why the aggregation
133 /// couldn't be applied.
134 pub fn apply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
135 match &self.kind {
136 AggregationKind::PollResponse { sender, timestamp, answers } => {
137 let state = match poll_state_from_item(content) {
138 Ok(state) => state,
139 Err(err) => return ApplyAggregationResult::Error(err),
140 };
141 state.add_response(sender.clone(), *timestamp, answers.clone());
142 ApplyAggregationResult::UpdatedItem
143 }
144
145 AggregationKind::PollEnd { end_date } => {
146 let poll_state = match poll_state_from_item(content) {
147 Ok(state) => state,
148 Err(err) => return ApplyAggregationResult::Error(err),
149 };
150 if !poll_state.end(*end_date) {
151 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
152 }
153 ApplyAggregationResult::UpdatedItem
154 }
155
156 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
157 let Some(reactions) = content.reactions_mut() else {
158 // These items don't hold reactions.
159 return ApplyAggregationResult::LeftItemIntact;
160 };
161
162 let previous_reaction = reactions.entry(key.clone()).or_default().insert(
163 sender.clone(),
164 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
165 );
166
167 let is_same = previous_reaction.is_some_and(|prev| {
168 prev.timestamp == *timestamp
169 && matches!(
170 (prev.status, reaction_status),
171 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
172 | (
173 ReactionStatus::LocalToRemote(_),
174 ReactionStatus::LocalToRemote(_),
175 )
176 | (
177 ReactionStatus::RemoteToRemote(_),
178 ReactionStatus::RemoteToRemote(_),
179 )
180 )
181 });
182
183 if is_same {
184 ApplyAggregationResult::LeftItemIntact
185 } else {
186 ApplyAggregationResult::UpdatedItem
187 }
188 }
189 }
190 }
191
192 /// Undo an aggregation in-place to a given [`TimelineItemContent`].
193 ///
194 /// In case of success, returns an enum indicating whether unapplying the
195 /// aggregation had an effect on the content; if it updated it, then the
196 /// caller has the responsibility to reflect that change.
197 ///
198 /// In case of error, returns an error detailing why the aggregation
199 /// couldn't be unapplied.
200 pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
201 match &self.kind {
202 AggregationKind::PollResponse { sender, timestamp, .. } => {
203 let state = match poll_state_from_item(content) {
204 Ok(state) => state,
205 Err(err) => return ApplyAggregationResult::Error(err),
206 };
207 state.remove_response(sender, *timestamp);
208 ApplyAggregationResult::UpdatedItem
209 }
210
211 AggregationKind::PollEnd { .. } => {
212 // Assume we can't undo a poll end event at the moment.
213 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
214 }
215
216 AggregationKind::Reaction { key, sender, .. } => {
217 let Some(reactions) = content.reactions_mut() else {
218 // An item that doesn't hold any reactions.
219 return ApplyAggregationResult::LeftItemIntact;
220 };
221
222 let by_user = reactions.get_mut(key);
223 let previous_entry = if let Some(by_user) = by_user {
224 let prev = by_user.swap_remove(sender);
225 // If this was the last reaction, remove the entire map for this key.
226 if by_user.is_empty() {
227 reactions.swap_remove(key);
228 }
229 prev
230 } else {
231 None
232 };
233
234 if previous_entry.is_some() {
235 ApplyAggregationResult::UpdatedItem
236 } else {
237 ApplyAggregationResult::LeftItemIntact
238 }
239 }
240 }
241 }
242}
243
244/// Manager for all known existing aggregations to all events in the timeline.
245#[derive(Clone, Debug, Default)]
246pub(crate) struct Aggregations {
247 /// Mapping of a target event to its list of aggregations.
248 related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
249
250 /// Mapping of a related event identifier to its target.
251 inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
252}
253
254impl Aggregations {
255 /// Clear all the known aggregations from all the mappings.
256 pub fn clear(&mut self) {
257 self.related_events.clear();
258 self.inverted_map.clear();
259 }
260
261 /// Add a given aggregation that relates to the [`TimelineItemContent`]
262 /// identified by the given [`TimelineEventItemId`].
263 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
264 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
265 self.related_events.entry(related_to).or_default().push(aggregation);
266 }
267
268 /// Is the given id one for a known aggregation to another event?
269 ///
270 /// If so, returns the target event identifier as well as the aggregation.
271 /// The aggregation must be unapplied on the corresponding timeline
272 /// item.
273 #[must_use]
274 pub fn try_remove_aggregation(
275 &mut self,
276 aggregation_id: &TimelineEventItemId,
277 ) -> Option<(&TimelineEventItemId, Aggregation)> {
278 let found = self.inverted_map.get(aggregation_id)?;
279
280 // Find and remove the aggregation in the other mapping.
281 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
282 let removed = aggregations
283 .iter()
284 .position(|agg| agg.own_id == *aggregation_id)
285 .map(|idx| aggregations.remove(idx));
286
287 // If this was the last aggregation, remove the entry in the `related_events`
288 // mapping.
289 if aggregations.is_empty() {
290 self.related_events.remove(found);
291 }
292
293 removed
294 } else {
295 None
296 };
297
298 if aggregation.is_none() {
299 warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
300 }
301
302 Some((found, aggregation?))
303 }
304
305 /// Apply all the aggregations to a [`TimelineItemContent`].
306 ///
307 /// Will return an error at the first aggregation that couldn't be applied;
308 /// see [`Aggregation::apply`] which explains under which conditions it can
309 /// happen.
310 pub fn apply(
311 &self,
312 item_id: &TimelineEventItemId,
313 content: &mut TimelineItemContent,
314 ) -> Result<(), AggregationError> {
315 let Some(aggregations) = self.related_events.get(item_id) else {
316 return Ok(());
317 };
318 for a in aggregations {
319 if let ApplyAggregationResult::Error(err) = a.apply(content) {
320 return Err(err);
321 }
322 }
323 Ok(())
324 }
325
326 /// Mark a target event as being sent (i.e. it transitions from an local
327 /// transaction id to its remote event id counterpart), by updating the
328 /// internal mappings.
329 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
330 let from = TimelineEventItemId::TransactionId(txn_id);
331 let to = TimelineEventItemId::EventId(event_id);
332
333 // Update the aggregations in the `related_events` field.
334 if let Some(aggregations) = self.related_events.remove(&from) {
335 // Update the inverted mappings (from aggregation's id, to the new target id).
336 for a in &aggregations {
337 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
338 debug_assert_eq!(prev_target, from);
339 self.inverted_map.insert(a.own_id.clone(), to.clone());
340 }
341 }
342 // Update the direct mapping of target -> aggregations.
343 self.related_events.entry(to).or_default().extend(aggregations);
344 }
345 }
346
347 /// Mark an aggregation event as being sent (i.e. it transitions from an
348 /// local transaction id to its remote event id counterpart), by
349 /// updating the internal mappings.
350 ///
351 /// When an aggregation has been marked as sent, it may need to be reapplied
352 /// to the corresponding [`TimelineItemContent`]; in this case, a
353 /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update`
354 /// will be returned, and must be applied.
355 pub fn mark_aggregation_as_sent(
356 &mut self,
357 txn_id: OwnedTransactionId,
358 event_id: OwnedEventId,
359 ) -> MarkAggregationSentResult {
360 let from = TimelineEventItemId::TransactionId(txn_id);
361 let to = TimelineEventItemId::EventId(event_id.clone());
362
363 let Some(target) = self.inverted_map.remove(&from) else {
364 return MarkAggregationSentResult::NotFound;
365 };
366
367 let mut target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { update: None };
368
369 if let Some(aggregations) = self.related_events.get_mut(&target) {
370 if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
371 found.own_id = to.clone();
372
373 match &mut found.kind {
374 AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } => {
375 // Nothing particular to do.
376 }
377 AggregationKind::Reaction { reaction_status, .. } => {
378 // Mark the reaction as becoming remote, and signal that update to the
379 // caller.
380 *reaction_status = ReactionStatus::RemoteToRemote(event_id);
381 target_and_new_aggregation = MarkAggregationSentResult::MarkedSent {
382 update: Some((target.clone(), found.clone())),
383 };
384 }
385 }
386 }
387 }
388
389 self.inverted_map.insert(to, target);
390
391 target_and_new_aggregation
392 }
393}
394
395/// Find an item identified by the target identifier, and apply the aggregation
396/// onto it.
397///
398/// Returns whether the aggregation has been applied or not (so as to increment
399/// a number of updated result, for instance).
400pub(crate) fn find_item_and_apply_aggregation(
401 items: &mut ObservableItemsTransaction<'_>,
402 target: &TimelineEventItemId,
403 aggregation: Aggregation,
404) -> bool {
405 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
406 trace!("couldn't find aggregation's target {target:?}");
407 return false;
408 };
409
410 let mut new_content = event_item.content().clone();
411
412 match aggregation.apply(&mut new_content) {
413 ApplyAggregationResult::UpdatedItem => {
414 trace!("applied aggregation");
415 let new_item = event_item.with_content(new_content);
416 items.replace(idx, TimelineItem::new(new_item, event_item.internal_id.to_owned()));
417 true
418 }
419 ApplyAggregationResult::LeftItemIntact => {
420 trace!("applying the aggregation had no effect");
421 false
422 }
423 ApplyAggregationResult::Error(err) => {
424 warn!("error when applying aggregation: {err}");
425 false
426 }
427 }
428}
429
430/// The result of marking an aggregation as sent.
431pub(crate) enum MarkAggregationSentResult {
432 /// The aggregation has been found, and marked as sent.
433 ///
434 /// Optionally, it can include an [`Aggregation`] `update` to the matching
435 /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`].
436 MarkedSent { update: Option<(TimelineEventItemId, Aggregation)> },
437 /// The aggregation was unknown to the aggregations manager, aka not found.
438 NotFound,
439}
440
441/// The result of applying (or unapplying) an aggregation onto a timeline item.
442pub(crate) enum ApplyAggregationResult {
443 /// The item has been updated after applying the aggregation.
444 UpdatedItem,
445
446 /// The item hasn't been modified after applying the aggregation, because it
447 /// was likely already applied prior to this.
448 LeftItemIntact,
449
450 /// An error happened while applying the aggregation.
451 Error(AggregationError),
452}
453
454#[derive(Debug, thiserror::Error)]
455pub(crate) enum AggregationError {
456 #[error("trying to end a poll twice")]
457 PollAlreadyEnded,
458
459 #[error("a poll end can't be unapplied")]
460 CantUndoPollEnd,
461
462 #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
463 InvalidType { expected: String, actual: String },
464}