Skip to main content

matrix_sdk/event_cache/caches/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use eyeball::SharedObservable;
16use eyeball_im::VectorDiff;
17use matrix_sdk_base::{
18    ThreadingSupport,
19    event_cache::{Event, store::EventCacheStoreLock},
20    linked_chunk::Position,
21    sync::{JoinedRoomUpdate, LeftRoomUpdate},
22};
23use ruma::{OwnedRoomId, RoomId};
24use tokio::sync::{broadcast::Sender, mpsc};
25
26use super::{EventCacheError, EventsOrigin, Result};
27use crate::{
28    client::WeakClient, event_cache::automatic_pagination::AutomaticPagination, room::WeakRoom,
29};
30
31pub mod event_focused;
32pub mod event_linked_chunk;
33pub(super) mod lock;
34pub mod pagination;
35pub mod pinned_events;
36mod read_receipts;
37pub mod room;
38pub mod thread;
39
40/// A type to hold all the caches for a given room.
41#[derive(Debug)]
42pub(super) struct Caches {
43    pub room: room::RoomEventCache,
44}
45
46impl Caches {
47    /// Create a new [`Caches`].
48    pub async fn new(
49        weak_client: &WeakClient,
50        room_id: &RoomId,
51        generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
52        linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
53        auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
54        store: EventCacheStoreLock,
55        automatic_pagination: Option<AutomaticPagination>,
56    ) -> Result<Self> {
57        let Some(client) = weak_client.get() else {
58            return Err(EventCacheError::ClientDropped);
59        };
60
61        let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
62
63        let room = client
64            .get_room(room_id)
65            .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
66        let room_version_rules = room.clone_info().room_version_rules_or_default();
67
68        let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
69            hit_timeline_start: false,
70        });
71
72        let enabled_thread_support =
73            matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
74
75        let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
76
77        let own_user_id =
78            client.user_id().expect("the user must be logged in, at this point").to_owned();
79
80        let room_state = room::LockedRoomEventCacheState::new(
81            own_user_id,
82            room_id.to_owned(),
83            weak_room.clone(),
84            room_version_rules,
85            enabled_thread_support,
86            update_sender.clone(),
87            linked_chunk_update_sender,
88            store,
89            pagination_status.clone(),
90            automatic_pagination,
91        )
92        .await?;
93
94        let timeline_is_not_empty =
95            room_state.read().await?.room_linked_chunk().revents().next().is_some();
96
97        let room_event_cache = room::RoomEventCache::new(
98            room_id.to_owned(),
99            weak_room,
100            room_state,
101            pagination_status,
102            auto_shrink_sender,
103            update_sender,
104        );
105
106        // If at least one event has been loaded, it means there is a timeline. Let's
107        // emit a generic update.
108        if timeline_is_not_empty {
109            let _ = generic_update_sender
110                .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
111        }
112
113        Ok(Self { room: room_event_cache })
114    }
115
116    /// Update all the event caches with a [`JoinedRoomUpdate`].
117    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
118        let Self { room } = &self;
119
120        room.handle_joined_room_update(updates).await?;
121
122        Ok(())
123    }
124
125    /// Update all the event caches with a [`LeftRoomUpdate`].
126    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
127        let Self { room } = &self;
128
129        room.handle_left_room_update(updates).await?;
130
131        Ok(())
132    }
133
134    /// Try to acquire exclusive locks over all the event caches managed by
135    /// this [`Caches`], in order to reset all the in-memory data.
136    ///
137    /// Note that this method takes `&mut self`, ensuring only one reset can
138    /// happen at a time.
139    ///
140    /// If the returned value is dropped, no data will be reset.
141    pub async fn prepare_to_reset(&mut self) -> Result<ResetCaches<'_>> {
142        ResetCaches::new(self).await
143    }
144
145    /// Get all events from all the event caches manged by this [`Cacches`].
146    ///
147    /// Events can be duplicated if present in different event caches.
148    #[cfg(feature = "e2e-encryption")]
149    pub async fn all_events(&self) -> Result<impl Iterator<Item = Event>> {
150        let events_from_room = self.room.events().await?;
151
152        Ok(events_from_room.into_iter())
153    }
154}
155
156/// Type holding exclusive locks over all event caches managed by a
157/// [`Caches`].
158///
159/// To reset all the event caches, call [`ResetCaches::reset_all`]. If this type
160/// is dropped, no reset happens and the exclusive lock is released.
161pub(super) struct ResetCaches<'c> {
162    room_lock: (&'c room::RoomEventCache, room::RoomEventCacheStateLockWriteGuard<'c>),
163}
164
165impl<'c> ResetCaches<'c> {
166    /// Create a new [`ResetCaches`].
167    ///
168    /// It can fail if acquiring an exclusive lock fails.
169    async fn new(Caches { room }: &'c mut Caches) -> Result<Self> {
170        Ok(Self { room_lock: (room, room.state().write().await?) })
171    }
172
173    /// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`].
174    ///
175    /// Note that this method consumes `self`, ensuring the acquired exclusive
176    /// locks over the event caches are released.
177    ///
178    /// It can fail if resetting an event cache fails.
179    pub async fn reset_all(self) -> Result<()> {
180        let Self { room_lock: (room, mut room_state) } = self;
181
182        {
183            let updates_as_vector_diffs = room_state.reset().await?;
184            room.update_sender().send(
185                room::RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
186                    diffs: updates_as_vector_diffs,
187                    origin: EventsOrigin::Cache,
188                }),
189                Some(room::RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
190            );
191        }
192
193        Ok(())
194    }
195}
196
197/// A diff update for an event cache timeline represented as a vector.
198#[derive(Clone, Debug)]
199pub struct TimelineVectorDiffs {
200    /// New vector diff for the thread timeline.
201    pub diffs: Vec<VectorDiff<Event>>,
202    /// The origin that triggered this update.
203    pub origin: EventsOrigin,
204}
205
206/// An enum representing where an event has been found.
207pub(super) enum EventLocation {
208    /// Event lives in memory (and likely in the store!).
209    Memory(Position),
210
211    /// Event lives in the store only, it has not been loaded in memory yet.
212    Store,
213}