matrix_sdk_base/event_cache/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22    cross_process_lock::memory_store_helper::try_take_leased_lock,
23    linked_chunk::{
24        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
25        RawChunk, Update, relational::RelationalLinkedChunk,
26    },
27};
28use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
29use tracing::error;
30
31use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
32use crate::event_cache::{Event, Gap};
33
34/// In-memory, non-persistent implementation of the `EventCacheStore`.
35///
36/// Default if no other is configured at startup.
37#[derive(Debug, Clone)]
38pub struct MemoryStore {
39    inner: Arc<StdRwLock<MemoryStoreInner>>,
40}
41
42#[derive(Debug)]
43struct MemoryStoreInner {
44    leases: HashMap<String, (String, Instant)>,
45    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
46}
47
48impl Default for MemoryStore {
49    fn default() -> Self {
50        Self {
51            inner: Arc::new(StdRwLock::new(MemoryStoreInner {
52                leases: Default::default(),
53                events: RelationalLinkedChunk::new(),
54            })),
55        }
56    }
57}
58
59impl MemoryStore {
60    /// Create a new empty MemoryStore
61    pub fn new() -> Self {
62        Self::default()
63    }
64}
65
66#[cfg_attr(target_family = "wasm", async_trait(?Send))]
67#[cfg_attr(not(target_family = "wasm"), async_trait)]
68impl EventCacheStore for MemoryStore {
69    type Error = EventCacheStoreError;
70
71    async fn try_take_leased_lock(
72        &self,
73        lease_duration_ms: u32,
74        key: &str,
75        holder: &str,
76    ) -> Result<bool, Self::Error> {
77        let mut inner = self.inner.write().unwrap();
78
79        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
80    }
81
82    async fn handle_linked_chunk_updates(
83        &self,
84        linked_chunk_id: LinkedChunkId<'_>,
85        updates: Vec<Update<Event, Gap>>,
86    ) -> Result<(), Self::Error> {
87        let mut inner = self.inner.write().unwrap();
88        inner.events.apply_updates(linked_chunk_id, updates);
89
90        Ok(())
91    }
92
93    async fn load_all_chunks(
94        &self,
95        linked_chunk_id: LinkedChunkId<'_>,
96    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
97        let inner = self.inner.read().unwrap();
98        inner
99            .events
100            .load_all_chunks(linked_chunk_id)
101            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
102    }
103
104    async fn load_all_chunks_metadata(
105        &self,
106        linked_chunk_id: LinkedChunkId<'_>,
107    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
108        let inner = self.inner.read().unwrap();
109        inner
110            .events
111            .load_all_chunks_metadata(linked_chunk_id)
112            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
113    }
114
115    async fn load_last_chunk(
116        &self,
117        linked_chunk_id: LinkedChunkId<'_>,
118    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
119        let inner = self.inner.read().unwrap();
120        inner
121            .events
122            .load_last_chunk(linked_chunk_id)
123            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
124    }
125
126    async fn load_previous_chunk(
127        &self,
128        linked_chunk_id: LinkedChunkId<'_>,
129        before_chunk_identifier: ChunkIdentifier,
130    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
131        let inner = self.inner.read().unwrap();
132        inner
133            .events
134            .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
135            .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136    }
137
138    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
139        self.inner.write().unwrap().events.clear();
140        Ok(())
141    }
142
143    async fn filter_duplicated_events(
144        &self,
145        linked_chunk_id: LinkedChunkId<'_>,
146        mut events: Vec<OwnedEventId>,
147    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
148        if events.is_empty() {
149            return Ok(Vec::new());
150        }
151
152        let inner = self.inner.read().unwrap();
153
154        let mut duplicated_events = Vec::new();
155
156        for (event, position) in
157            inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
158        {
159            if let Some(known_event_id) = event.event_id() {
160                // This event is a duplicate!
161                if let Some(index) =
162                    events.iter().position(|new_event_id| &known_event_id == new_event_id)
163                {
164                    duplicated_events.push((events.remove(index), position));
165                }
166            }
167        }
168
169        Ok(duplicated_events)
170    }
171
172    async fn find_event(
173        &self,
174        room_id: &RoomId,
175        event_id: &EventId,
176    ) -> Result<Option<Event>, Self::Error> {
177        let inner = self.inner.read().unwrap();
178
179        let event = inner
180            .events
181            .items(room_id)
182            .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
183
184        Ok(event)
185    }
186
187    async fn find_event_relations(
188        &self,
189        room_id: &RoomId,
190        event_id: &EventId,
191        filters: Option<&[RelationType]>,
192    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
193        let inner = self.inner.read().unwrap();
194
195        let related_events = inner
196            .events
197            .items(room_id)
198            .filter_map(|(event, pos)| {
199                // Must have a relation.
200                let (related_to, rel_type) = extract_event_relation(event.raw())?;
201                let rel_type = RelationType::from(rel_type.as_str());
202
203                // Must relate to the target item.
204                if related_to != event_id {
205                    return None;
206                }
207
208                // Must not be filtered out.
209                if let Some(filters) = &filters {
210                    filters.contains(&rel_type).then_some((event.clone(), pos))
211                } else {
212                    Some((event.clone(), pos))
213                }
214            })
215            .collect();
216
217        Ok(related_events)
218    }
219
220    async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
221        let inner = self.inner.read().unwrap();
222
223        let event: Vec<_> =
224            inner.events.items(room_id).map(|(event, _pos)| event.clone()).collect();
225
226        Ok(event)
227    }
228
229    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
230        if event.event_id().is_none() {
231            error!(%room_id, "Trying to save an event with no ID");
232            return Ok(());
233        }
234        self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
235        Ok(())
236    }
237}
238
#[cfg(test)]
#[allow(unused_imports)] // There seems to be a false positive when importing the test macros.
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Builds a fresh, empty store for a test.
    ///
    /// NOTE(review): presumably the integration-test macros below look this
    /// function up by name — confirm before renaming it.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::new();

        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
}