matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27mod memory_store;
28mod traits;
29
30use matrix_sdk_common::cross_process_lock::{
31 CrossProcessLock, CrossProcessLockConfig, CrossProcessLockError, CrossProcessLockGeneration,
32 CrossProcessLockGuard, MappedCrossProcessLockState, TryLock,
33};
34pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
35use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
36use tracing::trace;
37
38#[cfg(any(test, feature = "testing"))]
39pub use self::integration_tests::EventCacheStoreIntegrationTests;
40pub use self::{
41 memory_store::MemoryStore,
42 traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
43};
44
45#[derive(Clone)]
47pub struct EventCacheStoreLock {
48 cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
50
51 store: Arc<DynEventCacheStore>,
55}
56
57#[cfg(not(tarpaulin_include))]
58impl fmt::Debug for EventCacheStoreLock {
59 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
60 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
61 }
62}
63
64impl EventCacheStoreLock {
65 pub fn new<S>(store: S, cross_process_lock_config: CrossProcessLockConfig) -> Self
70 where
71 S: IntoEventCacheStore,
72 {
73 let store = store.into_event_cache_store();
74
75 let cross_process_lock = Arc::new(CrossProcessLock::new(
76 LockableEventCacheStore(store.clone()),
77 "default".to_owned(),
78 cross_process_lock_config,
79 ));
80 Self { cross_process_lock, store }
81 }
82
83 pub async fn close(&self) -> Result<(), EventCacheStoreError> {
85 self.store.close().await
86 }
87
88 pub async fn reopen(&self) -> Result<(), EventCacheStoreError> {
90 self.store.reopen().await
91 }
92
93 pub async fn lock(&self) -> Result<EventCacheStoreLockState, CrossProcessLockError> {
95 Ok(self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| {
96 EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() }
97 }))
98 }
99}
100
101pub type EventCacheStoreLockState = MappedCrossProcessLockState<EventCacheStoreLockGuard>;
105
106#[derive(Clone)]
110pub struct EventCacheStoreLockGuard {
111 #[allow(unused)]
113 cross_process_lock_guard: CrossProcessLockGuard,
114
115 store: Arc<DynEventCacheStore>,
117}
118
119impl EventCacheStoreLockGuard {
120 pub fn clear_dirty(this: &Self) {
125 this.cross_process_lock_guard.clear_dirty();
126 }
127
128 pub fn is_dirty(this: &Self) -> bool {
130 this.cross_process_lock_guard.is_dirty()
131 }
132}
133
134#[cfg(not(tarpaulin_include))]
135impl fmt::Debug for EventCacheStoreLockGuard {
136 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
137 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
138 }
139}
140
141impl Deref for EventCacheStoreLockGuard {
142 type Target = DynEventCacheStore;
143
144 fn deref(&self) -> &Self::Target {
145 self.store.as_ref()
146 }
147}
148
149#[derive(Clone, Debug, thiserror::Error)]
151pub enum EventCacheStoreError {
152 #[error(transparent)]
154 Backend(Arc<dyn std::error::Error + Send + Sync>),
155
156 #[error("The event cache store failed to be unlocked")]
159 Locked,
160
161 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
163 Unencrypted,
164
165 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
167 Encryption(#[from] Arc<StoreEncryptionError>),
168
169 #[error("Error encoding or decoding data from the event cache store: {0}")]
171 Codec(#[from] Utf8Error),
172
173 #[error("Error serializing or deserializing data from the event cache store: {0}")]
175 Serialization(#[from] Arc<serde_json::Error>),
176
177 #[error(
179 "The database format of the event cache store changed in an incompatible way, \
180 current version: {0}, latest version: {1}"
181 )]
182 UnsupportedDatabaseVersion(usize, usize),
183
184 #[error("The store contains invalid data: {details}")]
186 InvalidData {
187 details: String,
189 },
190}
191
192impl EventCacheStoreError {
193 #[inline]
197 pub fn backend<E>(error: E) -> Self
198 where
199 E: std::error::Error + Send + Sync + 'static,
200 {
201 Self::Backend(Arc::new(error))
202 }
203}
204
205impl From<EventCacheStoreError> for CrossProcessLockError {
206 fn from(value: EventCacheStoreError) -> Self {
207 Self::TryLock(Arc::new(value))
208 }
209}
210
211pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
213
214#[derive(Clone, Debug)]
217struct LockableEventCacheStore(Arc<DynEventCacheStore>);
218
219impl TryLock for LockableEventCacheStore {
220 type LockError = EventCacheStoreError;
221
222 async fn try_lock(
223 &self,
224 lease_duration_ms: u32,
225 key: &str,
226 holder: &str,
227 ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
228 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
229 }
230}
231
232pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
238 #[derive(serde::Deserialize)]
239 struct RelatesTo {
240 event_id: OwnedEventId,
241 rel_type: String,
242 }
243
244 #[derive(serde::Deserialize)]
245 struct EventContent {
246 #[serde(rename = "m.relates_to")]
247 rel: Option<RelatesTo>,
248 }
249
250 match event.get_field::<EventContent>("content") {
251 Ok(event_content) => {
252 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
253 }
254 Err(err) => {
255 trace!("when extracting relation data from an event: {err}");
256 None
257 }
258 }
259}