matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::{HashMap, HashSet},
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::{
23 CrossProcessLockGeneration,
24 memory_store_helper::{Lease, try_take_leased_lock},
25 },
26 linked_chunk::{
27 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28 RawChunk, Update, relational::RelationalLinkedChunk,
29 },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37#[derive(Debug, Clone)]
47pub struct MemoryStore {
48 inner: Arc<StdRwLock<MemoryStoreInner>>,
49}
50
51#[derive(Debug)]
52struct MemoryStoreInner {
53 leases: HashMap<String, Lease>,
54 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
55}
56
57impl Default for MemoryStore {
58 fn default() -> Self {
59 Self {
60 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61 leases: Default::default(),
62 events: RelationalLinkedChunk::new(),
63 })),
64 }
65 }
66}
67
68impl MemoryStore {
69 pub fn new() -> Self {
71 Self::default()
72 }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78 type Error = EventCacheStoreError;
79
80 async fn close(&self) -> Result<(), Self::Error> {
81 Ok(())
82 }
83
84 async fn reopen(&self) -> Result<(), Self::Error> {
85 Ok(())
86 }
87
88 async fn try_take_leased_lock(
89 &self,
90 lease_duration_ms: u32,
91 key: &str,
92 holder: &str,
93 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
94 let mut inner = self.inner.write().unwrap();
95
96 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
97 }
98
99 async fn handle_linked_chunk_updates(
100 &self,
101 linked_chunk_id: LinkedChunkId<'_>,
102 updates: Vec<Update<Event, Gap>>,
103 ) -> Result<(), Self::Error> {
104 let mut inner = self.inner.write().unwrap();
105 inner
106 .events
107 .apply_updates(linked_chunk_id, updates)
108 .map_err(|e| Self::Error::Backend(Arc::new(e)))?;
109
110 Ok(())
111 }
112
113 async fn load_all_chunks(
114 &self,
115 linked_chunk_id: LinkedChunkId<'_>,
116 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
117 let inner = self.inner.read().unwrap();
118 inner
119 .events
120 .load_all_chunks(linked_chunk_id)
121 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
122 }
123
124 async fn load_all_chunks_metadata(
125 &self,
126 linked_chunk_id: LinkedChunkId<'_>,
127 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
128 let inner = self.inner.read().unwrap();
129 inner
130 .events
131 .load_all_chunks_metadata(linked_chunk_id)
132 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
133 }
134
135 async fn load_last_chunk(
136 &self,
137 linked_chunk_id: LinkedChunkId<'_>,
138 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
139 let inner = self.inner.read().unwrap();
140 inner
141 .events
142 .load_last_chunk(linked_chunk_id)
143 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
144 }
145
146 async fn load_previous_chunk(
147 &self,
148 linked_chunk_id: LinkedChunkId<'_>,
149 before_chunk_identifier: ChunkIdentifier,
150 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
151 let inner = self.inner.read().unwrap();
152 inner
153 .events
154 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
155 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
156 }
157
158 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
159 self.inner.write().unwrap().events.clear();
160 Ok(())
161 }
162
163 async fn filter_duplicated_events(
164 &self,
165 linked_chunk_id: LinkedChunkId<'_>,
166 mut events: Vec<OwnedEventId>,
167 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
168 if events.is_empty() {
169 return Ok(Vec::new());
170 }
171
172 let inner = self.inner.read().unwrap();
173
174 let mut duplicated_events = Vec::new();
175
176 for (event, position) in
177 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
178 {
179 if let Some(known_event_id) = event.event_id() {
180 if let Some(index) =
182 events.iter().position(|new_event_id| &known_event_id == new_event_id)
183 {
184 duplicated_events.push((events.remove(index), position));
185 }
186 }
187 }
188
189 Ok(duplicated_events)
190 }
191
192 async fn find_event(
193 &self,
194 room_id: &RoomId,
195 event_id: &EventId,
196 ) -> Result<Option<Event>, Self::Error> {
197 let inner = self.inner.read().unwrap();
198
199 let event = inner.events.items(room_id).find_map(|(_, (event, _pos))| {
200 (event.event_id()? == event_id).then_some(event.clone())
201 });
202
203 Ok(event)
204 }
205
206 async fn find_event_relations(
207 &self,
208 room_id: &RoomId,
209 event_id: &EventId,
210 filters: Option<&[RelationType]>,
211 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
212 let inner = self.inner.read().unwrap();
213
214 let related_events: Vec<_> = inner
215 .events
216 .items(room_id)
217 .filter_map(|(linked_chunk_id, (event, pos))| {
218 let (related_to, rel_type) = extract_event_relation(event.raw())?;
220 let rel_type = RelationType::from(rel_type.as_str());
221
222 if related_to != event_id {
224 return None;
225 }
226
227 if let Some(filters) = &filters {
229 filters.contains(&rel_type).then_some((linked_chunk_id, (event.clone(), pos)))
230 } else {
231 Some((linked_chunk_id, (event.clone(), pos)))
232 }
233 })
234 .collect();
235
236 let mut deduplicated = HashMap::new();
240 for (linked_chunk_id, (event, position)) in related_events {
241 let event_id = event
242 .event_id()
243 .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })?;
244 match linked_chunk_id.as_ref() {
245 LinkedChunkId::Room(_) => {
246 deduplicated.insert(event_id, (event, position));
248 }
249 _ => {
250 deduplicated.entry(event_id).or_insert_with(|| (event, None));
253 }
254 }
255 }
256
257 Ok(deduplicated.into_values().collect())
258 }
259
260 async fn get_room_events(
261 &self,
262 room_id: &RoomId,
263 event_type: Option<&str>,
264 session_id: Option<&str>,
265 ) -> Result<Vec<Event>, Self::Error> {
266 let inner = self.inner.read().unwrap();
267
268 let (_, event): (_, Vec<_>) = inner
269 .events
270 .items(room_id)
271 .map(|(_, (event, _pos))| event.clone())
272 .filter(|e| {
273 event_type
274 .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
275 })
276 .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
277 .map(|e| {
278 e.event_id()
279 .map(|id| (id, e))
280 .ok_or(Self::Error::InvalidData { details: String::from("missing event id") })
281 })
282 .collect::<Result<Vec<_>>>()?
283 .into_iter()
284 .fold((HashSet::new(), Vec::new()), |(mut ids, mut es), (id, e)| {
285 if !ids.contains(&id) {
286 ids.insert(id);
287 es.push(e);
288 }
289 (ids, es)
290 });
291 Ok(event)
292 }
293
294 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
295 if event.event_id().is_none() {
296 error!(%room_id, "Trying to save an event with no ID");
297 return Ok(());
298 }
299 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
300 Ok(())
301 }
302
303 async fn optimize(&self) -> Result<(), Self::Error> {
304 Ok(())
305 }
306
307 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
308 Ok(None)
309 }
310}
311
#[cfg(test)]
// NOTE(review): presumably required because the integration-test macros do not
// use every import on every target — confirm before removing.
#[allow(unused_imports)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Provides the store instance for the shared integration-test macros
    /// invoked below.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::default())
    }

    event_cache_store_integration_tests!();
    #[cfg(not(target_family = "wasm"))]
    event_cache_store_integration_tests_time!();
}