matrix_sdk_base/event_cache/store/
traits.rs1use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19 AsyncTraitDeps,
20 cross_process_lock::CrossProcessLockGeneration,
21 linked_chunk::{
22 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
23 RawChunk, Update,
24 },
25};
26use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
27
28use super::EventCacheStoreError;
29use crate::event_cache::{Event, Gap};
30
31pub const DEFAULT_CHUNK_CAPACITY: usize = 128;
35
36#[cfg_attr(target_family = "wasm", async_trait(?Send))]
39#[cfg_attr(not(target_family = "wasm"), async_trait)]
40pub trait EventCacheStore: AsyncTraitDeps {
41 type Error: fmt::Debug + Into<EventCacheStoreError>;
43
44 async fn try_take_leased_lock(
46 &self,
47 lease_duration_ms: u32,
48 key: &str,
49 holder: &str,
50 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error>;
51
52 async fn handle_linked_chunk_updates(
56 &self,
57 linked_chunk_id: LinkedChunkId<'_>,
58 updates: Vec<Update<Event, Gap>>,
59 ) -> Result<(), Self::Error>;
60
61 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
63 self.handle_linked_chunk_updates(LinkedChunkId::Room(room_id), vec![Update::Clear]).await
66 }
67
68 #[doc(hidden)]
71 async fn load_all_chunks(
72 &self,
73 linked_chunk_id: LinkedChunkId<'_>,
74 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error>;
75
76 async fn load_all_chunks_metadata(
81 &self,
82 linked_chunk_id: LinkedChunkId<'_>,
83 ) -> Result<Vec<ChunkMetadata>, Self::Error>;
84
85 async fn load_last_chunk(
90 &self,
91 linked_chunk_id: LinkedChunkId<'_>,
92 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error>;
93
94 async fn load_previous_chunk(
100 &self,
101 linked_chunk_id: LinkedChunkId<'_>,
102 before_chunk_identifier: ChunkIdentifier,
103 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error>;
104
105 async fn clear_all_events(&self) -> Result<(), Self::Error>;
115
116 async fn filter_duplicated_events(
119 &self,
120 linked_chunk_id: LinkedChunkId<'_>,
121 events: Vec<OwnedEventId>,
122 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error>;
123
124 async fn find_event(
129 &self,
130 room_id: &RoomId,
131 event_id: &EventId,
132 ) -> Result<Option<Event>, Self::Error>;
133
134 async fn find_event_relations(
151 &self,
152 room_id: &RoomId,
153 event_id: &EventId,
154 filter: Option<&[RelationType]>,
155 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error>;
156
157 async fn get_room_events(
162 &self,
163 room_id: &RoomId,
164 event_type: Option<&str>,
165 session_id: Option<&str>,
166 ) -> Result<Vec<Event>, Self::Error>;
167
168 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error>;
177
178 async fn close(&self) -> Result<(), Self::Error>;
184
185 async fn reopen(&self) -> Result<(), Self::Error>;
188
189 #[doc(hidden)]
195 async fn optimize(&self) -> Result<(), Self::Error>;
196
197 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
199}
200
201#[repr(transparent)]
202struct EraseEventCacheStoreError<T>(T);
203
204#[cfg(not(tarpaulin_include))]
205impl<T: fmt::Debug> fmt::Debug for EraseEventCacheStoreError<T> {
206 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207 self.0.fmt(f)
208 }
209}
210
211#[cfg_attr(target_family = "wasm", async_trait(?Send))]
212#[cfg_attr(not(target_family = "wasm"), async_trait)]
213impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
214 type Error = EventCacheStoreError;
215
216 async fn try_take_leased_lock(
217 &self,
218 lease_duration_ms: u32,
219 key: &str,
220 holder: &str,
221 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
222 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await.map_err(Into::into)
223 }
224
225 async fn handle_linked_chunk_updates(
226 &self,
227 linked_chunk_id: LinkedChunkId<'_>,
228 updates: Vec<Update<Event, Gap>>,
229 ) -> Result<(), Self::Error> {
230 self.0.handle_linked_chunk_updates(linked_chunk_id, updates).await.map_err(Into::into)
231 }
232
233 async fn load_all_chunks(
234 &self,
235 linked_chunk_id: LinkedChunkId<'_>,
236 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
237 self.0.load_all_chunks(linked_chunk_id).await.map_err(Into::into)
238 }
239
240 async fn load_all_chunks_metadata(
241 &self,
242 linked_chunk_id: LinkedChunkId<'_>,
243 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
244 self.0.load_all_chunks_metadata(linked_chunk_id).await.map_err(Into::into)
245 }
246
247 async fn load_last_chunk(
248 &self,
249 linked_chunk_id: LinkedChunkId<'_>,
250 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
251 self.0.load_last_chunk(linked_chunk_id).await.map_err(Into::into)
252 }
253
254 async fn load_previous_chunk(
255 &self,
256 linked_chunk_id: LinkedChunkId<'_>,
257 before_chunk_identifier: ChunkIdentifier,
258 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
259 self.0
260 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
261 .await
262 .map_err(Into::into)
263 }
264
265 async fn clear_all_events(&self) -> Result<(), Self::Error> {
266 self.0.clear_all_events().await.map_err(Into::into)
267 }
268
269 async fn filter_duplicated_events(
270 &self,
271 linked_chunk_id: LinkedChunkId<'_>,
272 events: Vec<OwnedEventId>,
273 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
274 self.0.filter_duplicated_events(linked_chunk_id, events).await.map_err(Into::into)
275 }
276
277 async fn find_event(
278 &self,
279 room_id: &RoomId,
280 event_id: &EventId,
281 ) -> Result<Option<Event>, Self::Error> {
282 self.0.find_event(room_id, event_id).await.map_err(Into::into)
283 }
284
285 async fn find_event_relations(
286 &self,
287 room_id: &RoomId,
288 event_id: &EventId,
289 filter: Option<&[RelationType]>,
290 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
291 self.0.find_event_relations(room_id, event_id, filter).await.map_err(Into::into)
292 }
293
294 async fn get_room_events(
295 &self,
296 room_id: &RoomId,
297 event_type: Option<&str>,
298 session_id: Option<&str>,
299 ) -> Result<Vec<Event>, Self::Error> {
300 self.0.get_room_events(room_id, event_type, session_id).await.map_err(Into::into)
301 }
302
303 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
304 self.0.save_event(room_id, event).await.map_err(Into::into)
305 }
306
307 async fn close(&self) -> Result<(), Self::Error> {
308 self.0.close().await.map_err(Into::into)
309 }
310
311 async fn reopen(&self) -> Result<(), Self::Error> {
312 self.0.reopen().await.map_err(Into::into)
313 }
314
315 async fn optimize(&self) -> Result<(), Self::Error> {
316 self.0.optimize().await.map_err(Into::into)?;
317 Ok(())
318 }
319
320 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
321 Ok(self.0.get_size().await.map_err(Into::into)?)
322 }
323}
324
325pub type DynEventCacheStore = dyn EventCacheStore<Error = EventCacheStoreError>;
327
328pub trait IntoEventCacheStore {
334 #[doc(hidden)]
335 fn into_event_cache_store(self) -> Arc<DynEventCacheStore>;
336}
337
338impl IntoEventCacheStore for Arc<DynEventCacheStore> {
339 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
340 self
341 }
342}
343
344impl<T> IntoEventCacheStore for T
345where
346 T: EventCacheStore + Sized + 'static,
347{
348 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
349 Arc::new(EraseEventCacheStoreError(self))
350 }
351}
352
353impl<T> IntoEventCacheStore for Arc<T>
356where
357 T: EventCacheStore + 'static,
358{
359 fn into_event_cache_store(self) -> Arc<DynEventCacheStore> {
360 let ptr: *const T = Arc::into_raw(self);
361 let ptr_erased = ptr as *const EraseEventCacheStoreError<T>;
362 unsafe { Arc::from_raw(ptr_erased) }
365 }
366}