matrix_sdk_base/event_cache/store/
traits.rs1use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19 AsyncTraitDeps,
20 cross_process_lock::CrossProcessLockGeneration,
21 linked_chunk::{
22 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
23 RawChunk, Update,
24 },
25};
26use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
27
28use super::EventCacheStoreError;
29use crate::event_cache::{Event, Gap};
30
/// Default capacity used for chunks.
///
/// NOTE(review): presumably the maximum number of items a single linked chunk
/// can hold — confirm against the call sites that consume this constant.
pub const DEFAULT_CHUNK_CAPACITY: usize = 128;
35
36#[cfg_attr(target_family = "wasm", async_trait(?Send))]
39#[cfg_attr(not(target_family = "wasm"), async_trait)]
40pub trait EventCacheStore: AsyncTraitDeps {
41 type Error: fmt::Debug + Into<EventCacheStoreError>;
43
44 async fn try_take_leased_lock(
46 &self,
47 lease_duration_ms: u32,
48 key: &str,
49 holder: &str,
50 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
51
52 async fn handle_linked_chunk_updates(
56 &self,
57 linked_chunk_id: LinkedChunkId<'_>,
58 updates: Vec<Update<Event, Gap>>,
59 ) -> Result<(), Self::Error>;
60
61 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
63 self.handle_linked_chunk_updates(LinkedChunkId::Room(room_id), vec![Update::Clear]).await
66 }
67
68 #[doc(hidden)]
71 async fn load_all_chunks(
72 &self,
73 linked_chunk_id: LinkedChunkId<'_>,
74 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;
75
76 async fn load_all_chunks_metadata(
81 &self,
82 linked_chunk_id: LinkedChunkId<'_>,
83 ) -> Result<Vec<ChunkMetadata>, Self::Error>;
84
85 async fn load_last_chunk(
90 &self,
91 linked_chunk_id: LinkedChunkId<'_>,
92 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error>;
93
94 async fn load_previous_chunk(
100 &self,
101 linked_chunk_id: LinkedChunkId<'_>,
102 before_chunk_identifier: ChunkIdentifier,
103 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error>;
104
105 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error>;
116
117 async fn filter_duplicated_events(
120 &self,
121 linked_chunk_id: LinkedChunkId<'_>,
122 events: Vec<OwnedEventId>,
123 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error>;
124
125 async fn find_event(
130 &self,
131 room_id: &RoomId,
132 event_id: &EventId,
133 ) -> Result<Option<Event>, Self::Error>;
134
135 async fn find_event_relations(
152 &self,
153 room_id: &RoomId,
154 event_id: &EventId,
155 filter: Option<&[RelationType]>,
156 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error>;
157
158 async fn get_room_events(
163 &self,
164 room_id: &RoomId,
165 event_type: Option<&str>,
166 session_id: Option<&str>,
167 ) -> Result<Vec<Event>, Self::Error>;
168
169 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error>;
178
179 async fn close(&self) -> Result<(), Self::Error>;
185
186 async fn reopen(&self) -> Result<(), Self::Error>;
189
190 #[doc(hidden)]
196 async fn optimize(&self) -> Result<(), Self::Error>;
197
198 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
200}
201
/// Newtype wrapper around a store `T`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as `T`; the
/// `Arc<T>` conversion in this file relies on that when it reinterprets the
/// raw pointer of an `Arc<T>` as a pointer to this wrapper.
#[repr(transparent)]
struct EraseEventCacheStoreError<T>(T);
204
205#[cfg(not(tarpaulin_include))]
206impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 self.0.fmt(f)
209 }
210}
211
212#[cfg_attr(target_family = "wasm", async_trait(?Send))]
213#[cfg_attr(not(target_family = "wasm"), async_trait)]
214impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
215 type Error = EventCacheStoreError;
216
217 async fn try_take_leased_lock(
218 &self,
219 lease_duration_ms: u32,
220 key: &str,
221 holder: &str,
222 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
223 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
224 }
225
226 async fn handle_linked_chunk_updates(
227 &self,
228 linked_chunk_id: LinkedChunkId<'_>,
229 updates: Vec<Update<Event, Gap>>,
230 ) -> Result<(), Self::Error> {
231 self.0.handle_linked_chunk_updates(linked_chunk_id, updates).await.map_err(Into::into)
232 }
233
234 async fn load_all_chunks(
235 &self,
236 linked_chunk_id: LinkedChunkId<'_>,
237 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
238 self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into)
239 }
240
241 async fn load_all_chunks_metadata(
242 &self,
243 linked_chunk_id: LinkedChunkId<'_>,
244 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
245 self.0.load_all_chunks_metadata(linked_chunk_id).await.map_err(Into::into)
246 }
247
248 async fn load_last_chunk(
249 &self,
250 linked_chunk_id: LinkedChunkId<'_>,
251 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
252 self.0.load_last_chunk(linked_chunk_id).await.map_err(Into::into)
253 }
254
255 async fn load_previous_chunk(
256 &self,
257 linked_chunk_id: LinkedChunkId<'_>,
258 before_chunk_identifier: ChunkIdentifier,
259 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
260 self.0
261 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
262 .await
263 .map_err(Into::into)
264 }
265
266 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
267 self.0.clear_all_linked_chunks().await.map_err(Into::into)
268 }
269
270 async fn filter_duplicated_events(
271 &self,
272 linked_chunk_id: LinkedChunkId<'_>,
273 events: Vec<OwnedEventId>,
274 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
275 self.0.filter_duplicated_events(linked_chunk_id, events).await.map_err(Into::into)
276 }
277
278 async fn find_event(
279 &self,
280 room_id: &RoomId,
281 event_id: &EventId,
282 ) -> Result<Option<Event>, Self::Error> {
283 self.0.find_event(room_id, event_id).await.map_err(Into::into)
284 }
285
286 async fn find_event_relations(
287 &self,
288 room_id: &RoomId,
289 event_id: &EventId,
290 filter: Option<&[RelationType]>,
291 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
292 self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
293 }
294
295 async fn get_room_events(
296 &self,
297 room_id: &RoomId,
298 event_type: Option<&str>,
299 session_id: Option<&str>,
300 ) -> Result<Vec<Event>, Self::Error> {
301 self.0.get_room_events(room_id, event_type, session_id).await.map_err(Into::into)
302 }
303
304 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
305 self.0.save_event(room_id, event).await.map_err(Into::into)
306 }
307
308 async fn close(&self) -> Result<(), Self::Error> {
309 self.0.close().await.map_err(Into::into)
310 }
311
312 async fn reopen(&self) -> Result<(), Self::Error> {
313 self.0.reopen().await.map_err(Into::into)
314 }
315
316 async fn optimize(&self) -> Result<(), Self::Error> {
317 self.0.optimize().await.map_err(Into::into)?;
318 Ok(())
319 }
320
321 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
322 Ok(self.0.get_size().await.map_err(Into::into)?)
323 }
324}
325
326pub type DynEventCacheStore = dyn EventCacheStore<Error = EventCacheStoreError>;
328
329pub trait IntoEventCacheStore {
335 #[doc(hidden)]
336 fn into_event_cache_store(self) -> Arc<DynEventCacheStore>;
337}
338
339impl IntoEventCacheStore for Arc<DynEventCacheStore> {
340 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
341 self
342 }
343}
344
345impl<T> IntoEventCacheStore for T
346where
347 T: EventCacheStore + Sized + 'static,
348{
349 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
350 Arc::new(EraseEventCacheStoreError(self))
351 }
352}
353
354impl<T> IntoEventCacheStore for Arc<T>
357where
358 T: EventCacheStore + 'static,
359{
360 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
361 let ptr: *const T = Arc::into_raw(self);
362 let ptr_erased = ptr as *const EraseEventCacheStoreError<T>;
363 unsafe { Arc::from_raw(ptr_erased) }
366 }
367}