1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 linked_chunk::{
24 relational::RelationalLinkedChunk, ChunkIdentifier, ChunkIdentifierGenerator, Position,
25 RawChunk, Update,
26 },
27 ring_buffer::RingBuffer,
28 store_locks::memory_store_helper::try_take_leased_lock,
29};
30use ruma::{
31 time::{Instant, SystemTime},
32 EventId, MxcUri, OwnedEventId, OwnedMxcUri, RoomId,
33};
34
35use super::{
36 media::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService},
37 EventCacheStore, EventCacheStoreError, Result,
38};
39use crate::{
40 event_cache::{Event, Gap},
41 media::{MediaRequestParameters, UniqueKey as _},
42};
43
44#[derive(Debug, Clone)]
48pub struct MemoryStore {
49 inner: Arc<StdRwLock<MemoryStoreInner>>,
50 media_service: MediaService,
51}
52
53#[derive(Debug)]
54struct MemoryStoreInner {
55 media: RingBuffer<MediaContent>,
56 leases: HashMap<String, (String, Instant)>,
57 events: RelationalLinkedChunk<Event, Gap>,
58 media_retention_policy: Option<MediaRetentionPolicy>,
59 last_media_cleanup_time: SystemTime,
60}
61
62#[derive(Debug)]
64struct MediaContent {
65 uri: OwnedMxcUri,
67
68 key: String,
70
71 data: Vec<u8>,
73
74 ignore_policy: bool,
76
77 last_access: SystemTime,
79}
80
81const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
82
83impl Default for MemoryStore {
84 fn default() -> Self {
85 let last_media_cleanup_time = SystemTime::now();
87 let media_service = MediaService::new();
88 media_service.restore(None, Some(last_media_cleanup_time));
89
90 Self {
91 inner: Arc::new(StdRwLock::new(MemoryStoreInner {
92 media: RingBuffer::new(NUMBER_OF_MEDIAS),
93 leases: Default::default(),
94 events: RelationalLinkedChunk::new(),
95 media_retention_policy: None,
96 last_media_cleanup_time,
97 })),
98 media_service,
99 }
100 }
101}
102
103impl MemoryStore {
104 pub fn new() -> Self {
106 Self::default()
107 }
108}
109
110#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
111#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
112impl EventCacheStore for MemoryStore {
113 type Error = EventCacheStoreError;
114
115 async fn try_take_leased_lock(
116 &self,
117 lease_duration_ms: u32,
118 key: &str,
119 holder: &str,
120 ) -> Result<bool, Self::Error> {
121 let mut inner = self.inner.write().unwrap();
122
123 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
124 }
125
126 async fn handle_linked_chunk_updates(
127 &self,
128 room_id: &RoomId,
129 updates: Vec<Update<Event, Gap>>,
130 ) -> Result<(), Self::Error> {
131 let mut inner = self.inner.write().unwrap();
132 inner.events.apply_updates(room_id, updates);
133
134 Ok(())
135 }
136
137 async fn load_all_chunks(
138 &self,
139 room_id: &RoomId,
140 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
141 let inner = self.inner.read().unwrap();
142 inner
143 .events
144 .load_all_chunks(room_id)
145 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
146 }
147
148 async fn load_last_chunk(
149 &self,
150 room_id: &RoomId,
151 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
152 let inner = self.inner.read().unwrap();
153 inner
154 .events
155 .load_last_chunk(room_id)
156 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
157 }
158
159 async fn load_previous_chunk(
160 &self,
161 room_id: &RoomId,
162 before_chunk_identifier: ChunkIdentifier,
163 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
164 let inner = self.inner.read().unwrap();
165 inner
166 .events
167 .load_previous_chunk(room_id, before_chunk_identifier)
168 .map_err(|err| EventCacheStoreError::InvalidData { details: err })
169 }
170
171 async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
172 self.inner.write().unwrap().events.clear();
173 Ok(())
174 }
175
176 async fn filter_duplicated_events(
177 &self,
178 room_id: &RoomId,
179 mut events: Vec<OwnedEventId>,
180 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
181 let inner = self.inner.read().unwrap();
183
184 let mut duplicated_events = Vec::new();
185
186 for (event, position) in inner.events.unordered_room_items(room_id) {
187 if events.is_empty() {
189 break;
190 }
191
192 if let Some(known_event_id) = event.event_id() {
193 if let Some(index) =
195 events.iter().position(|new_event_id| &known_event_id == new_event_id)
196 {
197 duplicated_events.push((events.remove(index), position));
198 }
199 }
200 }
201
202 Ok(duplicated_events)
203 }
204
205 async fn find_event(
206 &self,
207 room_id: &RoomId,
208 event_id: &EventId,
209 ) -> Result<Option<(Position, Event)>, Self::Error> {
210 let inner = self.inner.read().unwrap();
211
212 let event_and_room = inner.events.items().find_map(|(position, event, this_room_id)| {
213 (room_id == this_room_id && event.event_id()? == event_id)
214 .then_some((position, event.clone()))
215 });
216
217 Ok(event_and_room)
218 }
219
220 async fn add_media_content(
221 &self,
222 request: &MediaRequestParameters,
223 data: Vec<u8>,
224 ignore_policy: IgnoreMediaRetentionPolicy,
225 ) -> Result<()> {
226 self.media_service.add_media_content(self, request, data, ignore_policy).await
227 }
228
229 async fn replace_media_key(
230 &self,
231 from: &MediaRequestParameters,
232 to: &MediaRequestParameters,
233 ) -> Result<(), Self::Error> {
234 let expected_key = from.unique_key();
235
236 let mut inner = self.inner.write().unwrap();
237
238 if let Some(media_content) =
239 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
240 {
241 media_content.uri = to.uri().to_owned();
242 media_content.key = to.unique_key();
243 }
244
245 Ok(())
246 }
247
248 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
249 self.media_service.get_media_content(self, request).await
250 }
251
252 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
253 let expected_key = request.unique_key();
254
255 let mut inner = self.inner.write().unwrap();
256
257 let Some(index) =
258 inner.media.iter().position(|media_content| media_content.key == expected_key)
259 else {
260 return Ok(());
261 };
262
263 inner.media.remove(index);
264
265 Ok(())
266 }
267
268 async fn get_media_content_for_uri(
269 &self,
270 uri: &MxcUri,
271 ) -> Result<Option<Vec<u8>>, Self::Error> {
272 self.media_service.get_media_content_for_uri(self, uri).await
273 }
274
275 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
276 let mut inner = self.inner.write().unwrap();
277
278 let positions = inner
279 .media
280 .iter()
281 .enumerate()
282 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
283 .collect::<Vec<_>>();
284
285 for position in positions.into_iter().rev() {
287 inner.media.remove(position);
288 }
289
290 Ok(())
291 }
292
293 async fn set_media_retention_policy(
294 &self,
295 policy: MediaRetentionPolicy,
296 ) -> Result<(), Self::Error> {
297 self.media_service.set_media_retention_policy(self, policy).await
298 }
299
300 fn media_retention_policy(&self) -> MediaRetentionPolicy {
301 self.media_service.media_retention_policy()
302 }
303
304 async fn set_ignore_media_retention_policy(
305 &self,
306 request: &MediaRequestParameters,
307 ignore_policy: IgnoreMediaRetentionPolicy,
308 ) -> Result<(), Self::Error> {
309 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
310 }
311
312 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
313 self.media_service.clean_up_media_cache(self).await
314 }
315}
316
317#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
318#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
319impl EventCacheStoreMedia for MemoryStore {
320 type Error = EventCacheStoreError;
321
322 async fn media_retention_policy_inner(
323 &self,
324 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
325 Ok(self.inner.read().unwrap().media_retention_policy)
326 }
327
328 async fn set_media_retention_policy_inner(
329 &self,
330 policy: MediaRetentionPolicy,
331 ) -> Result<(), Self::Error> {
332 self.inner.write().unwrap().media_retention_policy = Some(policy);
333 Ok(())
334 }
335
336 async fn add_media_content_inner(
337 &self,
338 request: &MediaRequestParameters,
339 data: Vec<u8>,
340 last_access: SystemTime,
341 policy: MediaRetentionPolicy,
342 ignore_policy: IgnoreMediaRetentionPolicy,
343 ) -> Result<(), Self::Error> {
344 self.remove_media_content(request).await?;
346
347 let ignore_policy = ignore_policy.is_yes();
348
349 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
350 return Ok(());
352 };
353
354 let mut inner = self.inner.write().unwrap();
356 inner.media.push(MediaContent {
357 uri: request.uri().to_owned(),
358 key: request.unique_key(),
359 data,
360 ignore_policy,
361 last_access,
362 });
363
364 Ok(())
365 }
366
367 async fn set_ignore_media_retention_policy_inner(
368 &self,
369 request: &MediaRequestParameters,
370 ignore_policy: IgnoreMediaRetentionPolicy,
371 ) -> Result<(), Self::Error> {
372 let mut inner = self.inner.write().unwrap();
373 let expected_key = request.unique_key();
374
375 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
376 {
377 media_content.ignore_policy = ignore_policy.is_yes();
378 }
379
380 Ok(())
381 }
382
383 async fn get_media_content_inner(
384 &self,
385 request: &MediaRequestParameters,
386 current_time: SystemTime,
387 ) -> Result<Option<Vec<u8>>, Self::Error> {
388 let mut inner = self.inner.write().unwrap();
389 let expected_key = request.unique_key();
390
391 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
394 return Ok(None);
395 };
396 let Some(mut content) = inner.media.remove(index) else {
397 return Ok(None);
398 };
399
400 let data = content.data.clone();
402
403 content.last_access = current_time;
405
406 inner.media.push(content);
408
409 Ok(Some(data))
410 }
411
412 async fn get_media_content_for_uri_inner(
413 &self,
414 expected_uri: &MxcUri,
415 current_time: SystemTime,
416 ) -> Result<Option<Vec<u8>>, Self::Error> {
417 let mut inner = self.inner.write().unwrap();
418
419 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
422 return Ok(None);
423 };
424 let Some(mut content) = inner.media.remove(index) else {
425 return Ok(None);
426 };
427
428 let data = content.data.clone();
430
431 content.last_access = current_time;
433
434 inner.media.push(content);
436
437 Ok(Some(data))
438 }
439
440 async fn clean_up_media_cache_inner(
441 &self,
442 policy: MediaRetentionPolicy,
443 current_time: SystemTime,
444 ) -> Result<(), Self::Error> {
445 if !policy.has_limitations() {
446 return Ok(());
448 }
449
450 let mut inner = self.inner.write().unwrap();
451
452 if policy.computed_max_file_size().is_some() {
454 inner.media.retain(|content| {
455 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
456 });
457 }
458
459 if policy.last_access_expiry.is_some() {
461 inner.media.retain(|content| {
462 content.ignore_policy
463 || !policy.has_content_expired(current_time, content.last_access)
464 });
465 }
466
467 if let Some(max_cache_size) = policy.max_cache_size {
469 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
473 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
474 |(mut cache_size, mut items_to_remove), (index, content)| {
475 if content.ignore_policy {
476 return (cache_size, items_to_remove);
478 }
479
480 let remove_item = if items_to_remove.is_empty() {
481 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
483 cache_size = sum;
484 cache_size > max_cache_size
486 } else {
487 true
491 }
492 } else {
493 true
495 };
496
497 if remove_item {
498 items_to_remove.push(index);
499 }
500
501 (cache_size, items_to_remove)
502 },
503 );
504
505 for index in items_to_remove {
508 inner.media.remove(index);
509 }
510 }
511
512 inner.last_media_cleanup_time = current_time;
513
514 Ok(())
515 }
516
517 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
518 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
519 }
520}
521
522#[cfg(test)]
523mod tests {
524 use super::{MemoryStore, Result};
525 use crate::event_cache_store_media_integration_tests;
526
527 async fn get_event_cache_store() -> Result<MemoryStore> {
528 Ok(MemoryStore::new())
529 }
530
531 event_cache_store_integration_tests!();
532 event_cache_store_integration_tests_time!();
533 event_cache_store_media_integration_tests!(with_media_size_tests);
534}