matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::memory_store_helper::try_take_leased_lock,
23    linked_chunk::{
24        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
25        RawChunk, Update, relational::RelationalLinkedChunk,
26    },
27};
28use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
29use tracing::error;
30
31use super::{
32    EventCacheStore, EventCacheStoreError, Result, compute_filters_string, extract_event_relation,
33};
34use crate::event_cache::{Event, Gap};
35
36/// In-memory, non-persistent implementation of the `EventCacheStore`.
37///
38/// Default if no other is configured at startup.
39#[derive(Debug, Clone)]
40pub struct MemoryStore {
41    inner: Arc<StdRwLock<MemoryStoreInner>>,
42}
43
44#[derive(Debug)]
45struct MemoryStoreInner {
46    leases: HashMap<String, (String, Instant)>,
47    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
48}
49
50impl Default for MemoryStore {
51    fn default() -> Self {
52        Self {
53            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
54                leases: Default::default(),
55                events: RelationalLinkedChunk::new(),
56            })),
57        }
58    }
59}
60
61impl MemoryStore {
62    /// Create a new empty MemoryStore
63    pub fn new() -> Self {
64        Self::default()
65    }
66}
67
68#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70impl EventCacheStore for MemoryStore {
71    type Error = EventCacheStoreError;
72
73    async fn try_take_leased_lock(
74        &self,
75        lease_duration_ms: u32,
76        key: &str,
77        holder: &str,
78    ) -> Result<bool, Self::Error> {
79        let mut inner = self.inner.write().unwrap();
80
81        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
82    }
83
84    async fn handle_linked_chunk_updates(
85        &self,
86        linked_chunk_id: LinkedChunkId<'_>,
87        updates: Vec<Update<Event, Gap>>,
88    ) -> Result<(), Self::Error> {
89        let mut inner = self.inner.write().unwrap();
90        inner.events.apply_updates(linked_chunk_id, updates);
91
92        Ok(())
93    }
94
95    async fn load_all_chunks(
96        &self,
97        linked_chunk_id: LinkedChunkId<'_>,
98    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
99        let inner = self.inner.read().unwrap();
100        inner
101            .events
102            .load_all_chunks(linked_chunk_id)
103            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
104    }
105
106    async fn load_all_chunks_metadata(
107        &self,
108        linked_chunk_id: LinkedChunkId<'_>,
109    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
110        let inner = self.inner.read().unwrap();
111        inner
112            .events
113            .load_all_chunks_metadata(linked_chunk_id)
114            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
115    }
116
117    async fn load_last_chunk(
118        &self,
119        linked_chunk_id: LinkedChunkId<'_>,
120    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
121        let inner = self.inner.read().unwrap();
122        inner
123            .events
124            .load_last_chunk(linked_chunk_id)
125            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
126    }
127
128    async fn load_previous_chunk(
129        &self,
130        linked_chunk_id: LinkedChunkId<'_>,
131        before_chunk_identifier: ChunkIdentifier,
132    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
133        let inner = self.inner.read().unwrap();
134        inner
135            .events
136            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
137            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
138    }
139
140    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
141        self.inner.write().unwrap().events.clear();
142        Ok(())
143    }
144
145    async fn filter_duplicated_events(
146        &self,
147        linked_chunk_id: LinkedChunkId<'_>,
148        mut events: Vec<OwnedEventId>,
149    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
150        if events.is_empty() {
151            return Ok(Vec::new());
152        }
153
154        let inner = self.inner.read().unwrap();
155
156        let mut duplicated_events = Vec::new();
157
158        for (event, position) in
159            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
160        {
161            if let Some(known_event_id) = event.event_id() {
162                // This event is a duplicate!
163                if let Some(index) =
164                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
165                {
166                    duplicated_events.push((events.remove(index), position));
167                }
168            }
169        }
170
171        Ok(duplicated_events)
172    }
173
174    async fn find_event(
175        &self,
176        room_id: &RoomId,
177        event_id: &EventId,
178    ) -> Result<Option<Event>, Self::Error> {
179        let inner = self.inner.read().unwrap();
180
181        let event = inner
182            .events
183            .items(room_id)
184            .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
185
186        Ok(event)
187    }
188
189    async fn find_event_relations(
190        &self,
191        room_id: &RoomId,
192        event_id: &EventId,
193        filters: Option<&[RelationType]>,
194    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
195        let inner = self.inner.read().unwrap();
196
197        let filters = compute_filters_string(filters);
198
199        let related_events = inner
200            .events
201            .items(room_id)
202            .filter_map(|(event, pos)| {
203                // Must have a relation.
204                let (related_to, rel_type) = extract_event_relation(event.raw())?;
205
206                // Must relate to the target item.
207                if related_to != event_id {
208                    return None;
209                }
210
211                // Must not be filtered out.
212                if let Some(filters) = &filters {
213                    filters.contains(&rel_type).then_some((event.clone(), pos))
214                } else {
215                    Some((event.clone(), pos))
216                }
217            })
218            .collect();
219
220        Ok(related_events)
221    }
222
223    async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
224        let inner = self.inner.read().unwrap();
225
226        let event: Vec<_> =
227            inner.events.items(room_id).map(|(event, _pos)| event.clone()).collect();
228
229        Ok(event)
230    }
231
232    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
233        if event.event_id().is_none() {
234            error!(%room_id, "Trying to save an event with no ID");
235            return Ok(());
236        }
237        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
238        Ok(())
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use super::{MemoryStore, Result};
245    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};
246
247    async fn get_event_cache_store() -> Result<MemoryStore> {
248        Ok(MemoryStore::new())
249    }
250
251    event_cache_store_integration_tests!();
252    event_cache_store_integration_tests_time!();
253}