Skip to main content

matrix_sdk/event_cache/caches/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use eyeball::SharedObservable;
16use eyeball_im::VectorDiff;
17use matrix_sdk_base::{
18    ThreadingSupport,
19    event_cache::{Event, store::EventCacheStoreLock},
20    linked_chunk::Position,
21    sync::{JoinedRoomUpdate, LeftRoomUpdate},
22};
23use ruma::{OwnedRoomId, RoomId};
24use tokio::sync::{broadcast::Sender, mpsc};
25
26use super::{EventCacheError, EventsOrigin, Result};
27use crate::{client::WeakClient, room::WeakRoom};
28
29pub mod event_focused;
30pub mod event_linked_chunk;
31pub(super) mod lock;
32pub mod pagination;
33pub mod pinned_events;
34mod read_receipts;
35pub mod room;
36pub mod thread;
37
38/// A type to hold all the caches for a given room.
39#[derive(Debug)]
40pub(super) struct Caches {
41    pub room: room::RoomEventCache,
42}
43
44impl Caches {
45    /// Create a new [`Caches`].
46    pub async fn new(
47        weak_client: &WeakClient,
48        room_id: &RoomId,
49        generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
50        linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
51        auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
52        store: EventCacheStoreLock,
53    ) -> Result<Self> {
54        let Some(client) = weak_client.get() else {
55            return Err(EventCacheError::ClientDropped);
56        };
57
58        let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
59
60        let room = client
61            .get_room(room_id)
62            .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
63        let room_version_rules = room.clone_info().room_version_rules_or_default();
64
65        let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
66            hit_timeline_start: false,
67        });
68
69        let enabled_thread_support =
70            matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
71
72        let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
73
74        let own_user_id =
75            client.user_id().expect("the user must be logged in, at this point").to_owned();
76
77        let room_state = room::LockedRoomEventCacheState::new(
78            own_user_id,
79            room_id.to_owned(),
80            weak_room.clone(),
81            room_version_rules,
82            enabled_thread_support,
83            update_sender.clone(),
84            linked_chunk_update_sender,
85            store,
86            pagination_status.clone(),
87        )
88        .await?;
89
90        let timeline_is_not_empty =
91            room_state.read().await?.room_linked_chunk().revents().next().is_some();
92
93        let room_event_cache = room::RoomEventCache::new(
94            room_id.to_owned(),
95            weak_room,
96            room_state,
97            pagination_status,
98            auto_shrink_sender,
99            update_sender,
100        );
101
102        // If at least one event has been loaded, it means there is a timeline. Let's
103        // emit a generic update.
104        if timeline_is_not_empty {
105            let _ = generic_update_sender
106                .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
107        }
108
109        Ok(Self { room: room_event_cache })
110    }
111
112    /// Update all the event caches with a [`JoinedRoomUpdate`].
113    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
114        let Self { room } = &self;
115
116        room.handle_joined_room_update(updates).await?;
117
118        Ok(())
119    }
120
121    /// Update all the event caches with a [`LeftRoomUpdate`].
122    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
123        let Self { room } = &self;
124
125        room.handle_left_room_update(updates).await?;
126
127        Ok(())
128    }
129
130    /// Try to acquire exclusive locks over all the event caches managed by
131    /// this [`Caches`], in order to reset all the in-memory data.
132    ///
133    /// Note that this method takes `&mut self`, ensuring only one reset can
134    /// happen at a time.
135    ///
136    /// If the returned value is dropped, no data will be reset.
137    pub async fn prepare_to_reset(&mut self) -> Result<ResetCaches<'_>> {
138        ResetCaches::new(self).await
139    }
140
141    /// Get all events from all the event caches manged by this [`Cacches`].
142    ///
143    /// Events can be duplicated if present in different event caches.
144    #[cfg(feature = "e2e-encryption")]
145    pub async fn all_events(&self) -> Result<impl Iterator<Item = Event>> {
146        let events_from_room = self.room.events().await?;
147
148        Ok(events_from_room.into_iter())
149    }
150}
151
152/// Type holding exclusive locks over all event caches managed by a
153/// [`Caches`].
154///
155/// To reset all the event caches, call [`ResetCaches::reset_all`]. If this type
156/// is dropped, no reset happens and the exclusive lock is released.
157pub(super) struct ResetCaches<'c> {
158    room_lock: (&'c room::RoomEventCache, room::RoomEventCacheStateLockWriteGuard<'c>),
159}
160
161impl<'c> ResetCaches<'c> {
162    /// Create a new [`ResetCaches`].
163    ///
164    /// It can fail if acquiring an exclusive lock fails.
165    async fn new(Caches { room }: &'c mut Caches) -> Result<Self> {
166        Ok(Self { room_lock: (room, room.state().write().await?) })
167    }
168
169    /// Reset all the event caches, and broadcast the [`TimelineVectorDiffs`].
170    ///
171    /// Note that this method consumes `self`, ensuring the acquired exclusive
172    /// locks over the event caches are released.
173    ///
174    /// It can fail if resetting an event cache fails.
175    pub async fn reset_all(self) -> Result<()> {
176        let Self { room_lock: (room, mut room_state) } = self;
177
178        {
179            let updates_as_vector_diffs = room_state.reset().await?;
180            room.update_sender().send(
181                room::RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
182                    diffs: updates_as_vector_diffs,
183                    origin: EventsOrigin::Cache,
184                }),
185                Some(room::RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
186            );
187        }
188
189        Ok(())
190    }
191}
192
193/// A diff update for an event cache timeline represented as a vector.
194#[derive(Clone, Debug)]
195pub struct TimelineVectorDiffs {
196    /// New vector diff for the thread timeline.
197    pub diffs: Vec<VectorDiff<Event>>,
198    /// The origin that triggered this update.
199    pub origin: EventsOrigin,
200}
201
202/// An enum representing where an event has been found.
203pub(super) enum EventLocation {
204    /// Event lives in memory (and likely in the store!).
205    Memory(Position),
206
207    /// Event lives in the store only, it has not been loaded in memory yet.
208    Store,
209}