Skip to main content

matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashMap, HashSet},
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::{
23        CrossProcessLockGeneration,
24        memory_store_helper::{Lease, try_take_leased_lock},
25    },
26    linked_chunk::{
27        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28        RawChunk, Update, relational::RelationalLinkedChunk,
29    },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37/// In-memory, non-persistent implementation of the `EventCacheStore`.
38///
39/// Default if no other is configured at startup.
40///
41/// Note that this store is not transactional. This is particularly
42/// relevant when calling [`EventCacheStore::handle_linked_chunk_updates`],
43/// which consumes a list of [`Update`]s. When processing this list, if
44/// one of the [`Update`]s fails, the previous updates in the list
45/// will not be reversed.
46#[derive(Debug, Clone)]
47pub struct MemoryStore {
48    inner: Arc<StdRwLock<MemoryStoreInner>>,
49}
50
51#[derive(Debug)]
52struct MemoryStoreInner {
53    leases: HashMap<String, Lease>,
54    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
55}
56
57impl Default for MemoryStore {
58    fn default() -> Self {
59        Self {
60            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61                leases: Default::default(),
62                events: RelationalLinkedChunk::new(),
63            })),
64        }
65    }
66}
67
68impl MemoryStore {
69    /// Create a new empty MemoryStore
70    pub fn new() -> Self {
71        Self::default()
72    }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78    type Error = EventCacheStoreError;
79
80    async fn close(&self) -> Result<(), Self::Error> {
81        Ok(())
82    }
83
84    async fn reopen(&self) -> Result<(), Self::Error> {
85        Ok(())
86    }
87
88    async fn try_take_leased_lock(
89        &self,
90        lease_duration_ms: u32,
91        key: &str,
92        holder: &str,
93    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
94        let mut inner = self.inner.write().unwrap();
95
96        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
97    }
98
99    async fn handle_linked_chunk_updates(
100        &self,
101        linked_chunk_id: LinkedChunkId<'_>,
102        updates: Vec<Update<Event, Gap>>,
103    ) -> Result<(), Self::Error> {
104        let mut inner = self.inner.write().unwrap();
105        inner
106            .events
107            .apply_updates(linked_chunk_id, updates)
108            .map_err(|e| Self::Error::Backend(Arc::new(e)))?;
109
110        Ok(())
111    }
112
113    async fn load_all_chunks(
114        &self,
115        linked_chunk_id: LinkedChunkId<'_>,
116    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
117        let inner = self.inner.read().unwrap();
118        inner
119            .events
120            .load_all_chunks(linked_chunk_id)
121            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
122    }
123
124    async fn load_all_chunks_metadata(
125        &self,
126        linked_chunk_id: LinkedChunkId<'_>,
127    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
128        let inner = self.inner.read().unwrap();
129        inner
130            .events
131            .load_all_chunks_metadata(linked_chunk_id)
132            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
133    }
134
135    async fn load_last_chunk(
136        &self,
137        linked_chunk_id: LinkedChunkId<'_>,
138    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
139        let inner = self.inner.read().unwrap();
140        inner
141            .events
142            .load_last_chunk(linked_chunk_id)
143            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
144    }
145
146    async fn load_previous_chunk(
147        &self,
148        linked_chunk_id: LinkedChunkId<'_>,
149        before_chunk_identifier: ChunkIdentifier,
150    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
151        let inner = self.inner.read().unwrap();
152        inner
153            .events
154            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
155            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
156    }
157
158    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
159        self.inner.write().unwrap().events.clear();
160        Ok(())
161    }
162
163    async fn filter_duplicated_events(
164        &self,
165        linked_chunk_id: LinkedChunkId<'_>,
166        mut events: Vec<OwnedEventId>,
167    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
168        if events.is_empty() {
169            return Ok(Vec::new());
170        }
171
172        let inner = self.inner.read().unwrap();
173
174        let mut duplicated_events = Vec::new();
175
176        for (event, position) in
177            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
178        {
179            if let Some(known_event_id) = event.event_id() {
180                // This event is a duplicate!
181                if let Some(index) =
182                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
183                {
184                    duplicated_events.push((events.remove(index), position));
185                }
186            }
187        }
188
189        Ok(duplicated_events)
190    }
191
192    async fn find_event(
193        &self,
194        room_id: &RoomId,
195        event_id: &EventId,
196    ) -> Result<Option<Event>, Self::Error> {
197        let inner = self.inner.read().unwrap();
198
199        let event = inner.events.items(room_id).find_map(|(_, (event, _pos))| {
200            (event.event_id()? == event_id).then_some(event.clone())
201        });
202
203        Ok(event)
204    }
205
206    async fn find_event_relations(
207        &self,
208        room_id: &RoomId,
209        event_id: &EventId,
210        filters: Option<&[RelationType]>,
211    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
212        let inner = self.inner.read().unwrap();
213
214        let related_events: Vec<_> = inner
215            .events
216            .items(room_id)
217            .filter_map(|(linked_chunk_id, (event, pos))| {
218                // Must have a relation.
219                let (related_to, rel_type) = extract_event_relation(event.raw())?;
220                let rel_type = RelationType::from(rel_type.as_str());
221
222                // Must relate to the target item.
223                if related_to != event_id {
224                    return None;
225                }
226
227                // Must not be filtered out.
228                if let Some(filters) = &filters {
229                    filters.contains(&rel_type).then_some((linked_chunk_id, (event.clone(), pos)))
230                } else {
231                    Some((linked_chunk_id, (event.clone(), pos)))
232                }
233            })
234            .collect();
235
236        // Remove any duplicate events which may exist in both a room and thread
237        // linked chunk. Additionally, remove any position information from non-room
238        // linked chunks.
239        let mut deduplicated = HashMap::new();
240        for (linked_chunk_id, (event, position)) in related_events {
241            let event_id = event
242                .event_id()
243                .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })?;
244            match linked_chunk_id.as_ref() {
245                LinkedChunkId::Room(_) => {
246                    // Prioritize events that come from a room linked chunk
247                    deduplicated.insert(event_id, (event, position));
248                }
249                _ => {
250                    // Remove position information from events that come
251                    // from any other type of linked chunk
252                    deduplicated.entry(event_id).or_insert_with(|| (event, None));
253                }
254            }
255        }
256
257        Ok(deduplicated.into_values().collect())
258    }
259
260    async fn get_room_events(
261        &self,
262        room_id: &RoomId,
263        event_type: Option<&str>,
264        session_id: Option<&str>,
265    ) -> Result<Vec<Event>, Self::Error> {
266        let inner = self.inner.read().unwrap();
267
268        let (_, event): (_, Vec<_>) = inner
269            .events
270            .items(room_id)
271            .map(|(_, (event, _pos))| event.clone())
272            .filter(|e| {
273                event_type
274                    .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
275            })
276            .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
277            .map(|e| {
278                e.event_id()
279                    .map(|id| (id, e))
280                    .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })
281            })
282            .collect::<Result<Vec<_>>>()?
283            .into_iter()
284            .fold((HashSet::new(), Vec::new()), |(mut ids, mut es), (id, e)| {
285                if !ids.contains(&id) {
286                    ids.insert(id);
287                    es.push(e);
288                }
289                (ids, es)
290            });
291        Ok(event)
292    }
293
294    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
295        if event.event_id().is_none() {
296            error!(%room_id, "Trying to save an event with no ID");
297            return Ok(());
298        }
299        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
300        Ok(())
301    }
302
303    async fn optimize(&self) -> Result<(), Self::Error> {
304        Ok(())
305    }
306
307    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
308        Ok(None)
309    }
310}
311
#[cfg(test)]
#[allow(unused_imports)] // There seems to be a false positive when importing the test macros.
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Provides a fresh, empty store.
    // NOTE(review): presumably looked up by name by the integration test
    // macros below; confirm before renaming.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::new();
        Ok(store)
    }

    // Generic integration test suite.
    event_cache_store_integration_tests!();

    // Time-related integration tests, compiled out on wasm targets.
    #[cfg(not(target_family = "wasm"))]
    event_cache_store_integration_tests_time!();
}
321
322    event_cache_store_integration_tests!();
323    #[cfg(not(target_family = "wasm"))]
324    event_cache_store_integration_tests_time!();
325}