matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::{
23 CrossProcessLockGeneration,
24 memory_store_helper::{Lease, try_take_leased_lock},
25 },
26 linked_chunk::{
27 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28 RawChunk, Update, relational::RelationalLinkedChunk,
29 },
30};
31use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType};
32use tracing::error;
33
34use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
35use crate::event_cache::{Event, Gap};
36
/// An in-memory store for the event cache; it implements `EventCacheStore`.
///
/// All state lives behind an `Arc<RwLock<..>>`, so cloning a `MemoryStore`
/// produces another handle to the *same* underlying data rather than a copy
/// of it.
#[derive(Debug, Clone)]
pub struct MemoryStore {
    /// Shared, lock-protected state of the store.
    inner: Arc<StdRwLock<MemoryStoreInner>>,
}
50
/// The state guarded by the lock inside `MemoryStore`.
#[derive(Debug)]
struct MemoryStoreInner {
    /// Lease table handed to the `try_take_leased_lock` helper.
    // NOTE(review): presumably keyed by the lock key — confirm against the helper.
    leases: HashMap<String, Lease>,
    /// Linked-chunk storage holding the cached events and gaps.
    events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
}
56
57impl Default for MemoryStore {
58 fn default() -> Self {
59 Self {
60 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
61 leases: Default::default(),
62 events: RelationalLinkedChunk::new(),
63 })),
64 }
65 }
66}
67
68impl MemoryStore {
69 pub fn new() -> Self {
71 Self::default()
72 }
73}
74
75#[cfg_attr(target_family = "wasm", async_trait(?Send))]
76#[cfg_attr(not(target_family = "wasm"), async_trait)]
77impl EventCacheStore for MemoryStore {
78 type Error = EventCacheStoreError;
79
80 async fn try_take_leased_lock(
81 &self,
82 lease_duration_ms: u32,
83 key: &str,
84 holder: &str,
85 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
86 let mut inner = self.inner.write().unwrap();
87
88 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
89 }
90
91 async fn handle_linked_chunk_updates(
92 &self,
93 linked_chunk_id: LinkedChunkId<'_>,
94 updates: Vec<Update<Event, Gap>>,
95 ) -> Result<(), Self::Error> {
96 let mut inner = self.inner.write().unwrap();
97 inner
98 .events
99 .apply_updates(linked_chunk_id, updates)
100 .map_err(|e| Self::Error::Backend(Box::new(e)))?;
101
102 Ok(())
103 }
104
105 async fn load_all_chunks(
106 &self,
107 linked_chunk_id: LinkedChunkId<'_>,
108 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
109 let inner = self.inner.read().unwrap();
110 inner
111 .events
112 .load_all_chunks(linked_chunk_id)
113 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
114 }
115
116 async fn load_all_chunks_metadata(
117 &self,
118 linked_chunk_id: LinkedChunkId<'_>,
119 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
120 let inner = self.inner.read().unwrap();
121 inner
122 .events
123 .load_all_chunks_metadata(linked_chunk_id)
124 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
125 }
126
127 async fn load_last_chunk(
128 &self,
129 linked_chunk_id: LinkedChunkId<'_>,
130 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
131 let inner = self.inner.read().unwrap();
132 inner
133 .events
134 .load_last_chunk(linked_chunk_id)
135 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136 }
137
138 async fn load_previous_chunk(
139 &self,
140 linked_chunk_id: LinkedChunkId<'_>,
141 before_chunk_identifier: ChunkIdentifier,
142 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
143 let inner = self.inner.read().unwrap();
144 inner
145 .events
146 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
147 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
148 }
149
150 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
151 self.inner.write().unwrap().events.clear();
152 Ok(())
153 }
154
155 async fn filter_duplicated_events(
156 &self,
157 linked_chunk_id: LinkedChunkId<'_>,
158 mut events: Vec<OwnedEventId>,
159 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
160 if events.is_empty() {
161 return Ok(Vec::new());
162 }
163
164 let inner = self.inner.read().unwrap();
165
166 let mut duplicated_events = Vec::new();
167
168 for (event, position) in
169 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
170 {
171 if let Some(known_event_id) = event.event_id() {
172 if let Some(index) =
174 events.iter().position(|new_event_id| &known_event_id == new_event_id)
175 {
176 duplicated_events.push((events.remove(index), position));
177 }
178 }
179 }
180
181 Ok(duplicated_events)
182 }
183
184 async fn find_event(
185 &self,
186 room_id: &RoomId,
187 event_id: &EventId,
188 ) -> Result<Option<Event>, Self::Error> {
189 let inner = self.inner.read().unwrap();
190
191 let event = inner
192 .events
193 .items(room_id)
194 .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
195
196 Ok(event)
197 }
198
199 async fn find_event_relations(
200 &self,
201 room_id: &RoomId,
202 event_id: &EventId,
203 filters: Option<&[RelationType]>,
204 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
205 let inner = self.inner.read().unwrap();
206
207 let related_events = inner
208 .events
209 .items(room_id)
210 .filter_map(|(event, pos)| {
211 let (related_to, rel_type) = extract_event_relation(event.raw())?;
213 let rel_type = RelationType::from(rel_type.as_str());
214
215 if related_to != event_id {
217 return None;
218 }
219
220 if let Some(filters) = &filters {
222 filters.contains(&rel_type).then_some((event.clone(), pos))
223 } else {
224 Some((event.clone(), pos))
225 }
226 })
227 .collect();
228
229 Ok(related_events)
230 }
231
232 async fn get_room_events(
233 &self,
234 room_id: &RoomId,
235 event_type: Option<&str>,
236 session_id: Option<&str>,
237 ) -> Result<Vec<Event>, Self::Error> {
238 let inner = self.inner.read().unwrap();
239
240 let event: Vec<_> = inner
241 .events
242 .items(room_id)
243 .map(|(event, _pos)| event.clone())
244 .filter(|e| {
245 event_type
246 .is_none_or(|event_type| Some(event_type) == e.kind.event_type().as_deref())
247 })
248 .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
249 .collect();
250
251 Ok(event)
252 }
253
254 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
255 if event.event_id().is_none() {
256 error!(%room_id, "Trying to save an event with no ID");
257 return Ok(());
258 }
259 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
260 Ok(())
261 }
262
263 async fn optimize(&self) -> Result<(), Self::Error> {
264 Ok(())
265 }
266
267 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
268 Ok(None)
269 }
270}
271
#[cfg(test)]
// On wasm targets the `event_cache_store_integration_tests_time` macro is
// imported below but never invoked (its invocation is gated on non-wasm), so
// the import would otherwise trigger an `unused_imports` warning.
#[allow(unused_imports)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Builds a fresh, empty store.
    // NOTE(review): presumably called by name from the integration-test macros
    // invoked below — confirm against the macro definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::new())
    }

    event_cache_store_integration_tests!();
    // The time-based test suite is only generated on non-wasm targets.
    #[cfg(not(target_family = "wasm"))]
    event_cache_store_integration_tests_time!();
}