// matrix_sdk_base/event_cache/store/memory_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::{
16    collections::{HashMap, HashSet},
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::{
23        CrossProcessLockGeneration,
24        memory_store_helper::{Lease, try_take_leased_lock},
25    },
26    linked_chunk::{
27        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28        RawChunk, Update, relational::RelationalLinkedChunk,
29    },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
/// In-memory, non-persistent implementation of the `EventCacheStore`.
///
/// Default if no other is configured at startup.
///
/// Note that this store is not transactional. This is particularly
/// relevant when calling [`EventCacheStore::handle_linked_chunk_updates`],
/// which consumes a list of [`Update`]s. When processing this list, if
/// one of the [`Update`]s fails, the previous updates in the list
/// will not be reversed.
#[derive(Debug, Clone)]
pub struct MemoryStore {
    // Shared, lock-protected state. `Clone` is cheap: it only bumps the
    // `Arc` refcount, so all clones observe the same underlying data.
    inner: Arc<StdRwLock<MemoryStoreInner>>,
}
50
/// Mutable state behind [`MemoryStore`]'s lock.
#[derive(Debug)]
struct MemoryStoreInner {
    // Cross-process lock leases, keyed by lock key; used by
    // `try_take_leased_lock`.
    leases: HashMap<String, Lease>,
    // All linked-chunk data (events and gaps), in relational form.
    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
}
56
57impl Default for MemoryStore {
58    fn default() -> Self {
59        Self {
60            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61                leases: Default::default(),
62                events: RelationalLinkedChunk::new(),
63            })),
64        }
65    }
66}
67
68impl MemoryStore {
69    /// Create a new empty MemoryStore
70    pub fn new() -> Self {
71        Self::default()
72    }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78    type Error = EventCacheStoreError;
79
80    async fn try_take_leased_lock(
81        &self,
82        lease_duration_ms: u32,
83        key: &str,
84        holder: &str,
85    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
86        let mut inner = self.inner.write().unwrap();
87
88        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
89    }
90
91    async fn handle_linked_chunk_updates(
92        &self,
93        linked_chunk_id: LinkedChunkId<'_>,
94        updates: Vec<Update<Event, Gap>>,
95    ) -> Result<(), Self::Error> {
96        let mut inner = self.inner.write().unwrap();
97        inner
98            .events
99            .apply_updates(linked_chunk_id, updates)
100            .map_err(|e| Self::Error::Backend(Box::new(e)))?;
101
102        Ok(())
103    }
104
105    async fn load_all_chunks(
106        &self,
107        linked_chunk_id: LinkedChunkId<'_>,
108    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
109        let inner = self.inner.read().unwrap();
110        inner
111            .events
112            .load_all_chunks(linked_chunk_id)
113            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
114    }
115
116    async fn load_all_chunks_metadata(
117        &self,
118        linked_chunk_id: LinkedChunkId<'_>,
119    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
120        let inner = self.inner.read().unwrap();
121        inner
122            .events
123            .load_all_chunks_metadata(linked_chunk_id)
124            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
125    }
126
127    async fn load_last_chunk(
128        &self,
129        linked_chunk_id: LinkedChunkId<'_>,
130    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
131        let inner = self.inner.read().unwrap();
132        inner
133            .events
134            .load_last_chunk(linked_chunk_id)
135            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136    }
137
138    async fn load_previous_chunk(
139        &self,
140        linked_chunk_id: LinkedChunkId<'_>,
141        before_chunk_identifier: ChunkIdentifier,
142    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
143        let inner = self.inner.read().unwrap();
144        inner
145            .events
146            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
147            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
148    }
149
150    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
151        self.inner.write().unwrap().events.clear();
152        Ok(())
153    }
154
155    async fn filter_duplicated_events(
156        &self,
157        linked_chunk_id: LinkedChunkId<'_>,
158        mut events: Vec<OwnedEventId>,
159    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
160        if events.is_empty() {
161            return Ok(Vec::new());
162        }
163
164        let inner = self.inner.read().unwrap();
165
166        let mut duplicated_events = Vec::new();
167
168        for (event, position) in
169            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
170        {
171            if let Some(known_event_id) = event.event_id() {
172                // This event is a duplicate!
173                if let Some(index) =
174                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
175                {
176                    duplicated_events.push((events.remove(index), position));
177                }
178            }
179        }
180
181        Ok(duplicated_events)
182    }
183
184    async fn find_event(
185        &self,
186        room_id: &RoomId,
187        event_id: &EventId,
188    ) -> Result<Option<Event>, Self::Error> {
189        let inner = self.inner.read().unwrap();
190
191        let event = inner.events.items(room_id).find_map(|(_, (event, _pos))| {
192            (event.event_id()? == event_id).then_some(event.clone())
193        });
194
195        Ok(event)
196    }
197
198    async fn find_event_relations(
199        &self,
200        room_id: &RoomId,
201        event_id: &EventId,
202        filters: Option<&[RelationType]>,
203    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
204        let inner = self.inner.read().unwrap();
205
206        let related_events: Vec<_> = inner
207            .events
208            .items(room_id)
209            .filter_map(|(linked_chunk_id, (event, pos))| {
210                // Must have a relation.
211                let (related_to, rel_type) = extract_event_relation(event.raw())?;
212                let rel_type = RelationType::from(rel_type.as_str());
213
214                // Must relate to the target item.
215                if related_to != event_id {
216                    return None;
217                }
218
219                // Must not be filtered out.
220                if let Some(filters) = &filters {
221                    filters.contains(&rel_type).then_some((linked_chunk_id, (event.clone(), pos)))
222                } else {
223                    Some((linked_chunk_id, (event.clone(), pos)))
224                }
225            })
226            .collect();
227
228        // Remove any duplicate events which may exist in both a room and thread
229        // linked chunk. Additionally, remove any position information from non-room
230        // linked chunks.
231        let mut deduplicated = HashMap::new();
232        for (linked_chunk_id, (event, position)) in related_events {
233            let event_id = event
234                .event_id()
235                .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })?;
236            match linked_chunk_id.as_ref() {
237                LinkedChunkId::Room(_) => {
238                    // Prioritize events that come from a room linked chunk
239                    deduplicated.insert(event_id, (event, position));
240                }
241                _ => {
242                    // Remove position information from events that come
243                    // from any other type of linked chunk
244                    deduplicated.entry(event_id).or_insert_with(|| (event, None));
245                }
246            }
247        }
248
249        Ok(deduplicated.into_values().collect())
250    }
251
252    async fn get_room_events(
253        &self,
254        room_id: &RoomId,
255        event_type: Option<&str>,
256        session_id: Option<&str>,
257    ) -> Result<Vec<Event>, Self::Error> {
258        let inner = self.inner.read().unwrap();
259
260        let (_, event): (_, Vec<_>) = inner
261            .events
262            .items(room_id)
263            .map(|(_, (event, _pos))| event.clone())
264            .filter(|e| {
265                event_type
266                    .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
267            })
268            .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
269            .map(|e| {
270                e.event_id()
271                    .map(|id| (id, e))
272                    .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })
273            })
274            .collect::<Result<Vec<_>>>()?
275            .into_iter()
276            .fold((HashSet::new(), Vec::new()), |(mut ids, mut es), (id, e)| {
277                if !ids.contains(&id) {
278                    ids.insert(id);
279                    es.push(e);
280                }
281                (ids, es)
282            });
283        Ok(event)
284    }
285
286    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
287        if event.event_id().is_none() {
288            error!(%room_id, "Trying to save an event with no ID");
289            return Ok(());
290        }
291        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
292        Ok(())
293    }
294
295    async fn optimize(&self) -> Result<(), Self::Error> {
296        Ok(())
297    }
298
299    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
300        Ok(None)
301    }
302}
303
#[cfg(test)]
#[allow(unused_imports)] // There seems to be a false positive when importing the test macros.
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    // Constructor invoked by the integration-test macros below to obtain a
    // fresh, empty store for each test.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::new())
    }

    // Shared `EventCacheStore` behavior tests, generated by the macro.
    event_cache_store_integration_tests!();
    // Time-dependent store tests; gated off wasm — NOTE(review): presumably
    // because of timer limitations on that target, confirm.
    #[cfg(not(target_family = "wasm"))]
    event_cache_store_integration_tests_time!();
}