matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27mod memory_store;
28mod traits;
29
30use matrix_sdk_common::cross_process_lock::{
31 CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
32};
33pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
34use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
35use tracing::trace;
36
37#[cfg(any(test, feature = "testing"))]
38pub use self::integration_tests::EventCacheStoreIntegrationTests;
39pub use self::{
40 memory_store::MemoryStore,
41 traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
42};
43
44#[derive(Clone)]
46pub struct EventCacheStoreLock {
47 cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
49
50 store: Arc<DynEventCacheStore>,
54}
55
56#[cfg(not(tarpaulin_include))]
57impl fmt::Debug for EventCacheStoreLock {
58 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
59 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
60 }
61}
62
63impl EventCacheStoreLock {
64 pub fn new<S>(store: S, holder: String) -> Self
69 where
70 S: IntoEventCacheStore,
71 {
72 let store = store.into_event_cache_store();
73
74 Self {
75 cross_process_lock: Arc::new(CrossProcessLock::new(
76 LockableEventCacheStore(store.clone()),
77 "default".to_owned(),
78 holder,
79 )),
80 store,
81 }
82 }
83
84 pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, CrossProcessLockError> {
86 let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;
87
88 Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
89 }
90}
91
92pub struct EventCacheStoreLockGuard<'a> {
96 #[allow(unused)]
98 cross_process_lock_guard: CrossProcessLockGuard,
99
100 store: &'a DynEventCacheStore,
102}
103
104#[cfg(not(tarpaulin_include))]
105impl fmt::Debug for EventCacheStoreLockGuard<'_> {
106 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
107 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
108 }
109}
110
111impl Deref for EventCacheStoreLockGuard<'_> {
112 type Target = DynEventCacheStore;
113
114 fn deref(&self) -> &Self::Target {
115 self.store
116 }
117}
118
119#[derive(Debug, thiserror::Error)]
121pub enum EventCacheStoreError {
122 #[error(transparent)]
124 Backend(Box<dyn std::error::Error + Send + Sync>),
125
126 #[error("The event cache store failed to be unlocked")]
129 Locked,
130
131 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
133 Unencrypted,
134
135 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
137 Encryption(#[from] StoreEncryptionError),
138
139 #[error("Error encoding or decoding data from the event cache store: {0}")]
141 Codec(#[from] Utf8Error),
142
143 #[error("Error serializing or deserializing data from the event cache store: {0}")]
145 Serialization(#[from] serde_json::Error),
146
147 #[error(
149 "The database format of the event cache store changed in an incompatible way, \
150 current version: {0}, latest version: {1}"
151 )]
152 UnsupportedDatabaseVersion(usize, usize),
153
154 #[error("The store contains invalid data: {details}")]
156 InvalidData {
157 details: String,
159 },
160}
161
162impl EventCacheStoreError {
163 #[inline]
167 pub fn backend<E>(error: E) -> Self
168 where
169 E: std::error::Error + Send + Sync + 'static,
170 {
171 Self::Backend(Box::new(error))
172 }
173}
174
175pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
177
178#[derive(Clone, Debug)]
181struct LockableEventCacheStore(Arc<DynEventCacheStore>);
182
183impl TryLock for LockableEventCacheStore {
184 type LockError = EventCacheStoreError;
185
186 async fn try_lock(
187 &self,
188 lease_duration_ms: u32,
189 key: &str,
190 holder: &str,
191 ) -> std::result::Result<bool, Self::LockError> {
192 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
193 }
194}
195
196pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
202 #[derive(serde::Deserialize)]
203 struct RelatesTo {
204 event_id: OwnedEventId,
205 rel_type: String,
206 }
207
208 #[derive(serde::Deserialize)]
209 struct EventContent {
210 #[serde(rename = "m.relates_to")]
211 rel: Option<RelatesTo>,
212 }
213
214 match event.get_field::<EventContent>("content") {
215 Ok(event_content) => {
216 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
217 }
218 Err(err) => {
219 trace!("when extracting relation data from an event: {err}");
220 None
221 }
222 }
223}