Skip to main content

matrix_sdk_base/event_cache/store/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache stores holds events when the cache was
16//! activated to save bandwidth at the cost of increased storage space usage.
17//!
18//! Implementing the `EventCacheStore` trait, you can plug any storage backend
19//! into the event cache for the actual storage. By default this brings an
20//! in-memory store.
21
22use std::{fmt, ops::Deref, str::Utf8Error, sync::Arc};
23
24#[cfg(any(test, feature = "testing"))]
25#[macro_use]
26pub mod integration_tests;
27mod memory_store;
28mod traits;
29
30use matrix_sdk_common::cross_process_lock::{
31    CrossProcessLock, CrossProcessLockConfig, CrossProcessLockError, CrossProcessLockGeneration,
32    CrossProcessLockGuard, MappedCrossProcessLockState, TryLock,
33};
34pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
35use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
36use tracing::trace;
37
38#[cfg(any(test, feature = "testing"))]
39pub use self::integration_tests::EventCacheStoreIntegrationTests;
40pub use self::{
41    memory_store::MemoryStore,
42    traits::{DEFAULT_CHUNK_CAPACITY, DynEventCacheStore, EventCacheStore, IntoEventCacheStore},
43};
44
45/// The high-level public type to represent an `EventCacheStore` lock.
46#[derive(Clone)]
47pub struct EventCacheStoreLock {
48    /// The inner cross process lock that is used to lock the `EventCacheStore`.
49    cross_process_lock: Arc<CrossProcessLock<LockableEventCacheStore>>,
50
51    /// The store itself.
52    ///
53    /// That's the only place where the store exists.
54    store: Arc<DynEventCacheStore>,
55}
56
57#[cfg(not(tarpaulin_include))]
58impl fmt::Debug for EventCacheStoreLock {
59    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
60        formatter.debug_struct("EventCacheStoreLock").finish_non_exhaustive()
61    }
62}
63
64impl EventCacheStoreLock {
65    /// Create a new lock around the [`EventCacheStore`].
66    ///
67    /// The `cross_process_lock_config` argument controls whether we need to
68    /// hold the cross process lock or not.
69    pub fn new<S>(store: S, cross_process_lock_config: CrossProcessLockConfig) -> Self
70    where
71        S: IntoEventCacheStore,
72    {
73        let store = store.into_event_cache_store();
74
75        let cross_process_lock = Arc::new(CrossProcessLock::new(
76            LockableEventCacheStore(store.clone()),
77            "default".to_owned(),
78            cross_process_lock_config,
79        ));
80        Self { cross_process_lock, store }
81    }
82
83    /// Close the store, releasing database connections and file locks.
84    pub async fn close(&self) -> Result<(), EventCacheStoreError> {
85        self.store.close().await
86    }
87
88    /// Reopen the store after a close.
89    pub async fn reopen(&self) -> Result<(), EventCacheStoreError> {
90        self.store.reopen().await
91    }
92
93    /// Acquire a spin lock (see [`CrossProcessLock::spin_lock`]).
94    pub async fn lock(&self) -> Result<EventCacheStoreLockState, CrossProcessLockError> {
95        Ok(self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| {
96            EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() }
97        }))
98    }
99}
100
101/// The equivalent of [`CrossProcessLockState`] but for the [`EventCacheStore`].
102///
103/// [`CrossProcessLockState`]: matrix_sdk_common::cross_process_lock::CrossProcessLockState
104pub type EventCacheStoreLockState = MappedCrossProcessLockState<EventCacheStoreLockGuard>;
105
106/// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`].
107/// When this structure is dropped (falls out of scope), the lock will be
108/// unlocked.
109#[derive(Clone)]
110pub struct EventCacheStoreLockGuard {
111    /// The cross process lock guard.
112    #[allow(unused)]
113    cross_process_lock_guard: CrossProcessLockGuard,
114
115    /// A reference to the store.
116    store: Arc<DynEventCacheStore>,
117}
118
119impl EventCacheStoreLockGuard {
120    /// Forward to [`CrossProcessLockGuard::clear_dirty`].
121    ///
122    /// This is an associated method to avoid colliding with the [`Deref`]
123    /// implementation.
124    pub fn clear_dirty(this: &Self) {
125        this.cross_process_lock_guard.clear_dirty();
126    }
127
128    /// Force to [`CrossProcessLockGuard::is_dirty`].
129    pub fn is_dirty(this: &Self) -> bool {
130        this.cross_process_lock_guard.is_dirty()
131    }
132}
133
134#[cfg(not(tarpaulin_include))]
135impl fmt::Debug for EventCacheStoreLockGuard {
136    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
137        formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
138    }
139}
140
141impl Deref for EventCacheStoreLockGuard {
142    type Target = DynEventCacheStore;
143
144    fn deref(&self) -> &Self::Target {
145        self.store.as_ref()
146    }
147}
148
149/// Event cache store specific error type.
150#[derive(Clone, Debug, thiserror::Error)]
151pub enum EventCacheStoreError {
152    /// An error happened in the underlying database backend.
153    #[error(transparent)]
154    Backend(Arc<dyn std::error::Error + Send + Sync>),
155
156    /// The store is locked with a passphrase and an incorrect passphrase
157    /// was given.
158    #[error("The event cache store failed to be unlocked")]
159    Locked,
160
161    /// An unencrypted store was tried to be unlocked with a passphrase.
162    #[error("The event cache store is not encrypted but tried to be opened with a passphrase")]
163    Unencrypted,
164
165    /// The store failed to encrypt or decrypt some data.
166    #[error("Error encrypting or decrypting data from the event cache store: {0}")]
167    Encryption(#[from] Arc<StoreEncryptionError>),
168
169    /// The store failed to encode or decode some data.
170    #[error("Error encoding or decoding data from the event cache store: {0}")]
171    Codec(#[from] Utf8Error),
172
173    /// The store failed to serialize or deserialize some data.
174    #[error("Error serializing or deserializing data from the event cache store: {0}")]
175    Serialization(#[from] Arc<serde_json::Error>),
176
177    /// The database format has changed in a backwards incompatible way.
178    #[error(
179        "The database format of the event cache store changed in an incompatible way, \
180         current version: {0}, latest version: {1}"
181    )]
182    UnsupportedDatabaseVersion(usize, usize),
183
184    /// The store contains invalid data.
185    #[error("The store contains invalid data: {details}")]
186    InvalidData {
187        /// Details why the data contained in the store was invalid.
188        details: String,
189    },
190}
191
192impl EventCacheStoreError {
193    /// Create a new [`Backend`][Self::Backend] error.
194    ///
195    /// Shorthand for `EventCacheStoreError::Backend(Box::new(error))`.
196    #[inline]
197    pub fn backend<E>(error: E) -> Self
198    where
199        E: std::error::Error + Send + Sync + 'static,
200    {
201        Self::Backend(Arc::new(error))
202    }
203}
204
205impl From<EventCacheStoreError> for CrossProcessLockError {
206    fn from(value: EventCacheStoreError) -> Self {
207        Self::TryLock(Arc::new(value))
208    }
209}
210
211/// An `EventCacheStore` specific result type.
212pub type Result<T, E = EventCacheStoreError> = std::result::Result<T, E>;
213
214/// A type that wraps the [`EventCacheStore`] but implements [`TryLock`] to
215/// make it usable inside the cross process lock.
216#[derive(Clone, Debug)]
217struct LockableEventCacheStore(Arc<DynEventCacheStore>);
218
219impl TryLock for LockableEventCacheStore {
220    type LockError = EventCacheStoreError;
221
222    async fn try_lock(
223        &self,
224        lease_duration_ms: u32,
225        key: &str,
226        holder: &str,
227    ) -> std::result::Result<Option<CrossProcessLockGeneration>, Self::LockError> {
228        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
229    }
230}
231
232/// Helper to extract the relation information from an event.
233///
234/// If the event isn't in relation to another event, then this will return
235/// `None`. Otherwise, returns both the event id this event relates to, and the
236/// kind of relation as a string (e.g. `m.replace`).
237pub fn extract_event_relation(event: &Raw<AnySyncTimelineEvent>) -> Option<(OwnedEventId, String)> {
238    #[derive(serde::Deserialize)]
239    struct RelatesTo {
240        event_id: OwnedEventId,
241        rel_type: String,
242    }
243
244    #[derive(serde::Deserialize)]
245    struct EventContent {
246        #[serde(rename = "m.relates_to")]
247        rel: Option<RelatesTo>,
248    }
249
250    match event.get_field::<EventContent>("content") {
251        Ok(event_content) => {
252            event_content.and_then(|c| c.rel).map(|rel| (rel.event_id, rel.rel_type))
253        }
254        Err(err) => {
255            trace!("when extracting relation data from an event: {err}");
256            None
257        }
258    }
259}