matrix_sdk/event_cache/caches/room/updates.rs
1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use matrix_sdk_base::{
18 deserialized_responses::AmbiguityChange,
19 event_cache::{Event, Gap},
20 linked_chunk::{self, OwnedLinkedChunkId},
21};
22use ruma::{OwnedEventId, OwnedRoomId, events::AnySyncEphemeralRoomEvent, serde::Raw};
23use tokio::sync::broadcast::{Receiver, Sender};
24
25use super::super::TimelineVectorDiffs;
26
27/// An update related to events happened in a room.
28#[derive(Debug, Clone)]
29pub enum RoomEventCacheUpdate {
30 /// The fully read marker has moved to a different event.
31 MoveReadMarkerTo {
32 /// Event at which the read marker is now pointing.
33 event_id: OwnedEventId,
34 },
35
36 /// The members have changed.
37 UpdateMembers {
38 /// Collection of ambiguity changes that room member events trigger.
39 ///
40 /// This is a map of event ID of the `m.room.member` event to the
41 /// details of the ambiguity change.
42 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
43 },
44
45 /// The room has received updates for the timeline as _diffs_.
46 UpdateTimelineEvents(TimelineVectorDiffs),
47
48 /// The room has received new ephemeral events.
49 AddEphemeralEvents {
50 /// XXX: this is temporary, until read receipts are handled in the event
51 /// cache
52 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
53 },
54}
55
56/// Represents a timeline update of a room. It hides the details of
57/// [`RoomEventCacheUpdate`] by being more generic.
58///
59/// This is used by [`EventCache::subscribe_to_room_generic_updates`][0]. Please
60/// read it to learn more about the motivation behind this type.
61///
62/// [0]: super::super::super::EventCache::subscribe_to_room_generic_updates
63#[derive(Clone, Debug)]
64pub struct RoomEventCacheGenericUpdate {
65 /// The room ID owning the timeline.
66 pub room_id: OwnedRoomId,
67}
68
69/// An update being triggered when events change in the persisted event cache
70/// for any room.
71#[derive(Clone, Debug)]
72pub struct RoomEventCacheLinkedChunkUpdate {
73 /// The linked chunk affected by the update.
74 pub linked_chunk_id: OwnedLinkedChunkId,
75
76 /// A vector of all the linked chunk updates that happened during this event
77 /// cache update.
78 pub updates: Vec<linked_chunk::Update<Event, Gap>>,
79}
80
81impl RoomEventCacheLinkedChunkUpdate {
82 /// Return all the new events propagated by this update, in topological
83 /// order.
84 pub fn events(self) -> impl DoubleEndedIterator<Item = Event> {
85 use itertools::Either;
86 self.updates.into_iter().flat_map(|update| match update {
87 linked_chunk::Update::PushItems { items, .. } => {
88 Either::Left(Either::Left(items.into_iter()))
89 }
90 linked_chunk::Update::ReplaceItem { item, .. } => {
91 Either::Left(Either::Right(std::iter::once(item)))
92 }
93 linked_chunk::Update::RemoveItem { .. }
94 | linked_chunk::Update::DetachLastItems { .. }
95 | linked_chunk::Update::StartReattachItems
96 | linked_chunk::Update::EndReattachItems
97 | linked_chunk::Update::NewItemsChunk { .. }
98 | linked_chunk::Update::NewGapChunk { .. }
99 | linked_chunk::Update::RemoveChunk(..)
100 | linked_chunk::Update::Clear => {
101 // All these updates don't contain any new event.
102 Either::Right(std::iter::empty())
103 }
104 })
105 }
106}
107
108/// A small type to send updates in all channels.
109#[derive(Clone)]
110pub struct RoomEventCacheUpdateSender {
111 room_sender: Sender<RoomEventCacheUpdate>,
112 generic_sender: Sender<RoomEventCacheGenericUpdate>,
113}
114
115impl RoomEventCacheUpdateSender {
116 /// Create a new [`RoomEventCacheUpdateSender`].
117 pub fn new(generic_sender: Sender<RoomEventCacheGenericUpdate>) -> Self {
118 Self { room_sender: Sender::new(32), generic_sender }
119 }
120
121 /// Send a [`RoomEventCacheUpdate`] and an optional
122 /// [`RoomEventCacheGenericUpdate`].
123 pub fn send(
124 &self,
125 room_update: RoomEventCacheUpdate,
126 generic_update: Option<RoomEventCacheGenericUpdate>,
127 ) {
128 let _ = self.room_sender.send(room_update);
129
130 if let Some(generic_update) = generic_update {
131 let _ = self.generic_sender.send(generic_update);
132 }
133 }
134
135 /// Create a new [`Receiver`] of [`RoomEventCacheUpdate`].
136 pub(super) fn new_room_receiver(&self) -> Receiver<RoomEventCacheUpdate> {
137 self.room_sender.subscribe()
138 }
139}