matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::{HashMap, HashSet},
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::{
23 CrossProcessLockGeneration,
24 memory_store_helper::{Lease, try_take_leased_lock},
25 },
26 linked_chunk::{
27 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28 RawChunk, Update, relational::RelationalLinkedChunk,
29 },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37#[derive(Debug, Clone)]
47pub struct MemoryStore {
48 inner: Arc<StdRwLock<MemoryStoreInner>>,
49}
50
51#[derive(Debug)]
52struct MemoryStoreInner {
53 leases: HashMap<String, Lease>,
54 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
55}
56
57impl Default for MemoryStore {
58 fn default() -> Self {
59 Self {
60 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61 leases: Default::default(),
62 events: RelationalLinkedChunk::new(),
63 })),
64 }
65 }
66}
67
68impl MemoryStore {
69 pub fn new() -> Self {
71 Self::default()
72 }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78 type Error = EventCacheStoreError;
79
80 async fn try_take_leased_lock(
81 &self,
82 lease_duration_ms: u32,
83 key: &str,
84 holder: &str,
85 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
86 let mut inner = self.inner.write().unwrap();
87
88 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
89 }
90
91 async fn handle_linked_chunk_updates(
92 &self,
93 linked_chunk_id: LinkedChunkId<'_>,
94 updates: Vec<Update<Event, Gap>>,
95 ) -> Result<(), Self::Error> {
96 let mut inner = self.inner.write().unwrap();
97 inner
98 .events
99 .apply_updates(linked_chunk_id, updates)
100 .map_err(|e| Self::Error::Backend(Box::new(e)))?;
101
102 Ok(())
103 }
104
105 async fn load_all_chunks(
106 &self,
107 linked_chunk_id: LinkedChunkId<'_>,
108 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
109 let inner = self.inner.read().unwrap();
110 inner
111 .events
112 .load_all_chunks(linked_chunk_id)
113 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
114 }
115
116 async fn load_all_chunks_metadata(
117 &self,
118 linked_chunk_id: LinkedChunkId<'_>,
119 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
120 let inner = self.inner.read().unwrap();
121 inner
122 .events
123 .load_all_chunks_metadata(linked_chunk_id)
124 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
125 }
126
127 async fn load_last_chunk(
128 &self,
129 linked_chunk_id: LinkedChunkId<'_>,
130 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
131 let inner = self.inner.read().unwrap();
132 inner
133 .events
134 .load_last_chunk(linked_chunk_id)
135 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136 }
137
138 async fn load_previous_chunk(
139 &self,
140 linked_chunk_id: LinkedChunkId<'_>,
141 before_chunk_identifier: ChunkIdentifier,
142 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
143 let inner = self.inner.read().unwrap();
144 inner
145 .events
146 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
147 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
148 }
149
150 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
151 self.inner.write().unwrap().events.clear();
152 Ok(())
153 }
154
155 async fn filter_duplicated_events(
156 &self,
157 linked_chunk_id: LinkedChunkId<'_>,
158 mut events: Vec<OwnedEventId>,
159 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
160 if events.is_empty() {
161 return Ok(Vec::new());
162 }
163
164 let inner = self.inner.read().unwrap();
165
166 let mut duplicated_events = Vec::new();
167
168 for (event, position) in
169 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
170 {
171 if let Some(known_event_id) = event.event_id() {
172 if let Some(index) =
174 events.iter().position(|new_event_id| &known_event_id == new_event_id)
175 {
176 duplicated_events.push((events.remove(index), position));
177 }
178 }
179 }
180
181 Ok(duplicated_events)
182 }
183
184 async fn find_event(
185 &self,
186 room_id: &RoomId,
187 event_id: &EventId,
188 ) -> Result<Option<Event>, Self::Error> {
189 let inner = self.inner.read().unwrap();
190
191 let event = inner.events.items(room_id).find_map(|(_, (event, _pos))| {
192 (event.event_id()? == event_id).then_some(event.clone())
193 });
194
195 Ok(event)
196 }
197
198 async fn find_event_relations(
199 &self,
200 room_id: &RoomId,
201 event_id: &EventId,
202 filters: Option<&[RelationType]>,
203 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
204 let inner = self.inner.read().unwrap();
205
206 let related_events: Vec<_> = inner
207 .events
208 .items(room_id)
209 .filter_map(|(linked_chunk_id, (event, pos))| {
210 let (related_to, rel_type) = extract_event_relation(event.raw())?;
212 let rel_type = RelationType::from(rel_type.as_str());
213
214 if related_to != event_id {
216 return None;
217 }
218
219 if let Some(filters) = &filters {
221 filters.contains(&rel_type).then_some((linked_chunk_id, (event.clone(), pos)))
222 } else {
223 Some((linked_chunk_id, (event.clone(), pos)))
224 }
225 })
226 .collect();
227
228 let mut deduplicated = HashMap::new();
232 for (linked_chunk_id, (event, position)) in related_events {
233 let event_id = event
234 .event_id()
235 .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })?;
236 match linked_chunk_id.as_ref() {
237 LinkedChunkId::Room(_) => {
238 deduplicated.insert(event_id, (event, position));
240 }
241 _ => {
242 deduplicated.entry(event_id).or_insert_with(|| (event, None));
245 }
246 }
247 }
248
249 Ok(deduplicated.into_values().collect())
250 }
251
252 async fn get_room_events(
253 &self,
254 room_id: &RoomId,
255 event_type: Option<&str>,
256 session_id: Option<&str>,
257 ) -> Result<Vec<Event>, Self::Error> {
258 let inner = self.inner.read().unwrap();
259
260 let (_, event): (_, Vec<_>) = inner
261 .events
262 .items(room_id)
263 .map(|(_, (event, _pos))| event.clone())
264 .filter(|e| {
265 event_type
266 .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
267 })
268 .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
269 .map(|e| {
270 e.event_id()
271 .map(|id| (id, e))
272 .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })
273 })
274 .collect::<Result<Vec<_>>>()?
275 .into_iter()
276 .fold((HashSet::new(), Vec::new()), |(mut ids, mut es), (id, e)| {
277 if !ids.contains(&id) {
278 ids.insert(id);
279 es.push(e);
280 }
281 (ids, es)
282 });
283 Ok(event)
284 }
285
286 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
287 if event.event_id().is_none() {
288 error!(%room_id, "Trying to save an event with no ID");
289 return Ok(());
290 }
291 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
292 Ok(())
293 }
294
295 async fn optimize(&self) -> Result<(), Self::Error> {
296 Ok(())
297 }
298
299 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
300 Ok(None)
301 }
302}
303
304#[cfg(test)]
305#[allow(unused_imports)] mod tests {
307 use super::{MemoryStore, Result};
308 use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};
309
310 async fn get_event_cache_store() -> Result<MemoryStore> {
311 Ok(MemoryStore::new())
312 }
313
314 event_cache_store_integration_tests!();
315 #[cfg(not(target_family = "wasm"))]
316 event_cache_store_integration_tests_time!();
317}