matrix_sdk_base/event_cache/store/
traits.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19    linked_chunk::{ChunkIdentifier, ChunkIdentifierGenerator, Position, RawChunk, Update},
20    AsyncTraitDeps,
21};
22use ruma::{EventId, MxcUri, OwnedEventId, RoomId};
23
24use super::{
25    media::{IgnoreMediaRetentionPolicy, MediaRetentionPolicy},
26    EventCacheStoreError,
27};
28use crate::{
29    event_cache::{Event, Gap},
30    media::MediaRequestParameters,
31};
32
/// Default capacity of a linked chunk, for code that manipulates linked
/// chunks together with an `EventCacheStore` implementation.
// TODO: move back?
pub const DEFAULT_CHUNK_CAPACITY: usize = 128;
37
38/// An abstract trait that can be used to implement different store backends
39/// for the event cache of the SDK.
40#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
41#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
42pub trait EventCacheStore: AsyncTraitDeps {
43    /// The error type used by this event cache store.
44    type Error: fmt::Debug + Into<EventCacheStoreError>;
45
46    /// Try to take a lock using the given store.
47    async fn try_take_leased_lock(
48        &self,
49        lease_duration_ms: u32,
50        key: &str,
51        holder: &str,
52    ) -> Result<bool, Self::Error>;
53
54    /// An [`Update`] reflects an operation that has happened inside a linked
55    /// chunk. The linked chunk is used by the event cache to store the events
56    /// in-memory. This method aims at forwarding this update inside this store.
57    async fn handle_linked_chunk_updates(
58        &self,
59        room_id: &RoomId,
60        updates: Vec<Update<Event, Gap>>,
61    ) -> Result<(), Self::Error>;
62
63    /// Remove all data tied to a given room from the cache.
64    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
65        // Right now, this means removing all the linked chunk. If implementations
66        // override this behavior, they should *also* include this code.
67        self.handle_linked_chunk_updates(room_id, vec![Update::Clear]).await
68    }
69
70    /// Return all the raw components of a linked chunk, so the caller may
71    /// reconstruct the linked chunk later.
72    #[doc(hidden)]
73    async fn load_all_chunks(
74        &self,
75        room_id: &RoomId,
76    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;
77
78    /// Load the last chunk of the `LinkedChunk` holding all events of the room
79    /// identified by `room_id`.
80    ///
81    /// This is used to iteratively load events for the `EventCache`.
82    async fn load_last_chunk(
83        &self,
84        room_id: &RoomId,
85    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error>;
86
87    /// Load the chunk before the chunk identified by `before_chunk_identifier`
88    /// of the `LinkedChunk` holding all events of the room identified by
89    /// `room_id`
90    ///
91    /// This is used to iteratively load events for the `EventCache`.
92    async fn load_previous_chunk(
93        &self,
94        room_id: &RoomId,
95        before_chunk_identifier: ChunkIdentifier,
96    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error>;
97
98    /// Clear persisted events for all the rooms.
99    ///
100    /// This will empty and remove all the linked chunks stored previously,
101    /// using the above [`Self::handle_linked_chunk_updates`] methods.
102    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error>;
103
104    /// Given a set of event IDs, return the duplicated events along with their
105    /// position if there are any.
106    async fn filter_duplicated_events(
107        &self,
108        room_id: &RoomId,
109        events: Vec<OwnedEventId>,
110    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error>;
111
112    /// Find an event by its ID.
113    async fn find_event(
114        &self,
115        room_id: &RoomId,
116        event_id: &EventId,
117    ) -> Result<Option<(Position, Event)>, Self::Error>;
118
119    /// Add a media file's content in the media store.
120    ///
121    /// # Arguments
122    ///
123    /// * `request` - The `MediaRequest` of the file.
124    ///
125    /// * `content` - The content of the file.
126    async fn add_media_content(
127        &self,
128        request: &MediaRequestParameters,
129        content: Vec<u8>,
130        ignore_policy: IgnoreMediaRetentionPolicy,
131    ) -> Result<(), Self::Error>;
132
133    /// Replaces the given media's content key with another one.
134    ///
135    /// This should be used whenever a temporary (local) MXID has been used, and
136    /// it must now be replaced with its actual remote counterpart (after
137    /// uploading some content, or creating an empty MXC URI).
138    ///
139    /// ⚠ No check is performed to ensure that the media formats are consistent,
140    /// i.e. it's possible to update with a thumbnail key a media that was
141    /// keyed as a file before. The caller is responsible of ensuring that
142    /// the replacement makes sense, according to their use case.
143    ///
144    /// This should not raise an error when the `from` parameter points to an
145    /// unknown media, and it should silently continue in this case.
146    ///
147    /// # Arguments
148    ///
149    /// * `from` - The previous `MediaRequest` of the file.
150    ///
151    /// * `to` - The new `MediaRequest` of the file.
152    async fn replace_media_key(
153        &self,
154        from: &MediaRequestParameters,
155        to: &MediaRequestParameters,
156    ) -> Result<(), Self::Error>;
157
158    /// Get a media file's content out of the media store.
159    ///
160    /// # Arguments
161    ///
162    /// * `request` - The `MediaRequest` of the file.
163    async fn get_media_content(
164        &self,
165        request: &MediaRequestParameters,
166    ) -> Result<Option<Vec<u8>>, Self::Error>;
167
168    /// Remove a media file's content from the media store.
169    ///
170    /// # Arguments
171    ///
172    /// * `request` - The `MediaRequest` of the file.
173    async fn remove_media_content(
174        &self,
175        request: &MediaRequestParameters,
176    ) -> Result<(), Self::Error>;
177
178    /// Get a media file's content associated to an `MxcUri` from the
179    /// media store.
180    ///
181    /// In theory, there could be several files stored using the same URI and a
182    /// different `MediaFormat`. This API is meant to be used with a media file
183    /// that has only been stored with a single format.
184    ///
185    /// If there are several media files for a given URI in different formats,
186    /// this API will only return one of them. Which one is left as an
187    /// implementation detail.
188    ///
189    /// # Arguments
190    ///
191    /// * `uri` - The `MxcUri` of the media file.
192    async fn get_media_content_for_uri(&self, uri: &MxcUri)
193        -> Result<Option<Vec<u8>>, Self::Error>;
194
195    /// Remove all the media files' content associated to an `MxcUri` from the
196    /// media store.
197    ///
198    /// This should not raise an error when the `uri` parameter points to an
199    /// unknown media, and it should return an Ok result in this case.
200    ///
201    /// # Arguments
202    ///
203    /// * `uri` - The `MxcUri` of the media files.
204    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error>;
205
206    /// Set the `MediaRetentionPolicy` to use for deciding whether to store or
207    /// keep media content.
208    ///
209    /// # Arguments
210    ///
211    /// * `policy` - The `MediaRetentionPolicy` to use.
212    async fn set_media_retention_policy(
213        &self,
214        policy: MediaRetentionPolicy,
215    ) -> Result<(), Self::Error>;
216
217    /// Get the current `MediaRetentionPolicy`.
218    fn media_retention_policy(&self) -> MediaRetentionPolicy;
219
220    /// Set whether the current [`MediaRetentionPolicy`] should be ignored for
221    /// the media.
222    ///
223    /// The change will be taken into account in the next cleanup.
224    ///
225    /// # Arguments
226    ///
227    /// * `request` - The `MediaRequestParameters` of the file.
228    ///
229    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
230    ///   ignored.
231    async fn set_ignore_media_retention_policy(
232        &self,
233        request: &MediaRequestParameters,
234        ignore_policy: IgnoreMediaRetentionPolicy,
235    ) -> Result<(), Self::Error>;
236
237    /// Clean up the media cache with the current `MediaRetentionPolicy`.
238    ///
239    /// If there is already an ongoing cleanup, this is a noop.
240    async fn clean_up_media_cache(&self) -> Result<(), Self::Error>;
241}
242
/// Wrapper around a store `T` whose `EventCacheStore` implementation reports
/// every error as an `EventCacheStoreError`.
///
/// The wrapper is `repr(transparent)`: the pointer cast done when converting
/// an `Arc<T>` into an `Arc<EraseEventCacheStoreError<T>>` relies on `T` and
/// the wrapper sharing the same layout.
#[repr(transparent)]
struct EraseEventCacheStoreError<T>(T);

// The wrapper is invisible in debug output: formatting is fully delegated to
// the wrapped value, using the caller's formatter (and thus its flags).
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
252
253#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
254#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
255impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
256    type Error = EventCacheStoreError;
257
258    async fn try_take_leased_lock(
259        &self,
260        lease_duration_ms: u32,
261        key: &str,
262        holder: &str,
263    ) -> Result<bool, Self::Error> {
264        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
265    }
266
267    async fn handle_linked_chunk_updates(
268        &self,
269        room_id: &RoomId,
270        updates: Vec<Update<Event, Gap>>,
271    ) -> Result<(), Self::Error> {
272        self.0.handle_linked_chunk_updates(room_id, updates).await.map_err(Into::into)
273    }
274
275    async fn load_all_chunks(
276        &self,
277        room_id: &RoomId,
278    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
279        self.0.load_all_chunks(room_id).await.map_err(Into::into)
280    }
281
282    async fn load_last_chunk(
283        &self,
284        room_id: &RoomId,
285    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
286        self.0.load_last_chunk(room_id).await.map_err(Into::into)
287    }
288
289    async fn load_previous_chunk(
290        &self,
291        room_id: &RoomId,
292        before_chunk_identifier: ChunkIdentifier,
293    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
294        self.0.load_previous_chunk(room_id, before_chunk_identifier).await.map_err(Into::into)
295    }
296
297    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
298        self.0.clear_all_rooms_chunks().await.map_err(Into::into)
299    }
300
301    async fn filter_duplicated_events(
302        &self,
303        room_id: &RoomId,
304        events: Vec<OwnedEventId>,
305    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
306        self.0.filter_duplicated_events(room_id, events).await.map_err(Into::into)
307    }
308
309    async fn find_event(
310        &self,
311        room_id: &RoomId,
312        event_id: &EventId,
313    ) -> Result<Option<(Position, Event)>, Self::Error> {
314        self.0.find_event(room_id, event_id).await.map_err(Into::into)
315    }
316
317    async fn add_media_content(
318        &self,
319        request: &MediaRequestParameters,
320        content: Vec<u8>,
321        ignore_policy: IgnoreMediaRetentionPolicy,
322    ) -> Result<(), Self::Error> {
323        self.0.add_media_content(request, content, ignore_policy).await.map_err(Into::into)
324    }
325
326    async fn replace_media_key(
327        &self,
328        from: &MediaRequestParameters,
329        to: &MediaRequestParameters,
330    ) -> Result<(), Self::Error> {
331        self.0.replace_media_key(from, to).await.map_err(Into::into)
332    }
333
334    async fn get_media_content(
335        &self,
336        request: &MediaRequestParameters,
337    ) -> Result<Option<Vec<u8>>, Self::Error> {
338        self.0.get_media_content(request).await.map_err(Into::into)
339    }
340
341    async fn remove_media_content(
342        &self,
343        request: &MediaRequestParameters,
344    ) -> Result<(), Self::Error> {
345        self.0.remove_media_content(request).await.map_err(Into::into)
346    }
347
348    async fn get_media_content_for_uri(
349        &self,
350        uri: &MxcUri,
351    ) -> Result<Option<Vec<u8>>, Self::Error> {
352        self.0.get_media_content_for_uri(uri).await.map_err(Into::into)
353    }
354
355    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
356        self.0.remove_media_content_for_uri(uri).await.map_err(Into::into)
357    }
358
359    async fn set_media_retention_policy(
360        &self,
361        policy: MediaRetentionPolicy,
362    ) -> Result<(), Self::Error> {
363        self.0.set_media_retention_policy(policy).await.map_err(Into::into)
364    }
365
366    fn media_retention_policy(&self) -> MediaRetentionPolicy {
367        self.0.media_retention_policy()
368    }
369
370    async fn set_ignore_media_retention_policy(
371        &self,
372        request: &MediaRequestParameters,
373        ignore_policy: IgnoreMediaRetentionPolicy,
374    ) -> Result<(), Self::Error> {
375        self.0.set_ignore_media_retention_policy(request, ignore_policy).await.map_err(Into::into)
376    }
377
378    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
379        self.0.clean_up_media_cache().await.map_err(Into::into)
380    }
381}
382
383/// A type-erased [`EventCacheStore`].
384pub type DynEventCacheStore = dyn EventCacheStore<Error = EventCacheStoreError>;
385
386/// A type that can be type-erased into `Arc<dyn EventCacheStore>`.
387///
388/// This trait is not meant to be implemented directly outside
389/// `matrix-sdk-base`, but it is automatically implemented for everything that
390/// implements `EventCacheStore`.
391pub trait IntoEventCacheStore {
392    #[doc(hidden)]
393    fn into_event_cache_store(self) -> Arc<DynEventCacheStore>;
394}
395
396impl<T> IntoEventCacheStore for T
397where
398    T: EventCacheStore + Sized + 'static,
399{
400    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
401        Arc::new(EraseEventCacheStoreError(self))
402    }
403}
404
405// Turns a given `Arc<T>` into `Arc<DynEventCacheStore>` by attaching the
406// `EventCacheStore` impl vtable of `EraseEventCacheStoreError<T>`.
407impl<T> IntoEventCacheStore for Arc<T>
408where
409    T: EventCacheStore + 'static,
410{
411    fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
412        let ptr: *const T = Arc::into_raw(self);
413        let ptr_erased = ptr as *const EraseEventCacheStoreError<T>;
414        // SAFETY: EraseEventCacheStoreError is repr(transparent) so T and
415        //         EraseEventCacheStoreError<T> have the same layout and ABI
416        unsafe { Arc::from_raw(ptr_erased) }
417    }
418}