// matrix_sdk_base/event_cache/store/memory_store.rs
use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::memory_store_helper::try_take_leased_lock,
23 linked_chunk::{
24 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
25 RawChunk, Update, relational::RelationalLinkedChunk,
26 },
27};
28use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
29use tracing::error;
30
31use super::{
32 EventCacheStore, EventCacheStoreError, Result, compute_filters_string, extract_event_relation,
33};
34use crate::event_cache::{Event, Gap};
35
36#[derive(Debug, Clone)]
40pub struct MemoryStore {
41 inner: Arc<StdRwLock<MemoryStoreInner>>,
42}
43
44#[derive(Debug)]
45struct MemoryStoreInner {
46 leases: HashMap<String, (String, Instant)>,
47 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
48}
49
50impl Default for MemoryStore {
51 fn default() -> Self {
52 Self {
53 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
54 leases: Default::default(),
55 events: RelationalLinkedChunk::new(),
56 })),
57 }
58 }
59}
60
61impl MemoryStore {
62 pub fn new() -> Self {
64 Self::default()
65 }
66}
67
68#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70impl EventCacheStore for MemoryStore {
71 type Error = EventCacheStoreError;
72
73 async fn try_take_leased_lock(
74 &self,
75 lease_duration_ms: u32,
76 key: &str,
77 holder: &str,
78 ) -> Result<bool, Self::Error> {
79 let mut inner = self.inner.write().unwrap();
80
81 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
82 }
83
84 async fn handle_linked_chunk_updates(
85 &self,
86 linked_chunk_id: LinkedChunkId<'_>,
87 updates: Vec<Update<Event, Gap>>,
88 ) -> Result<(), Self::Error> {
89 let mut inner = self.inner.write().unwrap();
90 inner.events.apply_updates(linked_chunk_id, updates);
91
92 Ok(())
93 }
94
95 async fn load_all_chunks(
96 &self,
97 linked_chunk_id: LinkedChunkId<'_>,
98 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
99 let inner = self.inner.read().unwrap();
100 inner
101 .events
102 .load_all_chunks(linked_chunk_id)
103 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
104 }
105
106 async fn load_all_chunks_metadata(
107 &self,
108 linked_chunk_id: LinkedChunkId<'_>,
109 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
110 let inner = self.inner.read().unwrap();
111 inner
112 .events
113 .load_all_chunks_metadata(linked_chunk_id)
114 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
115 }
116
117 async fn load_last_chunk(
118 &self,
119 linked_chunk_id: LinkedChunkId<'_>,
120 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
121 let inner = self.inner.read().unwrap();
122 inner
123 .events
124 .load_last_chunk(linked_chunk_id)
125 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
126 }
127
128 async fn load_previous_chunk(
129 &self,
130 linked_chunk_id: LinkedChunkId<'_>,
131 before_chunk_identifier: ChunkIdentifier,
132 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
133 let inner = self.inner.read().unwrap();
134 inner
135 .events
136 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
137 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
138 }
139
140 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
141 self.inner.write().unwrap().events.clear();
142 Ok(())
143 }
144
145 async fn filter_duplicated_events(
146 &self,
147 linked_chunk_id: LinkedChunkId<'_>,
148 mut events: Vec<OwnedEventId>,
149 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
150 if events.is_empty() {
151 return Ok(Vec::new());
152 }
153
154 let inner = self.inner.read().unwrap();
155
156 let mut duplicated_events = Vec::new();
157
158 for (event, position) in
159 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
160 {
161 if let Some(known_event_id) = event.event_id() {
162 if let Some(index) =
164 events.iter().position(|new_event_id| &known_event_id == new_event_id)
165 {
166 duplicated_events.push((events.remove(index), position));
167 }
168 }
169 }
170
171 Ok(duplicated_events)
172 }
173
174 async fn find_event(
175 &self,
176 room_id: &RoomId,
177 event_id: &EventId,
178 ) -> Result<Option<Event>, Self::Error> {
179 let inner = self.inner.read().unwrap();
180
181 let event = inner
182 .events
183 .items(room_id)
184 .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
185
186 Ok(event)
187 }
188
189 async fn find_event_relations(
190 &self,
191 room_id: &RoomId,
192 event_id: &EventId,
193 filters: Option<&[RelationType]>,
194 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
195 let inner = self.inner.read().unwrap();
196
197 let filters = compute_filters_string(filters);
198
199 let related_events = inner
200 .events
201 .items(room_id)
202 .filter_map(|(event, pos)| {
203 let (related_to, rel_type) = extract_event_relation(event.raw())?;
205
206 if related_to != event_id {
208 return None;
209 }
210
211 if let Some(filters) = &filters {
213 filters.contains(&rel_type).then_some((event.clone(), pos))
214 } else {
215 Some((event.clone(), pos))
216 }
217 })
218 .collect();
219
220 Ok(related_events)
221 }
222
223 async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
224 let inner = self.inner.read().unwrap();
225
226 let event: Vec<_> =
227 inner.events.items(room_id).map(|(event, _pos)| event.clone()).collect();
228
229 Ok(event)
230 }
231
232 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
233 if event.event_id().is_none() {
234 error!(%room_id, "Trying to save an event with no ID");
235 return Ok(());
236 }
237 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
238 Ok(())
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Builds a fresh, empty store for each test.
    // NOTE(review): presumably called by name from the macros below, so the
    // function name and signature must stay as they are; confirm against the
    // macro definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        let store = MemoryStore::new();
        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
}