matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::{
23        CrossProcessLockGeneration,
24        memory_store_helper::{Lease, try_take_leased_lock},
25    },
26    linked_chunk::{
27        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28        RawChunk, Update, relational::RelationalLinkedChunk,
29    },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37/// In-memory, non-persistent implementation of the `EventCacheStore`.
38///
39/// Default if no other is configured at startup.
40#[derive(Debug, Clone)]
41pub struct MemoryStore {
42    inner: Arc<StdRwLock<MemoryStoreInner>>,
43}
44
45#[derive(Debug)]
46struct MemoryStoreInner {
47    leases: HashMap<String, Lease>,
48    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
49}
50
51impl Default for MemoryStore {
52    fn default() -> Self {
53        Self {
54            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
55                leases: Default::default(),
56                events: RelationalLinkedChunk::new(),
57            })),
58        }
59    }
60}
61
62impl MemoryStore {
63    /// Create a new empty MemoryStore
64    pub fn new() -> Self {
65        Self::default()
66    }
67}
68
69#[cfg_attr(target_family = "wasm", async_trait(?Send))]
70#[cfg_attr(not(target_family = "wasm"), async_trait)]
71impl EventCacheStore for MemoryStore {
72    type Error = EventCacheStoreError;
73
74    async fn try_take_leased_lock(
75        &self,
76        lease_duration_ms: u32,
77        key: &str,
78        holder: &str,
79    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
80        let mut inner = self.inner.write().unwrap();
81
82        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
83    }
84
85    async fn handle_linked_chunk_updates(
86        &self,
87        linked_chunk_id: LinkedChunkId<'_>,
88        updates: Vec<Update<Event, Gap>>,
89    ) -> Result<(), Self::Error> {
90        let mut inner = self.inner.write().unwrap();
91        inner.events.apply_updates(linked_chunk_id, updates);
92
93        Ok(())
94    }
95
96    async fn load_all_chunks(
97        &self,
98        linked_chunk_id: LinkedChunkId<'_>,
99    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
100        let inner = self.inner.read().unwrap();
101        inner
102            .events
103            .load_all_chunks(linked_chunk_id)
104            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
105    }
106
107    async fn load_all_chunks_metadata(
108        &self,
109        linked_chunk_id: LinkedChunkId<'_>,
110    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
111        let inner = self.inner.read().unwrap();
112        inner
113            .events
114            .load_all_chunks_metadata(linked_chunk_id)
115            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
116    }
117
118    async fn load_last_chunk(
119        &self,
120        linked_chunk_id: LinkedChunkId<'_>,
121    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
122        let inner = self.inner.read().unwrap();
123        inner
124            .events
125            .load_last_chunk(linked_chunk_id)
126            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
127    }
128
129    async fn load_previous_chunk(
130        &self,
131        linked_chunk_id: LinkedChunkId<'_>,
132        before_chunk_identifier: ChunkIdentifier,
133    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
134        let inner = self.inner.read().unwrap();
135        inner
136            .events
137            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
138            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
139    }
140
141    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
142        self.inner.write().unwrap().events.clear();
143        Ok(())
144    }
145
146    async fn filter_duplicated_events(
147        &self,
148        linked_chunk_id: LinkedChunkId<'_>,
149        mut events: Vec<OwnedEventId>,
150    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
151        if events.is_empty() {
152            return Ok(Vec::new());
153        }
154
155        let inner = self.inner.read().unwrap();
156
157        let mut duplicated_events = Vec::new();
158
159        for (event, position) in
160            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
161        {
162            if let Some(known_event_id) = event.event_id() {
163                // This event is a duplicate!
164                if let Some(index) =
165                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
166                {
167                    duplicated_events.push((events.remove(index), position));
168                }
169            }
170        }
171
172        Ok(duplicated_events)
173    }
174
175    async fn find_event(
176        &self,
177        room_id: &RoomId,
178        event_id: &EventId,
179    ) -> Result<Option<Event>, Self::Error> {
180        let inner = self.inner.read().unwrap();
181
182        let event = inner
183            .events
184            .items(room_id)
185            .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
186
187        Ok(event)
188    }
189
190    async fn find_event_relations(
191        &self,
192        room_id: &RoomId,
193        event_id: &EventId,
194        filters: Option<&[RelationType]>,
195    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
196        let inner = self.inner.read().unwrap();
197
198        let related_events = inner
199            .events
200            .items(room_id)
201            .filter_map(|(event, pos)| {
202                // Must have a relation.
203                let (related_to, rel_type) = extract_event_relation(event.raw())?;
204                let rel_type = RelationType::from(rel_type.as_str());
205
206                // Must relate to the target item.
207                if related_to != event_id {
208                    return None;
209                }
210
211                // Must not be filtered out.
212                if let Some(filters) = &filters {
213                    filters.contains(&rel_type).then_some((event.clone(), pos))
214                } else {
215                    Some((event.clone(), pos))
216                }
217            })
218            .collect();
219
220        Ok(related_events)
221    }
222
223    async fn get_room_events(
224        &self,
225        room_id: &RoomId,
226        event_type: Option<&str>,
227        session_id: Option<&str>,
228    ) -> Result<Vec<Event>, Self::Error> {
229        let inner = self.inner.read().unwrap();
230
231        let event: Vec<_> = inner
232            .events
233            .items(room_id)
234            .map(|(event, _pos)| event.clone())
235            .filter(|e| {
236                event_type
237                    .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
238            })
239            .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
240            .collect();
241
242        Ok(event)
243    }
244
245    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
246        if event.event_id().is_none() {
247            error!(%room_id, "Trying to save an event with no ID");
248            return Ok(());
249        }
250        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
251        Ok(())
252    }
253}
254
255#[cfg(test)]
256#[allow(unused_imports)] // There seems to be a false positive when importing the test macros.
257mod tests {
258    use super::{MemoryStore, Result};
259    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};
260
261    async fn get_event_cache_store() -> Result<MemoryStore> {
262        Ok(MemoryStore::new())
263    }
264
265    event_cache_store_integration_tests!();
266    event_cache_store_integration_tests_time!();
267}