Skip to main content

matrix_sdk/event_cache/caches/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::HashMap, ops::Deref, sync::Arc};
16
17use eyeball::SharedObservable;
18use eyeball_im::VectorDiff;
19use matrix_sdk_base::{
20    ThreadingSupport,
21    event_cache::Event,
22    linked_chunk::Position,
23    sync::{JoinedRoomUpdate, LeftRoomUpdate},
24};
25use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules};
26use tokio::sync::{
27    OnceCell, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc,
28};
29
30use super::{
31    EventCacheError, EventsOrigin, Result, automatic_pagination::AutomaticPagination, states,
32};
33use crate::{client::WeakClient, room::WeakRoom};
34
35mod aggregator;
36pub mod event_focused;
37pub mod event_linked_chunk;
38pub mod pagination;
39pub mod pinned_events;
40mod read_receipts;
41pub mod room;
42pub mod thread;
43
44/// A type to hold all the caches for a given room.
45#[derive(Debug)]
46pub(super) struct Caches {
47    /// The one and only [`RoomEventCache`].
48    ///
49    /// [`RoomEventCache`]: room::RoomEventCache
50    pub room: room::RoomEventCache,
51
52    /// All the lazily-loaded [`ThreadEventCache`].
53    ///
54    /// [`ThreadEventCache`]: thread::ThreadEventCache
55    // An `Arc` is used to get an owned lock.
56    pub threads: Arc<RwLock<HashMap<OwnedEventId, thread::ThreadEventCache>>>,
57
58    /// The one and only [`PinnedEventsCache`].
59    ///
60    /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
61    pub pinned_events: OnceCell<pinned_events::PinnedEventsCache>,
62
63    /// All the lazily-loaded [`EventFocusedCache`].
64    ///
65    /// [`EventFocusedCache`]: event_focused::EventFocusedCache
66    // An `Arc` is used to get an owned lock.
67    pub event_focused:
68        Arc<RwLock<HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>>>,
69
70    /// Internals data, used to lazily create caches.
71    internals: CachesInternals,
72}
73
74#[derive(Debug)]
75struct CachesInternals {
76    state: states::StateLock,
77    linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
78    room_version_rules: RoomVersionRules,
79}
80
81impl Caches {
82    /// Create a new [`Caches`].
83    pub async fn new(
84        weak_client: &WeakClient,
85        room_id: &RoomId,
86        generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
87        linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
88        auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
89        state: &states::StateLock,
90        automatic_pagination: Option<AutomaticPagination>,
91    ) -> Result<Self> {
92        let Some(client) = weak_client.get() else {
93            return Err(EventCacheError::ClientDropped);
94        };
95
96        let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
97
98        let room = client
99            .get_room(room_id)
100            .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
101        let room_version_rules = room.clone_info().room_version_rules_or_default();
102
103        let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
104            hit_timeline_start: false,
105        });
106
107        let enabled_thread_support =
108            matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
109
110        let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
111
112        let own_user_id =
113            client.user_id().expect("the user must be logged in, at this point").to_owned();
114
115        let room_state = state
116            .try_insert_once_with(
117                states::selectors::RoomStateSelector::new(room_id.to_owned()),
118                |store_guard| {
119                    room::RoomEventCacheState::new(
120                        own_user_id.clone(),
121                        room_id.to_owned(),
122                        weak_room.clone(),
123                        room_version_rules.clone(),
124                        enabled_thread_support,
125                        update_sender.clone(),
126                        linked_chunk_update_sender.clone(),
127                        store_guard,
128                        pagination_status.clone(),
129                        automatic_pagination,
130                    )
131                },
132            )
133            .await?;
134
135        let timeline_is_not_empty =
136            room_state.read().await?.room_linked_chunk().revents().next().is_some();
137
138        let room_event_cache = room::RoomEventCache::new(
139            room_id.to_owned(),
140            weak_room,
141            own_user_id,
142            room_state,
143            pagination_status,
144            auto_shrink_sender,
145            update_sender,
146        );
147
148        // If at least one event has been loaded, it means there is a timeline. Let's
149        // emit a generic update.
150        if timeline_is_not_empty {
151            let _ = generic_update_sender
152                .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
153        }
154
155        Ok(Self {
156            room: room_event_cache,
157            threads: Arc::new(RwLock::new(HashMap::new())),
158            pinned_events: OnceCell::new(),
159            event_focused: Arc::new(RwLock::new(HashMap::new())),
160            internals: CachesInternals {
161                state: state.clone(),
162                linked_chunk_update_sender,
163                room_version_rules,
164            },
165        })
166    }
167
168    /// Get the [`RoomEventCache`].
169    ///
170    /// [`RoomEventCache`]: room::RoomEventCache
171    pub fn room(&self) -> &room::RoomEventCache {
172        &self.room
173    }
174
175    /// Get or create a [`ThreadEventCache`].
176    ///
177    /// Note: it is impossible to know if `thread_id` represents a valid thread
178    /// identifier. It means it's possible to create a [`ThreadEventCache`] for
179    /// an event that is not a thread root.
180    ///
181    /// [`ThreadEventCache`]: thread::ThreadEventCache
182    pub async fn thread(
183        &self,
184        thread_id: OwnedEventId,
185    ) -> Result<
186        OwnedRwLockReadGuard<
187            HashMap<OwnedEventId, thread::ThreadEventCache>,
188            thread::ThreadEventCache,
189        >,
190    > {
191        Ok(
192            match OwnedRwLockWriteGuard::try_downgrade_map(
193                self.threads.clone().write_owned().await,
194                |threads| threads.get(&thread_id),
195            ) {
196                // Thread exists.
197                Ok(locked_cache) => locked_cache,
198                // Thread does not exist, let's create it.
199                Err(mut threads) => {
200                    let room = &self.room;
201                    let cache = thread::ThreadEventCache::new(
202                        room.room_id().to_owned(),
203                        thread_id.clone(),
204                        room.own_user_id().to_owned(),
205                        self.internals.room_version_rules.clone(),
206                        room.weak_room().to_owned(),
207                        &self.internals.state,
208                        room.update_sender().generic_update_sender().clone(),
209                        self.internals.linked_chunk_update_sender.clone(),
210                    )
211                    .await?;
212
213                    threads.insert(thread_id.clone(), cache);
214
215                    OwnedRwLockWriteGuard::downgrade_map(threads, |threads| {
216                        threads.get(&thread_id).unwrap()
217                    })
218                }
219            },
220        )
221    }
222
223    /// Get or create a [`PinnedEventsCache`].
224    ///
225    /// [`PinnedEventsCache`]: pinned_events::PinnedEventsCache
226    pub async fn pinned_events(&self) -> Result<&pinned_events::PinnedEventsCache> {
227        self.pinned_events
228            .get_or_try_init(|| {
229                pinned_events::PinnedEventsCache::new(
230                    self.room.weak_room(),
231                    self.room.own_user_id().clone(),
232                    self.internals.room_version_rules.clone(),
233                    self.internals.linked_chunk_update_sender.clone(),
234                    &self.internals.state,
235                )
236            })
237            .await
238    }
239
240    /// Get or create a [`EventFocusedCache`].
241    ///
242    /// [`EventFocusedCache`]: event_focused::EventFocusedCache
243    pub async fn event_focused(
244        &self,
245        event_id: OwnedEventId,
246        thread_mode: event_focused::EventFocusThreadMode,
247        number_of_initial_events: u16,
248    ) -> Result<
249        OwnedRwLockReadGuard<
250            HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>,
251            event_focused::EventFocusedCache,
252        >,
253    > {
254        let key = event_focused::EventFocusedCacheKey { focused_event_id: event_id, thread_mode };
255
256        Ok(
257            match OwnedRwLockWriteGuard::try_downgrade_map(
258                self.event_focused.clone().write_owned().await,
259                |event_focused_caches| event_focused_caches.get(&key),
260            ) {
261                // Event-focused cache exists.
262                Ok(locked_cache) => locked_cache,
263                // Event-focused cache does not exist, let's create it.
264                Err(mut event_focused_caches) => {
265                    let cache = event_focused::EventFocusedCache::new(
266                        self.room.weak_room().clone(),
267                        key.clone(),
268                        &self.internals.state,
269                        self.internals.linked_chunk_update_sender.clone(),
270                    )
271                    .await?;
272                    cache.start_from(number_of_initial_events, thread_mode).await?;
273
274                    event_focused_caches.insert(key.clone(), cache);
275
276                    OwnedRwLockWriteGuard::downgrade_map(
277                        event_focused_caches,
278                        |event_focused_caches| event_focused_caches.get(&key).unwrap(),
279                    )
280                }
281            },
282        )
283    }
284
285    /// Update all the event caches with a [`JoinedRoomUpdate`].
286    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
287        let Self { room, threads, pinned_events, event_focused, internals } = &self;
288
289        // Room.
290        {
291            let mut updates = updates.clone();
292            updates.timeline = aggregator::aggregate_timeline_for_room(updates.timeline);
293
294            room.handle_joined_room_update(updates).await?;
295        }
296
297        // Threads.
298        {
299            let mut updates = updates.clone();
300            updates.account_data.clear();
301            updates.ambiguity_changes.clear();
302
303            let timeline_for_threads = aggregator::aggregate_timeline_for_threads(
304                &updates.timeline,
305                threads.read().await.deref(),
306                room.state().read().await?,
307                &internals.room_version_rules.redaction,
308            )
309            .await?;
310
311            for (thread_id, timeline) in timeline_for_threads {
312                let mut updates = updates.clone();
313                updates.timeline = timeline;
314
315                let thread = self.thread(thread_id).await?;
316                thread.handle_joined_room_update(updates).await?;
317
318                let new_thread_summary =
319                    thread.state().read().await?.compute_thread_summary().await?;
320
321                room.update_thread_summary(thread.thread_id(), new_thread_summary).await?;
322            }
323        }
324
325        // Pinned-events.
326        if let Some(pinned_events) = pinned_events.get() {
327            let mut updates = updates.clone();
328            updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
329                &updates.timeline,
330                &pinned_events.state().read().await?.current_event_ids(),
331                &internals.room_version_rules.redaction,
332            );
333
334            pinned_events.handle_joined_room_update(updates).await?;
335        }
336
337        // Event-focused.
338        {
339            // An event-focused cache isn't listening to live update. Consequently, it is
340            // not interested by this kind of update.
341            let _ = event_focused;
342        }
343
344        Ok(())
345    }
346
347    /// Update all the event caches with a [`LeftRoomUpdate`].
348    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
349        let Self { room, threads, pinned_events, event_focused, internals } = &self;
350
351        // Room.
352        {
353            let mut updates = updates.clone();
354            updates.timeline = aggregator::aggregate_timeline_for_room(updates.timeline);
355
356            room.handle_left_room_update(updates).await?;
357        }
358
359        // Threads.
360        {
361            let mut updates = updates.clone();
362            updates.account_data.clear();
363            updates.ambiguity_changes.clear();
364
365            let timeline_for_threads = aggregator::aggregate_timeline_for_threads(
366                &updates.timeline,
367                threads.read().await.deref(),
368                room.state().read().await?,
369                &internals.room_version_rules.redaction,
370            )
371            .await?;
372
373            for (thread_id, timeline) in timeline_for_threads {
374                let mut updates = updates.clone();
375                updates.timeline = timeline;
376
377                let thread = self.thread(thread_id).await?;
378                thread.handle_left_room_update(updates).await?;
379
380                let new_thread_summary =
381                    thread.state().read().await?.compute_thread_summary().await?;
382
383                room.update_thread_summary(thread.thread_id(), new_thread_summary).await?;
384            }
385        }
386
387        // Pinned-events.
388        if let Some(pinned_events) = pinned_events.get() {
389            let mut updates = updates.clone();
390            updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
391                &updates.timeline,
392                &pinned_events.state().read().await?.current_event_ids(),
393                &internals.room_version_rules.redaction,
394            );
395
396            pinned_events.handle_left_room_update(updates).await?;
397        }
398
399        // Event-focused.
400        {
401            // An event-focused cache isn't listening to live update. Consequently, it is
402            // not interested by this kind of update.
403            let _ = event_focused;
404        }
405
406        Ok(())
407    }
408
409    /// Get all in-memory events from all the event caches managed by this
410    /// [`Caches`].
411    ///
412    /// Events can be duplicated if present in different event caches.
413    #[cfg(feature = "e2e-encryption")]
414    pub async fn all_in_memory_events(&self) -> Result<impl Iterator<Item = Event>> {
415        // We have to fetch events from all the caches.
416        //
417        // The room cache contains all the room events + the thread events + the
418        // pinned-events.
419        let mut events = self.room.events().await?;
420
421        // The last cache is the events from the event-focused cache.
422        {
423            let event_focused = self.event_focused.read().await;
424
425            for event_focused in event_focused.values() {
426                events.extend(event_focused.events().await?);
427            }
428        }
429
430        Ok(events.into_iter())
431    }
432
433    /// Get all encrypted events from all the event caches managed by this
434    /// [`Caches`].
435    ///
436    /// The `event_type` represents the type of the event to filter by.
437    /// The `session_id` represents the unique ID of the room key that was used
438    /// to encrypt the event
439    ///
440    /// Events can be duplicated if present in different event caches.
441    #[cfg(feature = "e2e-encryption")]
442    pub async fn all_events_of_type(
443        &self,
444        event_type: Option<&str>,
445        session_id: Option<&str>,
446    ) -> Result<impl Iterator<Item = Event>> {
447        // All caches store their events in the store except one. Let's start by looking
448        // inside the store.
449        let mut events = {
450            let state = self.internals.state.read().await?;
451
452            state.store.get_room_events(self.room.room_id(), event_type, session_id).await?
453        };
454
455        // The only cache to not store its events is the event-focused cache. Its events
456        // only live in memory.
457        {
458            let event_focused = self.event_focused.read().await;
459
460            for event_focused in event_focused.values() {
461                events.extend(
462                    event_focused
463                        .events()
464                        .await?
465                        .into_iter()
466                        .filter(|event| event_type == event.kind.event_type().as_deref())
467                        .filter(|event| session_id == event.kind.session_id()),
468                );
469            }
470        }
471
472        Ok(events.into_iter())
473    }
474}
475
476/// A diff update for an event cache timeline represented as a vector.
477#[derive(Clone, Debug)]
478pub struct TimelineVectorDiffs {
479    /// New vector diff for the thread timeline.
480    pub diffs: Vec<VectorDiff<Event>>,
481    /// The origin that triggered this update.
482    pub origin: EventsOrigin,
483}
484
485/// An enum representing where an event has been found.
486#[derive(Debug)]
487pub(super) enum EventLocation {
488    /// Event lives in memory (and likely in the store!).
489    Memory(Position),
490
491    /// Event lives in the store only, it has not been loaded in memory yet.
492    Store,
493}