Skip to main content

matrix_sdk_base/event_cache/store/
traits.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19    AsyncTraitDeps,
20    cross_process_lock::CrossProcessLockGeneration,
21    linked_chunk::{
22        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
23        RawChunk, Update,
24    },
25};
26use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
27
28use super::EventCacheStoreError;
29use crate::event_cache::{Event, Gap};
30
31/// A default capacity for linked chunks, when manipulating in conjunction with
32/// an `EventCacheStore` implementation.
33// TODO: move back?
34pub const DEFAULT_CHUNK_CAPACITY: usize = 128;
35
36/// An abstract trait that can be used to implement different store backends
37/// for the event cache of the SDK.
38#[cfg_attr(target_family = "wasm", async_trait(?Send))]
39#[cfg_attr(not(target_family = "wasm"), async_trait)]
40pub trait EventCacheStore: AsyncTraitDeps {
41    /// The error type used by this event cache store.
42    type Error: fmt::Debug + Into<EventCacheStoreError>;
43
44    /// Try to take a lock using the given store.
45    async fn try_take_leased_lock(
46        &self,
47        lease_duration_ms: u32,
48        key: &str,
49        holder: &str,
50    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
51
52    /// An [`Update`] reflects an operation that has happened inside a linked
53    /// chunk. The linked chunk is used by the event cache to store the events
54    /// in-memory. This method aims at forwarding this update inside this store.
55    async fn handle_linked_chunk_updates(
56        &self,
57        linked_chunk_id: LinkedChunkId<'_>,
58        updates: Vec<Update<Event, Gap>>,
59    ) -> Result<(), Self::Error>;
60
61    /// Remove all data tied to a given room from the cache.
62    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
63        // Right now, this means removing all the linked chunk. If implementations
64        // override this behavior, they should *also* include this code.
65        self.handle_linked_chunk_updates(LinkedChunkId::Room(room_id), vec![Update::Clear]).await
66    }
67
68    /// Return all the raw components of a linked chunk, so the caller may
69    /// reconstruct the linked chunk later.
70    #[doc(hidden)]
71    async fn load_all_chunks(
72        &self,
73        linked_chunk_id: LinkedChunkId<'_>,
74    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;
75
76    /// Load all of the chunks' metadata for the given [`LinkedChunkId`].
77    ///
78    /// Chunks are unordered, and there's no guarantee that the chunks would
79    /// form a valid linked chunk after reconstruction.
80    async fn load_all_chunks_metadata(
81        &self,
82        linked_chunk_id: LinkedChunkId<'_>,
83    ) -> Result<Vec<ChunkMetadata>, Self::Error>;
84
85    /// Load the last chunk of the `LinkedChunk` holding all events of the room
86    /// identified by `room_id`.
87    ///
88    /// This is used to iteratively load events for the `EventCache`.
89    async fn load_last_chunk(
90        &self,
91        linked_chunk_id: LinkedChunkId<'_>,
92    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error>;
93
94    /// Load the chunk before the chunk identified by `before_chunk_identifier`
95    /// of the `LinkedChunk` holding all events of the room identified by
96    /// `room_id`
97    ///
98    /// This is used to iteratively load events for the `EventCache`.
99    async fn load_previous_chunk(
100        &self,
101        linked_chunk_id: LinkedChunkId<'_>,
102        before_chunk_identifier: ChunkIdentifier,
103    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error>;
104
105    /// Clear persisted events for all the rooms.
106    ///
107    /// This will empty and remove all the linked chunks stored previously,
108    /// using the above [`Self::handle_linked_chunk_updates`] methods. It
109    /// must *also* delete all the events' content, if they were stored in a
110    /// separate table.
111    ///
112    /// ⚠ This is meant only for super specific use cases, where there shouldn't
113    /// be any live in-memory linked chunks. In general, prefer using
114    /// `EventCache::clear_all_rooms()` from the common SDK crate.
115    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error>;
116
117    /// Given a set of event IDs, return the duplicated events along with their
118    /// position if there are any.
119    async fn filter_duplicated_events(
120        &self,
121        linked_chunk_id: LinkedChunkId<'_>,
122        events: Vec<OwnedEventId>,
123    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error>;
124
125    /// Find an event by its ID in a room.
126    ///
127    /// This method must return events saved either in any linked chunks, *or*
128    /// events saved "out-of-band" with the [`Self::save_event`] method.
129    async fn find_event(
130        &self,
131        room_id: &RoomId,
132        event_id: &EventId,
133    ) -> Result<Option<Event>, Self::Error>;
134
135    /// Find all the events (alongside their position in the room's linked
136    /// chunk, if available) that relate to a given event.
137    ///
138    /// The only events which don't have a position are those which have been
139    /// saved out-of-band using [`Self::save_event`].
140    ///
141    /// Note: it doesn't process relations recursively: for instance, if
142    /// requesting only thread events, it will NOT return the aggregated
143    /// events affecting the returned events. It is the responsibility of
144    /// the caller to do so, if needed.
145    ///
146    /// An additional filter can be provided to only retrieve related events for
147    /// a certain relationship.
148    ///
149    /// This method must return events saved either in any linked chunks, *or*
150    /// events saved "out-of-band" with the [`Self::save_event`] method.
151    async fn find_event_relations(
152        &self,
153        room_id: &RoomId,
154        event_id: &EventId,
155        filter: Option<&[RelationType]>,
156    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error>;
157
158    /// Get all events in this room.
159    ///
160    /// This method must return events saved either in any linked chunks, *or*
161    /// events saved "out-of-band" with the [`Self::save_event`] method.
162    async fn get_room_events(
163        &self,
164        room_id: &RoomId,
165        event_type: Option<&str>,
166        session_id: Option<&str>,
167    ) -> Result<Vec<Event>, Self::Error>;
168
169    /// Save an event, that might or might not be part of an existing linked
170    /// chunk.
171    ///
172    /// If the event has no event id, it will not be saved, and the function
173    /// must return an Ok result early.
174    ///
175    /// If the event was already stored with the same id, it must be replaced,
176    /// without causing an error.
177    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error>;
178
179    /// Close the store, releasing all held resources (database connections,
180    /// file descriptors, file locks).
181    ///
182    /// In-flight operations complete before this method returns. After it
183    /// returns, operations will fail until [`Self::reopen()`] is called.
184    async fn close(&self) -> Result<(), Self::Error>;
185
186    /// Reopen the store after a [`Self::close()`], re-acquiring database
187    /// connections.
188    async fn reopen(&self) -> Result<(), Self::Error>;
189
190    /// Perform database optimizations if any are available, i.e. vacuuming in
191    /// SQLite.
192    ///
193    /// **Warning:** this was added to check if SQLite fragmentation was the
194    /// source of performance issues, **DO NOT use in production**.
195    #[doc(hidden)]
196    async fn optimize(&self) -> Result<(), Self::Error>;
197
198    /// Returns the size of the store in bytes, if known.
199    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
200}
201
202#[repr(transparent)]
203struct EraseEventCacheStoreError<T>(T);
204
205#[cfg(not(tarpaulin_include))]
206impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208        self.0.fmt(f)
209    }
210}
211
212#[cfg_attr(target_family = "wasm", async_trait(?Send))]
213#[cfg_attr(not(target_family = "wasm"), async_trait)]
214impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
215    type Error = EventCacheStoreError;
216
217    async fn try_take_leased_lock(
218        &self,
219        lease_duration_ms: u32,
220        key: &str,
221        holder: &str,
222    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
223        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
224    }
225
226    async fn handle_linked_chunk_updates(
227        &self,
228        linked_chunk_id: LinkedChunkId<'_>,
229        updates: Vec<Update<Event, Gap>>,
230    ) -> Result<(), Self::Error> {
231        self.0.handle_linked_chunk_updates(linked_chunk_id, updates).await.map_err(Into::into)
232    }
233
234    async fn load_all_chunks(
235        &self,
236        linked_chunk_id: LinkedChunkId<'_>,
237    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
238        self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into)
239    }
240
241    async fn load_all_chunks_metadata(
242        &self,
243        linked_chunk_id: LinkedChunkId<'_>,
244    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
245        self.0.load_all_chunks_metadata(linked_chunk_id).await.map_err(Into::into)
246    }
247
248    async fn load_last_chunk(
249        &self,
250        linked_chunk_id: LinkedChunkId<'_>,
251    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
252        self.0.load_last_chunk(linked_chunk_id).await.map_err(Into::into)
253    }
254
255    async fn load_previous_chunk(
256        &self,
257        linked_chunk_id: LinkedChunkId<'_>,
258        before_chunk_identifier: ChunkIdentifier,
259    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
260        self.0
261            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
262            .await
263            .map_err(Into::into)
264    }
265
266    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
267        self.0.clear_all_linked_chunks().await.map_err(Into::into)
268    }
269
270    async fn filter_duplicated_events(
271        &self,
272        linked_chunk_id: LinkedChunkId<'_>,
273        events: Vec<OwnedEventId>,
274    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
275        self.0.filter_duplicated_events(linked_chunk_id, events).await.map_err(Into::into)
276    }
277
278    async fn find_event(
279        &self,
280        room_id: &RoomId,
281        event_id: &EventId,
282    ) -> Result<Option<Event>, Self::Error> {
283        self.0.find_event(room_id, event_id).await.map_err(Into::into)
284    }
285
286    async fn find_event_relations(
287        &self,
288        room_id: &RoomId,
289        event_id: &EventId,
290        filter: Option<&[RelationType]>,
291    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
292        self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
293    }
294
295    async fn get_room_events(
296        &self,
297        room_id: &RoomId,
298        event_type: Option<&str>,
299        session_id: Option<&str>,
300    ) -> Result<Vec<Event>, Self::Error> {
301        self.0.get_room_events(room_id, event_type, session_id).await.map_err(Into::into)
302    }
303
304    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
305        self.0.save_event(room_id, event).await.map_err(Into::into)
306    }
307
308    async fn close(&self) -> Result<(), Self::Error> {
309        self.0.close().await.map_err(Into::into)
310    }
311
312    async fn reopen(&self) -> Result<(), Self::Error> {
313        self.0.reopen().await.map_err(Into::into)
314    }
315
316    async fn optimize(&self) -> Result<(), Self::Error> {
317        self.0.optimize().await.map_err(Into::into)?;
318        Ok(())
319    }
320
321    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
322        Ok(self.0.get_size().await.map_err(Into::into)?)
323    }
324}
325
326/// A type-erased [`EventCacheStore`].
327pub type DynEventCacheStore = dyn EventCacheStore<Error = EventCacheStoreError>;
328
329/// A type that can be type-erased into `Arc<dyn EventCacheStore>`.
330///
331/// This trait is not meant to be implemented directly outside
332/// `matrix-sdk-base`, but it is automatically implemented for everything that
333/// implements `EventCacheStore`.
334pub trait IntoEventCacheStore {
335    #[doc(hidden)]
336    fn into_event_cache_store(self) -> Arc<DynEventCacheStore>;
337}
338
339impl IntoEventCacheStore for Arc<DynEventCacheStore> {
340    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
341        self
342    }
343}
344
345impl<T> IntoEventCacheStore for T
346where
347    T: EventCacheStore + Sized + 'static,
348{
349    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
350        Arc::new(EraseEventCacheStoreError(self))
351    }
352}
353
354// Turns a given `Arc<T>` into `Arc<DynEventCacheStore>` by attaching the
355// `EventCacheStore` impl vtable of `EraseEventCacheStoreError<T>`.
356impl<T> IntoEventCacheStore for Arc<T>
357where
358    T: EventCacheStore + 'static,
359{
360    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
361        let ptr: *const T = Arc::into_raw(self);
362        let ptr_erased = ptr as *const EraseEventCacheStoreError<T>;
363        // SAFETY: EraseEventCacheStoreError is repr(transparent) so T and
364        //         EraseEventCacheStoreError<T> have the same layout and ABI
365        unsafe { Arc::from_raw(ptr_erased) }
366    }
367}