matrix_sdk_base/event_cache/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock as StdRwLock},
18};
19
20use async_trait::async_trait;
21use matrix_sdk_common::{
22 cross_process_lock::memory_store_helper::try_take_leased_lock,
23 linked_chunk::{
24 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
25 RawChunk, Update, relational::RelationalLinkedChunk,
26 },
27};
28use ruma::{EventId, OwnedEventId, RoomId, events::relation::RelationType, time::Instant};
29use tracing::error;
30
31use super::{EventCacheStore, EventCacheStoreError, Result, extract_event_relation};
32use crate::event_cache::{Event, Gap};
33
34#[derive(Debug, Clone)]
38pub struct MemoryStore {
39 inner: Arc<StdRwLock<MemoryStoreInner>>,
40}
41
42#[derive(Debug)]
43struct MemoryStoreInner {
44 leases: HashMap<String, (String, Instant)>,
45 events: RelationalLinkedChunk<OwnedEventId, Event, Gap>,
46}
47
48impl Default for MemoryStore {
49 fn default() -> Self {
50 Self {
51 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
52 leases: Default::default(),
53 events: RelationalLinkedChunk::new(),
54 })),
55 }
56 }
57}
58
59impl MemoryStore {
60 pub fn new() -> Self {
62 Self::default()
63 }
64}
65
66#[cfg_attr(target_family = "wasm", async_trait(?Send))]
67#[cfg_attr(not(target_family = "wasm"), async_trait)]
68impl EventCacheStore for MemoryStore {
69 type Error = EventCacheStoreError;
70
71 async fn try_take_leased_lock(
72 &self,
73 lease_duration_ms: u32,
74 key: &str,
75 holder: &str,
76 ) -> Result<bool, Self::Error> {
77 let mut inner = self.inner.write().unwrap();
78
79 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
80 }
81
82 async fn handle_linked_chunk_updates(
83 &self,
84 linked_chunk_id: LinkedChunkId<'_>,
85 updates: Vec<Update<Event, Gap>>,
86 ) -> Result<(), Self::Error> {
87 let mut inner = self.inner.write().unwrap();
88 inner.events.apply_updates(linked_chunk_id, updates);
89
90 Ok(())
91 }
92
93 async fn load_all_chunks(
94 &self,
95 linked_chunk_id: LinkedChunkId<'_>,
96 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
97 let inner = self.inner.read().unwrap();
98 inner
99 .events
100 .load_all_chunks(linked_chunk_id)
101 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
102 }
103
104 async fn load_all_chunks_metadata(
105 &self,
106 linked_chunk_id: LinkedChunkId<'_>,
107 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
108 let inner = self.inner.read().unwrap();
109 inner
110 .events
111 .load_all_chunks_metadata(linked_chunk_id)
112 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
113 }
114
115 async fn load_last_chunk(
116 &self,
117 linked_chunk_id: LinkedChunkId<'_>,
118 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
119 let inner = self.inner.read().unwrap();
120 inner
121 .events
122 .load_last_chunk(linked_chunk_id)
123 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
124 }
125
126 async fn load_previous_chunk(
127 &self,
128 linked_chunk_id: LinkedChunkId<'_>,
129 before_chunk_identifier: ChunkIdentifier,
130 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
131 let inner = self.inner.read().unwrap();
132 inner
133 .events
134 .load_previous_chunk(linked_chunk_id, before_chunk_identifier)
135 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
136 }
137
138 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
139 self.inner.write().unwrap().events.clear();
140 Ok(())
141 }
142
143 async fn filter_duplicated_events(
144 &self,
145 linked_chunk_id: LinkedChunkId<'_>,
146 mut events: Vec<OwnedEventId>,
147 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
148 if events.is_empty() {
149 return Ok(Vec::new());
150 }
151
152 let inner = self.inner.read().unwrap();
153
154 let mut duplicated_events = Vec::new();
155
156 for (event, position) in
157 inner.events.unordered_linked_chunk_items(&linked_chunk_id.to_owned())
158 {
159 if let Some(known_event_id) = event.event_id() {
160 if let Some(index) =
162 events.iter().position(|new_event_id| &known_event_id == new_event_id)
163 {
164 duplicated_events.push((events.remove(index), position));
165 }
166 }
167 }
168
169 Ok(duplicated_events)
170 }
171
172 async fn find_event(
173 &self,
174 room_id: &RoomId,
175 event_id: &EventId,
176 ) -> Result<Option<Event>, Self::Error> {
177 let inner = self.inner.read().unwrap();
178
179 let event = inner
180 .events
181 .items(room_id)
182 .find_map(|(event, _pos)| (event.event_id()? == event_id).then_some(event.clone()));
183
184 Ok(event)
185 }
186
187 async fn find_event_relations(
188 &self,
189 room_id: &RoomId,
190 event_id: &EventId,
191 filters: Option<&[RelationType]>,
192 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
193 let inner = self.inner.read().unwrap();
194
195 let related_events = inner
196 .events
197 .items(room_id)
198 .filter_map(|(event, pos)| {
199 let (related_to, rel_type) = extract_event_relation(event.raw())?;
201 let rel_type = RelationType::from(rel_type.as_str());
202
203 if related_to != event_id {
205 return None;
206 }
207
208 if let Some(filters) = &filters {
210 filters.contains(&rel_type).then_some((event.clone(), pos))
211 } else {
212 Some((event.clone(), pos))
213 }
214 })
215 .collect();
216
217 Ok(related_events)
218 }
219
220 async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
221 let inner = self.inner.read().unwrap();
222
223 let event: Vec<_> =
224 inner.events.items(room_id).map(|(event, _pos)| event.clone()).collect();
225
226 Ok(event)
227 }
228
229 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
230 if event.event_id().is_none() {
231 error!(%room_id, "Trying to save an event with no ID");
232 return Ok(());
233 }
234 self.inner.write().unwrap().events.save_item(room_id.to_owned(), event);
235 Ok(())
236 }
237}
238
#[cfg(test)]
// NOTE(review): the lint is presumably silenced because the imports below are
// only consumed inside the expansion of the test macros — confirm.
#[allow(unused_imports)]
mod tests {
    use super::{MemoryStore, Result};
    use crate::{event_cache_store_integration_tests, event_cache_store_integration_tests_time};

    /// Builds a fresh, empty store.
    // NOTE(review): presumably called by name from the integration-test
    // macros invoked below — verify against the macro definitions.
    async fn get_event_cache_store() -> Result<MemoryStore> {
        Ok(MemoryStore::default())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
}