matrix_sdk_base/event_cache/store/
mod.rs1use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27mod memory_store;
28mod traits;
29
30use matrix_sdk_common::cross_process_lock::{
31 CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, TryLock,
32};
33pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
34use ruma::{
35 OwnedEventId,
36 events::{AnySyncTimelineEvent, relation::RelationType},
37 serde::Raw,
38};
39use tracing::trace;
40
41#[cfg(any(test, feature = "testing"))]
42pub use self::integration_tests::EventCacheStoreIntegrationTests;
43pub use self::{
44 memory_store::MemoryStore,
45 traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
46};
47
48#[derive(Clone)]
50pub struct EventCacheStoreLock {
51 cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
53
54 store: Arc<DynEventCacheStore>,
58}
59
60#[cfg(not(tarpaulin_include))]
61impl fmt::Debug for EventCacheStoreLock {
62 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
63 formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
64 }
65}
66
67impl EventCacheStoreLock {
68 pub fn new<S>(store: S, holder: String) -> Self
73 where
74 S: IntoEventCacheStore,
75 {
76 let store = store.into_event_cache_store();
77
78 Self {
79 cross_process_lock: Arc::new(CrossProcessLock::new(
80 LockableEventCacheStore(store.clone()),
81 "default".to_owned(),
82 holder,
83 )),
84 store,
85 }
86 }
87
88 pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, CrossProcessLockError> {
90 let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await?;
91
92 Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
93 }
94}
95
96pub struct EventCacheStoreLockGuard<'a> {
100 #[allow(unused)]
102 cross_process_lock_guard: CrossProcessLockGuard,
103
104 store: &'a DynEventCacheStore,
106}
107
108#[cfg(not(tarpaulin_include))]
109impl fmt::Debug for EventCacheStoreLockGuard<'_> {
110 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
111 formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
112 }
113}
114
115impl Deref for EventCacheStoreLockGuard<'_> {
116 type Target = DynEventCacheStore;
117
118 fn deref(&self) -> &Self::Target {
119 self.store
120 }
121}
122
123#[derive(Debug, thiserror::Error)]
125pub enum EventCacheStoreError {
126 #[error(transparent)]
128 Backend(Box<dyn std::error::Error + Send + Sync>),
129
130 #[error("The event cache store failed to be unlocked")]
133 Locked,
134
135 #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
137 Unencrypted,
138
139 #[error("Error encrypting or decrypting data from the event cache store: {0}")]
141 Encryption(#[from] StoreEncryptionError),
142
143 #[error("Error encoding or decoding data from the event cache store: {0}")]
145 Codec(#[from] Utf8Error),
146
147 #[error("Error serializing or deserializing data from the event cache store: {0}")]
149 Serialization(#[from] serde_json::Error),
150
151 #[error(
153 "The database format of the event cache store changed in an incompatible way, \
154 current version: {0}, latest version: {1}"
155 )]
156 UnsupportedDatabaseVersion(usize, usize),
157
158 #[error("The store contains invalid data: {details}")]
160 InvalidData {
161 details: String,
163 },
164}
165
166impl EventCacheStoreError {
167 #[inline]
171 pub fn backend<E>(error: E) -> Self
172 where
173 E: std::error::Error + Send + Sync + 'static,
174 {
175 Self::Backend(Box::new(error))
176 }
177}
178
179pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
181
182#[derive(Clone, Debug)]
185struct LockableEventCacheStore(Arc<DynEventCacheStore>);
186
187impl TryLock for LockableEventCacheStore {
188 type LockError = EventCacheStoreError;
189
190 async fn try_lock(
191 &self,
192 lease_duration_ms: u32,
193 key: &str,
194 holder: &str,
195 ) -> std::result::Result<bool, Self::LockError> {
196 self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
197 }
198}
199
200pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
206 #[derive(serde::Deserialize)]
207 struct RelatesTo {
208 event_id: OwnedEventId,
209 rel_type: String,
210 }
211
212 #[derive(serde::Deserialize)]
213 struct EventContent {
214 #[serde(rename = "m.relates_to")]
215 rel: Option<RelatesTo>,
216 }
217
218 match event.get_field::<EventContent>("content") {
219 Ok(event_content) => {
220 event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
221 }
222 Err(err) => {
223 trace!("when extracting relation data from an event: {err}");
224 None
225 }
226 }
227}
228
229pub fn compute_filters_string(filters: Option<&[RelationType]>) -> Option<Vec<String>> {
234 filters.map(|filter| {
235 filter
236 .iter()
237 .map(|f| {
238 if *f == RelationType::Replacement { "m.replace".to_owned() } else { f.to_string() }
239 })
240 .collect()
241 })
242}