matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27mod memory_store;
28mod traits;
29
30use matrix_sdk_common::cross_process_lock::{
31 CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
32 MappedCrossProcessLockState, TryLock,
33};
34pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
35use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
36use tracing::trace;
37
38#[cfg(any(test, feature = "testing"))]
39pub use self::integration_tests::EventCacheStoreIntegrationTests;
40pub use self::{
41 memory_store::MemoryStore,
42 traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
43};
44
45#[derive(Clone)]
47pub struct EventCacheStoreLock {
48 cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
50
51 store: Arc<DynEventCacheStore>,
55}
56
57#[cfg(not(tarpaulin_include))]
58impl fmt::Debug for EventCacheStoreLock {
59 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
60 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
61 }
62}
63
64impl EventCacheStoreLock {
65 pub fn new<S>(store: S, holder: String) -> Self
70 where
71 S: IntoEventCacheStore,
72 {
73 let store = store.into_event_cache_store();
74
75 Self {
76 cross_process_lock: Arc::new(CrossProcessLock::new(
77 LockableEventCacheStore(store.clone()),
78 "default".to_owned(),
79 holder,
80 )),
81 store,
82 }
83 }
84
85 pub async fn lock(&self) -> Result<EventCacheStoreLockState, CrossProcessLockError> {
87 let lock_state =
88 self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| {
89 EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() }
90 });
91
92 Ok(lock_state)
93 }
94}
95
96pub type EventCacheStoreLockState = MappedCrossProcessLockState<EventCacheStoreLockGuard>;
100
101#[derive(Clone)]
105pub struct EventCacheStoreLockGuard {
106 #[allow(unused)]
108 cross_process_lock_guard: CrossProcessLockGuard,
109
110 store: Arc<DynEventCacheStore>,
112}
113
114impl EventCacheStoreLockGuard {
115 pub fn clear_dirty(this: &Self) {
120 this.cross_process_lock_guard.clear_dirty();
121 }
122
123 pub fn is_dirty(this: &Self) -> bool {
125 this.cross_process_lock_guard.is_dirty()
126 }
127}
128
129#[cfg(not(tarpaulin_include))]
130impl fmt::Debug for EventCacheStoreLockGuard {
131 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
132 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
133 }
134}
135
136impl Deref for EventCacheStoreLockGuard {
137 type Target = DynEventCacheStore;
138
139 fn deref(&self) -> &Self::Target {
140 self.store.as_ref()
141 }
142}
143
144#[derive(Debug, thiserror::Error)]
146pub enum EventCacheStoreError {
147 #[error(transparent)]
149 Backend(Box<dyn std::error::Error + Send + Sync>),
150
151 #[error("The event cache store failed to be unlocked")]
154 Locked,
155
156 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
158 Unencrypted,
159
160 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
162 Encryption(#[from] StoreEncryptionError),
163
164 #[error("Error encoding or decoding data from the event cache store: {0}")]
166 Codec(#[from] Utf8Error),
167
168 #[error("Error serializing or deserializing data from the event cache store: {0}")]
170 Serialization(#[from] serde_json::Error),
171
172 #[error(
174 "The database format of the event cache store changed in an incompatible way, \
175 current version: {0}, latest version: {1}"
176 )]
177 UnsupportedDatabaseVersion(usize, usize),
178
179 #[error("The store contains invalid data: {details}")]
181 InvalidData {
182 details: String,
184 },
185}
186
187impl EventCacheStoreError {
188 #[inline]
192 pub fn backend<E>(error: E) -> Self
193 where
194 E: std::error::Error + Send + Sync + 'static,
195 {
196 Self::Backend(Box::new(error))
197 }
198}
199
200impl From<EventCacheStoreError> for CrossProcessLockError {
201 fn from(value: EventCacheStoreError) -> Self {
202 Self::TryLock(Box::new(value))
203 }
204}
205
206pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
208
209#[derive(Clone, Debug)]
212struct LockableEventCacheStore(Arc<DynEventCacheStore>);
213
214impl TryLock for LockableEventCacheStore {
215 type LockError = EventCacheStoreError;
216
217 async fn try_lock(
218 &self,
219 lease_duration_ms: u32,
220 key: &str,
221 holder: &str,
222 ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
223 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
224 }
225}
226
227pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
233 #[derive(serde::Deserialize)]
234 struct RelatesTo {
235 event_id: OwnedEventId,
236 rel_type: String,
237 }
238
239 #[derive(serde::Deserialize)]
240 struct EventContent {
241 #[serde(rename = "m.relates_to")]
242 rel: Option<RelatesTo>,
243 }
244
245 match event.get_field::<EventContent>("content") {
246 Ok(event_content) => {
247 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
248 }
249 Err(err) => {
250 trace!("when extracting relation data from an event: {err}");
251 None
252 }
253 }
254}