matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::{
23 CrossProcessLockGeneration,
24 memory_store_helper::{Lease, try_take_leased_lock},
25 },
26 linked_chunk::{
27 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28 RawChunk, Update, relational::RelationalLinkedChunk,
29 },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
37#[derive(Debug, Clone)]
41pub struct MemoryStore {
42 inner: Arc<StdRwLock<MemoryStoreInner>>,
43}
44
45#[derive(Debug)]
46struct MemoryStoreInner {
47 leases: HashMap<String, Lease>,
48 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
49}
50
51impl Default for MemoryStore {
52 fn default() -> Self {
53 Self {
54 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
55 leases: Default::default(),
56 events: RelationalLinkedChunk::new(),
57 })),
58 }
59 }
60}
61
62impl MemoryStore {
63 pub fn new() -> Self {
65 Self::default()
66 }
67}
68
69#[cfg_attr(target_family = "wasm", async_trait(?Send))]
70#[cfg_attr(not(target_family = "wasm"), async_trait)]
71impl EventCacheStore for MemoryStore {
72 type Error = EventCacheStoreError;
73
74 async fn try_take_leased_lock(
75 &self,
76 lease_duration_ms: u32,
77 key: &str,
78 holder: &str,
79 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
80 let mut inner = self.inner.write().unwrap();
81
82 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
83 }
84
85 async fn handle_linked_chunk_updates(
86 &self,
87 linked_chunk_id: LinkedChunkId<'_>,
88 updates: Vec<Update<Event, Gap>>,
89 ) -> Result<(), Self::Error> {
90 let mut inner = self.inner.write().unwrap();
91 inner.events.apply_updates(linked_chunk_id, updates);
92
93 Ok(())
94 }
95
96 async fn load_all_chunks(
97 &self,
98 linked_chunk_id: LinkedChunkId<'_>,
99 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
100 let inner = self.inner.read().unwrap();
101 inner
102 .events
103 .load_all_chunks(linked_chunk_id)
104 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
105 }
106
107 async fn load_all_chunks_metadata(
108 &self,
109 linked_chunk_id: LinkedChunkId<'_>,
110 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
111 let inner = self.inner.read().unwrap();
112 inner
113 .events
114 .load_all_chunks_metadata(linked_chunk_id)
115 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
116 }
117
118 async fn load_last_chunk(
119 &self,
120 linked_chunk_id: LinkedChunkId<'_>,
121 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
122 let inner = self.inner.read().unwrap();
123 inner
124 .events
125 .load_last_chunk(linked_chunk_id)
126 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
127 }
128
129 async fn load_previous_chunk(
130 &self,
131 linked_chunk_id: LinkedChunkId<'_>,
132 before_chunk_identifier: ChunkIdentifier,
133 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
134 let inner = self.inner.read().unwrap();
135 inner
136 .events
137 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
138 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
139 }
140
141 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
142 self.inner.write().unwrap().events.clear();
143 Ok(())
144 }
145
146 async fn filter_duplicated_events(
147 &self,
148 linked_chunk_id: LinkedChunkId<'_>,
149 mut events: Vec<OwnedEventId>,
150 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
151 if events.is_empty() {
152 return Ok(Vec::new());
153 }
154
155 let inner = self.inner.read().unwrap();
156
157 let mut duplicated_events = Vec::new();
158
159 for (event, position) in
160 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
161 {
162 if let Some(known_event_id) = event.event_id() {
163 if let Some(index) =
165 events.iter().position(|new_event_id| &known_event_id == new_event_id)
166 {
167 duplicated_events.push((events.remove(index), position));
168 }
169 }
170 }
171
172 Ok(duplicated_events)
173 }
174
175 async fn find_event(
176 &self,
177 room_id: &RoomId,
178 event_id: &EventId,
179 ) -> Result<Option<Event>, Self::Error> {
180 let inner = self.inner.read().unwrap();
181
182 let event = inner
183 .events
184 .items(room_id)
185 .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
186
187 Ok(event)
188 }
189
190 async fn find_event_relations(
191 &self,
192 room_id: &RoomId,
193 event_id: &EventId,
194 filters: Option<&[RelationType]>,
195 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
196 let inner = self.inner.read().unwrap();
197
198 let related_events = inner
199 .events
200 .items(room_id)
201 .filter_map(|(event, pos)| {
202 let (related_to, rel_type) = extract_event_relation(event.raw())?;
204 let rel_type = RelationType::from(rel_type.as_str());
205
206 if related_to != event_id {
208 return None;
209 }
210
211 if let Some(filters) = &filters {
213 filters.contains(&rel_type).then_some((event.clone(), pos))
214 } else {
215 Some((event.clone(), pos))
216 }
217 })
218 .collect();
219
220 Ok(related_events)
221 }
222
223 async fn get_room_events(
224 &self,
225 room_id: &RoomId,
226 event_type: Option<&str>,
227 session_id: Option<&str>,
228 ) -> Result<Vec<Event>, Self::Error> {
229 let inner = self.inner.read().unwrap();
230
231 let event: Vec<_> = inner
232 .events
233 .items(room_id)
234 .map(|(event, _pos)| event.clone())
235 .filter(|e| {
236 event_type
237 .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
238 })
239 .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
240 .collect();
241
242 Ok(event)
243 }
244
245 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
246 if event.event_id().is_none() {
247 error!(%room_id, "Trying to save an event with no ID");
248 return Ok(());
249 }
250 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
251 Ok(())
252 }
253}
254
#[cfg(test)]
// NOTE(review): nothing in this module explains the `allow`; presumably the
// integration-test macro imports are reported as unused in some
// configurations — confirm before removing.
#[allow(unused_imports)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::event_cache_store_integration_tests;
    use crate::event_cache_store_integration_tests_time;

    // Factory for a fresh store; presumably looked up by name from the
    // integration-test macros invoked below.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::new())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
}