matrix_sdk_ui/timeline/controller/aggregations.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An aggregation manager for the timeline.
16//!
17//! An aggregation is an event that relates to another event: for instance, a
18//! reaction, a poll response, and so on and so forth.
19//!
20//! Because of the sync mechanisms and federation, it can happen that a related
21//! event is received *before* receiving the event it relates to. Those events
22//! must be accounted for, stashed somewhere, and reapplied later, if/when the
23//! related-to event shows up.
24//!
25//! In addition to that, a room's event cache can also decide to move events
26//! around, in its own internal representation (likely because it ran into some
27//! duplicate events). When that happens, a timeline opened on the given room
28//! will see a removal then re-insertion of the given event. If that event was
29//! the target of aggregations, then those aggregations must be re-applied when
30//! the given event is reinserted.
31//!
32//! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33//! by this module will take care of memoizing aggregations, for the entire
34//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35//! caller). Aggregations are saved in memory, and have the same lifetime as
36//! that of a timeline. This makes it possible to apply pending aggregations
37//! to cater for the first use case, and to never lose any aggregations in the
38//! second use case.
39
40use std::collections::HashMap;
41
42use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId};
43use tracing::{trace, warn};
44
45use super::{rfind_event_by_item_id, ObservableItemsTransaction};
46use crate::timeline::{
47 PollState, ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent,
48};
49
50/// Which kind of aggregation (related event) is this?
51#[derive(Clone, Debug)]
52pub(crate) enum AggregationKind {
53 /// This is a response to a poll.
54 PollResponse {
55 /// Sender of the poll's response.
56 sender: OwnedUserId,
57 /// Timestamp at which the response has beens ent.
58 timestamp: MilliSecondsSinceUnixEpoch,
59 /// All the answers to the poll sent by the sender.
60 answers: Vec<String>,
61 },
62
63 /// This is the marker of the end of a poll.
64 PollEnd {
65 /// Timestamp at which the poll ends, i.e. all the responses with a
66 /// timestamp prior to this one should be taken into account
67 /// (and all the responses with a timestamp after this one
68 /// should be dropped).
69 end_date: MilliSecondsSinceUnixEpoch,
70 },
71
72 /// This is a reaction to another event.
73 Reaction {
74 /// The reaction "key" displayed by the client, often an emoji.
75 key: String,
76 /// Sender of the reaction.
77 sender: OwnedUserId,
78 /// Timestamp at which the reaction has been sent.
79 timestamp: MilliSecondsSinceUnixEpoch,
80 /// The send status of the reaction this is, with handles to abort it if
81 /// we can, etc.
82 reaction_status: ReactionStatus,
83 },
84}
85
86/// An aggregation is an event related to another event (for instance a
87/// reaction, a poll's response, etc.).
88///
89/// It can be either a local or a remote echo.
90#[derive(Clone, Debug)]
91pub(crate) struct Aggregation {
92 /// The kind of aggregation this represents.
93 pub kind: AggregationKind,
94
95 /// The own timeline identifier for an aggregation.
96 ///
97 /// It will be a transaction id when the aggregation is still a local echo,
98 /// and it will transition into an event id when the aggregation is a
99 /// remote echo (i.e. has been received in a sync response):
100 pub own_id: TimelineEventItemId,
101}
102
103/// Get the poll state from a given [`TimelineItemContent`].
104fn poll_state_from_item(
105 content: &mut TimelineItemContent,
106) -> Result<&mut PollState, AggregationError> {
107 match content {
108 TimelineItemContent::Poll(poll_state) => Ok(poll_state),
109 _ => Err(AggregationError::InvalidType {
110 expected: "a poll".to_owned(),
111 actual: content.debug_string().to_owned(),
112 }),
113 }
114}
115
116impl Aggregation {
117 /// Create a new [`Aggregation`].
118 pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self {
119 Self { kind, own_id }
120 }
121
122 /// Apply an aggregation in-place to a given [`TimelineItemContent`].
123 ///
124 /// In case of success, returns an enum indicating whether the applied
125 /// aggregation had an effect on the content; if it updated it, then the
126 /// caller has the responsibility to reflect that change.
127 ///
128 /// In case of error, returns an error detailing why the aggregation
129 /// couldn't be applied.
130 pub fn apply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
131 match &self.kind {
132 AggregationKind::PollResponse { sender, timestamp, answers } => {
133 let state = match poll_state_from_item(content) {
134 Ok(state) => state,
135 Err(err) => return ApplyAggregationResult::Error(err),
136 };
137 state.add_response(sender.clone(), *timestamp, answers.clone());
138 ApplyAggregationResult::UpdatedItem
139 }
140
141 AggregationKind::PollEnd { end_date } => {
142 let poll_state = match poll_state_from_item(content) {
143 Ok(state) => state,
144 Err(err) => return ApplyAggregationResult::Error(err),
145 };
146 if !poll_state.end(*end_date) {
147 return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
148 }
149 ApplyAggregationResult::UpdatedItem
150 }
151
152 AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
153 let Some(reactions) = content.reactions_mut() else {
154 // These items don't hold reactions.
155 return ApplyAggregationResult::LeftItemIntact;
156 };
157
158 let previous_reaction = reactions.entry(key.clone()).or_default().insert(
159 sender.clone(),
160 ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
161 );
162
163 let is_same = previous_reaction.is_some_and(|prev| {
164 prev.timestamp == *timestamp
165 && matches!(
166 (prev.status, reaction_status),
167 (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_))
168 | (
169 ReactionStatus::LocalToRemote(_),
170 ReactionStatus::LocalToRemote(_),
171 )
172 | (
173 ReactionStatus::RemoteToRemote(_),
174 ReactionStatus::RemoteToRemote(_),
175 )
176 )
177 });
178
179 if is_same {
180 ApplyAggregationResult::LeftItemIntact
181 } else {
182 ApplyAggregationResult::UpdatedItem
183 }
184 }
185 }
186 }
187
188 /// Undo an aggregation in-place to a given [`TimelineItemContent`].
189 ///
190 /// In case of success, returns an enum indicating whether unapplying the
191 /// aggregation had an effect on the content; if it updated it, then the
192 /// caller has the responsibility to reflect that change.
193 ///
194 /// In case of error, returns an error detailing why the aggregation
195 /// couldn't be unapplied.
196 pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
197 match &self.kind {
198 AggregationKind::PollResponse { sender, timestamp, .. } => {
199 let state = match poll_state_from_item(content) {
200 Ok(state) => state,
201 Err(err) => return ApplyAggregationResult::Error(err),
202 };
203 state.remove_response(sender, *timestamp);
204 ApplyAggregationResult::UpdatedItem
205 }
206
207 AggregationKind::PollEnd { .. } => {
208 // Assume we can't undo a poll end event at the moment.
209 ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd)
210 }
211
212 AggregationKind::Reaction { key, sender, .. } => {
213 let Some(reactions) = content.reactions_mut() else {
214 // An item that doesn't hold any reactions.
215 return ApplyAggregationResult::LeftItemIntact;
216 };
217
218 let by_user = reactions.get_mut(key);
219 let previous_entry = if let Some(by_user) = by_user {
220 let prev = by_user.swap_remove(sender);
221 // If this was the last reaction, remove the entire map for this key.
222 if by_user.is_empty() {
223 reactions.swap_remove(key);
224 }
225 prev
226 } else {
227 None
228 };
229
230 if previous_entry.is_some() {
231 ApplyAggregationResult::UpdatedItem
232 } else {
233 ApplyAggregationResult::LeftItemIntact
234 }
235 }
236 }
237 }
238}
239
240/// Manager for all known existing aggregations to all events in the timeline.
241#[derive(Clone, Debug, Default)]
242pub(crate) struct Aggregations {
243 /// Mapping of a target event to its list of aggregations.
244 related_events: HashMap<TimelineEventItemId, Vec<Aggregation>>,
245
246 /// Mapping of a related event identifier to its target.
247 inverted_map: HashMap<TimelineEventItemId, TimelineEventItemId>,
248}
249
250impl Aggregations {
251 /// Clear all the known aggregations from all the mappings.
252 pub fn clear(&mut self) {
253 self.related_events.clear();
254 self.inverted_map.clear();
255 }
256
257 /// Add a given aggregation that relates to the [`TimelineItemContent`]
258 /// identified by the given [`TimelineEventItemId`].
259 pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) {
260 self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone());
261 self.related_events.entry(related_to).or_default().push(aggregation);
262 }
263
264 /// Is the given id one for a known aggregation to another event?
265 ///
266 /// If so, returns the target event identifier as well as the aggregation.
267 /// The aggregation must be unapplied on the corresponding timeline
268 /// item.
269 #[must_use]
270 pub fn try_remove_aggregation(
271 &mut self,
272 aggregation_id: &TimelineEventItemId,
273 ) -> Option<(&TimelineEventItemId, Aggregation)> {
274 let found = self.inverted_map.get(aggregation_id)?;
275
276 // Find and remove the aggregation in the other mapping.
277 let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) {
278 let removed = aggregations
279 .iter()
280 .position(|agg| agg.own_id == *aggregation_id)
281 .map(|idx| aggregations.remove(idx));
282
283 // If this was the last aggregation, remove the entry in the `related_events`
284 // mapping.
285 if aggregations.is_empty() {
286 self.related_events.remove(found);
287 }
288
289 removed
290 } else {
291 None
292 };
293
294 if aggregation.is_none() {
295 warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map.");
296 }
297
298 Some((found, aggregation?))
299 }
300
301 /// Apply all the aggregations to a [`TimelineItemContent`].
302 ///
303 /// Will return an error at the first aggregation that couldn't be applied;
304 /// see [`Aggregation::apply`] which explains under which conditions it can
305 /// happen.
306 pub fn apply(
307 &self,
308 item_id: &TimelineEventItemId,
309 content: &mut TimelineItemContent,
310 ) -> Result<(), AggregationError> {
311 let Some(aggregations) = self.related_events.get(item_id) else {
312 return Ok(());
313 };
314 for a in aggregations {
315 if let ApplyAggregationResult::Error(err) = a.apply(content) {
316 return Err(err);
317 }
318 }
319 Ok(())
320 }
321
322 /// Mark a target event as being sent (i.e. it transitions from an local
323 /// transaction id to its remote event id counterpart), by updating the
324 /// internal mappings.
325 pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) {
326 let from = TimelineEventItemId::TransactionId(txn_id);
327 let to = TimelineEventItemId::EventId(event_id);
328
329 // Update the aggregations in the `related_events` field.
330 if let Some(aggregations) = self.related_events.remove(&from) {
331 // Update the inverted mappings (from aggregation's id, to the new target id).
332 for a in &aggregations {
333 if let Some(prev_target) = self.inverted_map.remove(&a.own_id) {
334 debug_assert_eq!(prev_target, from);
335 self.inverted_map.insert(a.own_id.clone(), to.clone());
336 }
337 }
338 // Update the direct mapping of target -> aggregations.
339 self.related_events.entry(to).or_default().extend(aggregations);
340 }
341 }
342
343 /// Mark an aggregation event as being sent (i.e. it transitions from an
344 /// local transaction id to its remote event id counterpart), by
345 /// updating the internal mappings.
346 ///
347 /// When an aggregation has been marked as sent, it may need to be reapplied
348 /// to the corresponding [`TimelineItemContent`]; in this case, a
349 /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update`
350 /// will be returned, and must be applied.
351 pub fn mark_aggregation_as_sent(
352 &mut self,
353 txn_id: OwnedTransactionId,
354 event_id: OwnedEventId,
355 ) -> MarkAggregationSentResult {
356 let from = TimelineEventItemId::TransactionId(txn_id);
357 let to = TimelineEventItemId::EventId(event_id.clone());
358
359 let Some(target) = self.inverted_map.remove(&from) else {
360 return MarkAggregationSentResult::NotFound;
361 };
362
363 let mut target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { update: None };
364
365 if let Some(aggregations) = self.related_events.get_mut(&target) {
366 if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) {
367 found.own_id = to.clone();
368
369 match &mut found.kind {
370 AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } => {
371 // Nothing particular to do.
372 }
373 AggregationKind::Reaction { reaction_status, .. } => {
374 // Mark the reaction as becoming remote, and signal that update to the
375 // caller.
376 *reaction_status = ReactionStatus::RemoteToRemote(event_id);
377 target_and_new_aggregation = MarkAggregationSentResult::MarkedSent {
378 update: Some((target.clone(), found.clone())),
379 };
380 }
381 }
382 }
383 }
384
385 self.inverted_map.insert(to, target);
386
387 target_and_new_aggregation
388 }
389}
390
391/// Find an item identified by the target identifier, and apply the aggregation
392/// onto it.
393///
394/// Returns whether the aggregation has been applied or not (so as to increment
395/// a number of updated result, for instance).
396pub(crate) fn find_item_and_apply_aggregation(
397 items: &mut ObservableItemsTransaction<'_>,
398 target: &TimelineEventItemId,
399 aggregation: Aggregation,
400) -> bool {
401 let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else {
402 trace!("couldn't find aggregation's target {target:?}");
403 return false;
404 };
405
406 let mut new_content = event_item.content().clone();
407
408 match aggregation.apply(&mut new_content) {
409 ApplyAggregationResult::UpdatedItem => {
410 trace!("applied aggregation");
411 let new_item = event_item.with_content(new_content);
412 items.replace(idx, TimelineItem::new(new_item, event_item.internal_id.to_owned()));
413 true
414 }
415 ApplyAggregationResult::LeftItemIntact => {
416 trace!("applying the aggregation had no effect");
417 false
418 }
419 ApplyAggregationResult::Error(err) => {
420 warn!("error when applying aggregation: {err}");
421 false
422 }
423 }
424}
425
426/// The result of marking an aggregation as sent.
427pub(crate) enum MarkAggregationSentResult {
428 /// The aggregation has been found, and marked as sent.
429 ///
430 /// Optionally, it can include an [`Aggregation`] `update` to the matching
431 /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`].
432 MarkedSent { update: Option<(TimelineEventItemId, Aggregation)> },
433 /// The aggregation was unknown to the aggregations manager, aka not found.
434 NotFound,
435}
436
437/// The result of applying (or unapplying) an aggregation onto a timeline item.
438pub(crate) enum ApplyAggregationResult {
439 /// The item has been updated after applying the aggregation.
440 UpdatedItem,
441
442 /// The item hasn't been modified after applying the aggregation, because it
443 /// was likely already applied prior to this.
444 LeftItemIntact,
445
446 /// An error happened while applying the aggregation.
447 Error(AggregationError),
448}
449
450#[derive(Debug, thiserror::Error)]
451pub(crate) enum AggregationError {
452 #[error("trying to end a poll twice")]
453 PollAlreadyEnded,
454
455 #[error("a poll end can't be unapplied")]
456 CantUndoPollEnd,
457
458 #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")]
459 InvalidType { expected: String, actual: String },
460}