matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::{
23        CrossProcessLockGeneration,
24        memory_store_helper::{Lease, try_take_leased_lock},
25    },
26    linked_chunk::{
27        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28        RawChunk, Update, relational::RelationalLinkedChunk,
29    },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37/// In-memory, non-persistent implementation of the `EventCacheStore`.
38///
39/// Default if no other is configured at startup.
40///
41/// Note that this store is not transactional. This is particularly
42/// relevant when calling [`EventCacheStore::handle_linked_chunk_updates`],
43/// which consumes a list of [`Update`]s. When processing this list, if
44/// one of the [`Update`]s fails, the previous updates in the list
45/// will not be reversed.
46#[derive(Debug, Clone)]
47pub struct MemoryStore {
48    inner: Arc<StdRwLock<MemoryStoreInner>>,
49}
50
51#[derive(Debug)]
52struct MemoryStoreInner {
53    leases: HashMap<String, Lease>,
54    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
55}
56
57impl Default for MemoryStore {
58    fn default() -> Self {
59        Self {
60            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61                leases: Default::default(),
62                events: RelationalLinkedChunk::new(),
63            })),
64        }
65    }
66}
67
68impl MemoryStore {
69    /// Create a new empty MemoryStore
70    pub fn new() -> Self {
71        Self::default()
72    }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78    type Error = EventCacheStoreError;
79
80    async fn try_take_leased_lock(
81        &self,
82        lease_duration_ms: u32,
83        key: &str,
84        holder: &str,
85    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
86        let mut inner = self.inner.write().unwrap();
87
88        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
89    }
90
91    async fn handle_linked_chunk_updates(
92        &self,
93        linked_chunk_id: LinkedChunkId<'_>,
94        updates: Vec<Update<Event, Gap>>,
95    ) -> Result<(), Self::Error> {
96        let mut inner = self.inner.write().unwrap();
97        inner
98            .events
99            .apply_updates(linked_chunk_id, updates)
100            .map_err(|e| Self::Error::Backend(Box::new(e)))?;
101
102        Ok(())
103    }
104
105    async fn load_all_chunks(
106        &self,
107        linked_chunk_id: LinkedChunkId<'_>,
108    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
109        let inner = self.inner.read().unwrap();
110        inner
111            .events
112            .load_all_chunks(linked_chunk_id)
113            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
114    }
115
116    async fn load_all_chunks_metadata(
117        &self,
118        linked_chunk_id: LinkedChunkId<'_>,
119    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
120        let inner = self.inner.read().unwrap();
121        inner
122            .events
123            .load_all_chunks_metadata(linked_chunk_id)
124            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
125    }
126
127    async fn load_last_chunk(
128        &self,
129        linked_chunk_id: LinkedChunkId<'_>,
130    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
131        let inner = self.inner.read().unwrap();
132        inner
133            .events
134            .load_last_chunk(linked_chunk_id)
135            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136    }
137
138    async fn load_previous_chunk(
139        &self,
140        linked_chunk_id: LinkedChunkId<'_>,
141        before_chunk_identifier: ChunkIdentifier,
142    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
143        let inner = self.inner.read().unwrap();
144        inner
145            .events
146            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
147            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
148    }
149
150    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
151        self.inner.write().unwrap().events.clear();
152        Ok(())
153    }
154
155    async fn filter_duplicated_events(
156        &self,
157        linked_chunk_id: LinkedChunkId<'_>,
158        mut events: Vec<OwnedEventId>,
159    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
160        if events.is_empty() {
161            return Ok(Vec::new());
162        }
163
164        let inner = self.inner.read().unwrap();
165
166        let mut duplicated_events = Vec::new();
167
168        for (event, position) in
169            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
170        {
171            if let Some(known_event_id) = event.event_id() {
172                // This event is a duplicate!
173                if let Some(index) =
174                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
175                {
176                    duplicated_events.push((events.remove(index), position));
177                }
178            }
179        }
180
181        Ok(duplicated_events)
182    }
183
184    async fn find_event(
185        &self,
186        room_id: &RoomId,
187        event_id: &EventId,
188    ) -> Result<Option<Event>, Self::Error> {
189        let inner = self.inner.read().unwrap();
190
191        let event = inner
192            .events
193            .items(room_id)
194            .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
195
196        Ok(event)
197    }
198
199    async fn find_event_relations(
200        &self,
201        room_id: &RoomId,
202        event_id: &EventId,
203        filters: Option<&[RelationType]>,
204    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
205        let inner = self.inner.read().unwrap();
206
207        let related_events = inner
208            .events
209            .items(room_id)
210            .filter_map(|(event, pos)| {
211                // Must have a relation.
212                let (related_to, rel_type) = extract_event_relation(event.raw())?;
213                let rel_type = RelationType::from(rel_type.as_str());
214
215                // Must relate to the target item.
216                if related_to != event_id {
217                    return None;
218                }
219
220                // Must not be filtered out.
221                if let Some(filters) = &filters {
222                    filters.contains(&rel_type).then_some((event.clone(), pos))
223                } else {
224                    Some((event.clone(), pos))
225                }
226            })
227            .collect();
228
229        Ok(related_events)
230    }
231
232    async fn get_room_events(
233        &self,
234        room_id: &RoomId,
235        event_type: Option<&str>,
236        session_id: Option<&str>,
237    ) -> Result<Vec<Event>, Self::Error> {
238        let inner = self.inner.read().unwrap();
239
240        let event: Vec<_> = inner
241            .events
242            .items(room_id)
243            .map(|(event, _pos)| event.clone())
244            .filter(|e| {
245                event_type
246                    .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
247            })
248            .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
249            .collect();
250
251        Ok(event)
252    }
253
254    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
255        if event.event_id().is_none() {
256            error!(%room_id, "Trying to save an event with no ID");
257            return Ok(());
258        }
259        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
260        Ok(())
261    }
262
263    async fn optimize(&self) -> Result<(), Self::Error> {
264        Ok(())
265    }
266
267    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
268        Ok(None)
269    }
270}
271
272#[cfg(test)]
273#[allow(unused_imports)] // There seems to be a false positive when importing the test macros.
274mod tests {
275    use super::{MemoryStore, Result};
276    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};
277
278    async fn get_event_cache_store() -> Result<MemoryStore> {
279        Ok(MemoryStore::new())
280    }
281
282    event_cache_store_integration_tests!();
283    #[cfg(not(target_family = "wasm"))]
284    event_cache_store_integration_tests_time!();
285}