matrix_sdk_base/media/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 cross_process_lock::{
24 CrossProcessLockGeneration,
25 memory_store_helper::{Lease, try_take_leased_lock},
26 },
27 ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33 MediaRequestParameters, UniqueKey as _,
34 store::{
35 IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36 MediaStoreError, MediaStoreInner,
37 },
38};
39
40#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45 inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46 media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51 media: RingBuffer<MediaContent>,
52 leases: HashMap<String, Lease>,
53 media_retention_policy: Option<MediaRetentionPolicy>,
54 last_media_cleanup_time: SystemTime,
55}
56
57#[derive(Debug)]
59struct MediaContent {
60 uri: OwnedMxcUri,
62
63 key: String,
65
66 data: Vec<u8>,
68
69 ignore_policy: bool,
71
72 last_access: SystemTime,
74}
75
/// Size handed to `RingBuffer::new` for the media buffer; also used to
/// pre-size the eviction list during cleanup.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(size) => size,
    // Evaluated at compile time only; 20 is non-zero so this arm is never taken.
    None => panic!("NUMBER_OF_MEDIAS must be non-zero"),
};
77
78impl Default for MemoryMediaStore {
79 fn default() -> Self {
80 let last_media_cleanup_time = SystemTime::now();
82 let media_service = MediaService::new();
83 media_service.restore(None, Some(last_media_cleanup_time));
84
85 Self {
86 inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87 media: RingBuffer::new(NUMBER_OF_MEDIAS),
88 leases: Default::default(),
89 media_retention_policy: None,
90 last_media_cleanup_time,
91 })),
92 media_service,
93 }
94 }
95}
96
97impl MemoryMediaStore {
98 pub fn new() -> Self {
100 Self::default()
101 }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107 type Error = MediaStoreError;
108
109 async fn try_take_leased_lock(
110 &self,
111 lease_duration_ms: u32,
112 key: &str,
113 holder: &str,
114 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115 let mut inner = self.inner.write().unwrap();
116
117 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118 }
119
120 async fn add_media_content(
121 &self,
122 request: &MediaRequestParameters,
123 data: Vec<u8>,
124 ignore_policy: IgnoreMediaRetentionPolicy,
125 ) -> Result<(), Self::Error> {
126 self.media_service.add_media_content(self, request, data, ignore_policy).await
127 }
128
129 async fn replace_media_key(
130 &self,
131 from: &MediaRequestParameters,
132 to: &MediaRequestParameters,
133 ) -> Result<(), Self::Error> {
134 let expected_key = from.unique_key();
135
136 let mut inner = self.inner.write().unwrap();
137
138 if let Some(media_content) =
139 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140 {
141 media_content.uri = to.uri().to_owned();
142 media_content.key = to.unique_key();
143 }
144
145 Ok(())
146 }
147
148 async fn get_media_content(
149 &self,
150 request: &MediaRequestParameters,
151 ) -> Result<Option<Vec<u8>>, Self::Error> {
152 self.media_service.get_media_content(self, request).await
153 }
154
155 async fn remove_media_content(
156 &self,
157 request: &MediaRequestParameters,
158 ) -> Result<(), Self::Error> {
159 let expected_key = request.unique_key();
160
161 let mut inner = self.inner.write().unwrap();
162
163 let Some(index) =
164 inner.media.iter().position(|media_content| media_content.key == expected_key)
165 else {
166 return Ok(());
167 };
168
169 inner.media.remove(index);
170
171 Ok(())
172 }
173
174 async fn get_media_content_for_uri(
175 &self,
176 uri: &MxcUri,
177 ) -> Result<Option<Vec<u8>>, Self::Error> {
178 self.media_service.get_media_content_for_uri(self, uri).await
179 }
180
181 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182 let mut inner = self.inner.write().unwrap();
183
184 let positions = inner
185 .media
186 .iter()
187 .enumerate()
188 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189 .collect::<Vec<_>>();
190
191 for position in positions.into_iter().rev() {
193 inner.media.remove(position);
194 }
195
196 Ok(())
197 }
198
199 async fn set_media_retention_policy(
200 &self,
201 policy: MediaRetentionPolicy,
202 ) -> Result<(), Self::Error> {
203 self.media_service.set_media_retention_policy(self, policy).await
204 }
205
206 fn media_retention_policy(&self) -> MediaRetentionPolicy {
207 self.media_service.media_retention_policy()
208 }
209
210 async fn set_ignore_media_retention_policy(
211 &self,
212 request: &MediaRequestParameters,
213 ignore_policy: IgnoreMediaRetentionPolicy,
214 ) -> Result<(), Self::Error> {
215 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216 }
217
218 async fn clean(&self) -> Result<(), Self::Error> {
219 self.media_service.clean(self).await
220 }
221
222 async fn optimize(&self) -> Result<(), Self::Error> {
223 Ok(())
224 }
225
226 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
227 Ok(None)
228 }
229
230 async fn close(&self) -> Result<(), Self::Error> {
231 Ok(())
232 }
233
234 async fn reopen(&self) -> Result<(), Self::Error> {
235 Ok(())
236 }
237}
238
239#[cfg_attr(target_family = "wasm", async_trait(?Send))]
240#[cfg_attr(not(target_family = "wasm"), async_trait)]
241impl MediaStoreInner for MemoryMediaStore {
242 type Error = MediaStoreError;
243
244 async fn media_retention_policy_inner(
245 &self,
246 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
247 Ok(self.inner.read().unwrap().media_retention_policy)
248 }
249
250 async fn set_media_retention_policy_inner(
251 &self,
252 policy: MediaRetentionPolicy,
253 ) -> Result<(), Self::Error> {
254 self.inner.write().unwrap().media_retention_policy = Some(policy);
255 Ok(())
256 }
257
258 async fn add_media_content_inner(
259 &self,
260 request: &MediaRequestParameters,
261 data: Vec<u8>,
262 last_access: SystemTime,
263 policy: MediaRetentionPolicy,
264 ignore_policy: IgnoreMediaRetentionPolicy,
265 ) -> Result<(), Self::Error> {
266 self.remove_media_content(request).await?;
268
269 let ignore_policy = ignore_policy.is_yes();
270
271 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
272 return Ok(());
274 }
275
276 let mut inner = self.inner.write().unwrap();
278 inner.media.push(MediaContent {
279 uri: request.uri().to_owned(),
280 key: request.unique_key(),
281 data,
282 ignore_policy,
283 last_access,
284 });
285
286 Ok(())
287 }
288
289 async fn set_ignore_media_retention_policy_inner(
290 &self,
291 request: &MediaRequestParameters,
292 ignore_policy: IgnoreMediaRetentionPolicy,
293 ) -> Result<(), Self::Error> {
294 let mut inner = self.inner.write().unwrap();
295 let expected_key = request.unique_key();
296
297 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
298 {
299 media_content.ignore_policy = ignore_policy.is_yes();
300 }
301
302 Ok(())
303 }
304
305 async fn get_media_content_inner(
306 &self,
307 request: &MediaRequestParameters,
308 current_time: SystemTime,
309 ) -> Result<Option<Vec<u8>>, Self::Error> {
310 let mut inner = self.inner.write().unwrap();
311 let expected_key = request.unique_key();
312
313 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
316 return Ok(None);
317 };
318 let Some(mut content) = inner.media.remove(index) else {
319 return Ok(None);
320 };
321
322 let data = content.data.clone();
324
325 content.last_access = current_time;
327
328 inner.media.push(content);
330
331 Ok(Some(data))
332 }
333
334 async fn get_media_content_for_uri_inner(
335 &self,
336 expected_uri: &MxcUri,
337 current_time: SystemTime,
338 ) -> Result<Option<Vec<u8>>, Self::Error> {
339 let mut inner = self.inner.write().unwrap();
340
341 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
344 return Ok(None);
345 };
346 let Some(mut content) = inner.media.remove(index) else {
347 return Ok(None);
348 };
349
350 let data = content.data.clone();
352
353 content.last_access = current_time;
355
356 inner.media.push(content);
358
359 Ok(Some(data))
360 }
361
362 async fn clean_inner(
363 &self,
364 policy: MediaRetentionPolicy,
365 current_time: SystemTime,
366 ) -> Result<(), Self::Error> {
367 if !policy.has_limitations() {
368 return Ok(());
370 }
371
372 let mut inner = self.inner.write().unwrap();
373
374 if policy.computed_max_file_size().is_some() {
376 inner.media.retain(|content| {
377 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
378 });
379 }
380
381 if policy.last_access_expiry.is_some() {
383 inner.media.retain(|content| {
384 content.ignore_policy
385 || !policy.has_content_expired(current_time, content.last_access)
386 });
387 }
388
389 if let Some(max_cache_size) = policy.max_cache_size {
391 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
395 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
396 |(mut cache_size, mut items_to_remove), (index, content)| {
397 if content.ignore_policy {
398 return (cache_size, items_to_remove);
400 }
401
402 let remove_item = if items_to_remove.is_empty() {
403 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
405 cache_size = sum;
406 cache_size > max_cache_size
408 } else {
409 true
413 }
414 } else {
415 true
417 };
418
419 if remove_item {
420 items_to_remove.push(index);
421 }
422
423 (cache_size, items_to_remove)
424 },
425 );
426
427 for index in items_to_remove {
430 inner.media.remove(index);
431 }
432 }
433
434 inner.last_media_cleanup_time = current_time;
435
436 Ok(())
437 }
438
439 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
440 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::{MemoryMediaStore, Result};
    use crate::{
        media_store_inner_integration_tests, media_store_integration_tests,
        media_store_integration_tests_time,
    };

    /// Builds a fresh store for each test.
    ///
    /// NOTE(review): the macros below take no arguments, so they presumably
    /// look this function up by name; keep the name and signature as they are.
    async fn get_media_store() -> Result<MemoryMediaStore> {
        let store = MemoryMediaStore::new();
        Ok(store)
    }

    // Shared integration test suites, instantiated for the in-memory store.
    media_store_inner_integration_tests!();
    media_store_integration_tests!();
    media_store_integration_tests_time!();
}