matrix_sdk_base/media/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 cross_process_lock::{
24 CrossProcessLockGeneration,
25 memory_store_helper::{Lease, try_take_leased_lock},
26 },
27 ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33 MediaRequestParameters, UniqueKey as _,
34 store::{
35 IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36 MediaStoreError, MediaStoreInner,
37 },
38};
39
40#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45 inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46 media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51 media: RingBuffer<MediaContent>,
52 leases: HashMap<String, Lease>,
53 media_retention_policy: Option<MediaRetentionPolicy>,
54 last_media_cleanup_time: SystemTime,
55}
56
57#[derive(Debug)]
59struct MediaContent {
60 uri: OwnedMxcUri,
62
63 key: String,
65
66 data: Vec<u8>,
68
69 ignore_policy: bool,
71
72 last_access: SystemTime,
74}
75
/// Size passed to `RingBuffer::new` when the in-memory media buffer is
/// created.
const NUMBER_OF_MEDIAS: NonZeroUsize = match NonZeroUsize::new(20) {
    Some(capacity) => capacity,
    // Evaluated at compile time: 20 is non-zero, so this arm is never taken.
    None => panic!("NUMBER_OF_MEDIAS must be non-zero"),
};
77
78impl Default for MemoryMediaStore {
79 fn default() -> Self {
80 let last_media_cleanup_time = SystemTime::now();
82 let media_service = MediaService::new();
83 media_service.restore(None, Some(last_media_cleanup_time));
84
85 Self {
86 inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87 media: RingBuffer::new(NUMBER_OF_MEDIAS),
88 leases: Default::default(),
89 media_retention_policy: None,
90 last_media_cleanup_time,
91 })),
92 media_service,
93 }
94 }
95}
96
97impl MemoryMediaStore {
98 pub fn new() -> Self {
100 Self::default()
101 }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107 type Error = MediaStoreError;
108
109 async fn try_take_leased_lock(
110 &self,
111 lease_duration_ms: u32,
112 key: &str,
113 holder: &str,
114 ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115 let mut inner = self.inner.write().unwrap();
116
117 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118 }
119
120 async fn add_media_content(
121 &self,
122 request: &MediaRequestParameters,
123 data: Vec<u8>,
124 ignore_policy: IgnoreMediaRetentionPolicy,
125 ) -> Result<(), Self::Error> {
126 self.media_service.add_media_content(self, request, data, ignore_policy).await
127 }
128
129 async fn replace_media_key(
130 &self,
131 from: &MediaRequestParameters,
132 to: &MediaRequestParameters,
133 ) -> Result<(), Self::Error> {
134 let expected_key = from.unique_key();
135
136 let mut inner = self.inner.write().unwrap();
137
138 if let Some(media_content) =
139 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140 {
141 media_content.uri = to.uri().to_owned();
142 media_content.key = to.unique_key();
143 }
144
145 Ok(())
146 }
147
148 async fn get_media_content(
149 &self,
150 request: &MediaRequestParameters,
151 ) -> Result<Option<Vec<u8>>, Self::Error> {
152 self.media_service.get_media_content(self, request).await
153 }
154
155 async fn remove_media_content(
156 &self,
157 request: &MediaRequestParameters,
158 ) -> Result<(), Self::Error> {
159 let expected_key = request.unique_key();
160
161 let mut inner = self.inner.write().unwrap();
162
163 let Some(index) =
164 inner.media.iter().position(|media_content| media_content.key == expected_key)
165 else {
166 return Ok(());
167 };
168
169 inner.media.remove(index);
170
171 Ok(())
172 }
173
174 async fn get_media_content_for_uri(
175 &self,
176 uri: &MxcUri,
177 ) -> Result<Option<Vec<u8>>, Self::Error> {
178 self.media_service.get_media_content_for_uri(self, uri).await
179 }
180
181 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182 let mut inner = self.inner.write().unwrap();
183
184 let positions = inner
185 .media
186 .iter()
187 .enumerate()
188 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189 .collect::<Vec<_>>();
190
191 for position in positions.into_iter().rev() {
193 inner.media.remove(position);
194 }
195
196 Ok(())
197 }
198
199 async fn set_media_retention_policy(
200 &self,
201 policy: MediaRetentionPolicy,
202 ) -> Result<(), Self::Error> {
203 self.media_service.set_media_retention_policy(self, policy).await
204 }
205
206 fn media_retention_policy(&self) -> MediaRetentionPolicy {
207 self.media_service.media_retention_policy()
208 }
209
210 async fn set_ignore_media_retention_policy(
211 &self,
212 request: &MediaRequestParameters,
213 ignore_policy: IgnoreMediaRetentionPolicy,
214 ) -> Result<(), Self::Error> {
215 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216 }
217
218 async fn clean(&self) -> Result<(), Self::Error> {
219 self.media_service.clean(self).await
220 }
221}
222
223#[cfg_attr(target_family = "wasm", async_trait(?Send))]
224#[cfg_attr(not(target_family = "wasm"), async_trait)]
225impl MediaStoreInner for MemoryMediaStore {
226 type Error = MediaStoreError;
227
228 async fn media_retention_policy_inner(
229 &self,
230 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
231 Ok(self.inner.read().unwrap().media_retention_policy)
232 }
233
234 async fn set_media_retention_policy_inner(
235 &self,
236 policy: MediaRetentionPolicy,
237 ) -> Result<(), Self::Error> {
238 self.inner.write().unwrap().media_retention_policy = Some(policy);
239 Ok(())
240 }
241
242 async fn add_media_content_inner(
243 &self,
244 request: &MediaRequestParameters,
245 data: Vec<u8>,
246 last_access: SystemTime,
247 policy: MediaRetentionPolicy,
248 ignore_policy: IgnoreMediaRetentionPolicy,
249 ) -> Result<(), Self::Error> {
250 self.remove_media_content(request).await?;
252
253 let ignore_policy = ignore_policy.is_yes();
254
255 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
256 return Ok(());
258 }
259
260 let mut inner = self.inner.write().unwrap();
262 inner.media.push(MediaContent {
263 uri: request.uri().to_owned(),
264 key: request.unique_key(),
265 data,
266 ignore_policy,
267 last_access,
268 });
269
270 Ok(())
271 }
272
273 async fn set_ignore_media_retention_policy_inner(
274 &self,
275 request: &MediaRequestParameters,
276 ignore_policy: IgnoreMediaRetentionPolicy,
277 ) -> Result<(), Self::Error> {
278 let mut inner = self.inner.write().unwrap();
279 let expected_key = request.unique_key();
280
281 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
282 {
283 media_content.ignore_policy = ignore_policy.is_yes();
284 }
285
286 Ok(())
287 }
288
289 async fn get_media_content_inner(
290 &self,
291 request: &MediaRequestParameters,
292 current_time: SystemTime,
293 ) -> Result<Option<Vec<u8>>, Self::Error> {
294 let mut inner = self.inner.write().unwrap();
295 let expected_key = request.unique_key();
296
297 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
300 return Ok(None);
301 };
302 let Some(mut content) = inner.media.remove(index) else {
303 return Ok(None);
304 };
305
306 let data = content.data.clone();
308
309 content.last_access = current_time;
311
312 inner.media.push(content);
314
315 Ok(Some(data))
316 }
317
318 async fn get_media_content_for_uri_inner(
319 &self,
320 expected_uri: &MxcUri,
321 current_time: SystemTime,
322 ) -> Result<Option<Vec<u8>>, Self::Error> {
323 let mut inner = self.inner.write().unwrap();
324
325 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
328 return Ok(None);
329 };
330 let Some(mut content) = inner.media.remove(index) else {
331 return Ok(None);
332 };
333
334 let data = content.data.clone();
336
337 content.last_access = current_time;
339
340 inner.media.push(content);
342
343 Ok(Some(data))
344 }
345
346 async fn clean_inner(
347 &self,
348 policy: MediaRetentionPolicy,
349 current_time: SystemTime,
350 ) -> Result<(), Self::Error> {
351 if !policy.has_limitations() {
352 return Ok(());
354 }
355
356 let mut inner = self.inner.write().unwrap();
357
358 if policy.computed_max_file_size().is_some() {
360 inner.media.retain(|content| {
361 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
362 });
363 }
364
365 if policy.last_access_expiry.is_some() {
367 inner.media.retain(|content| {
368 content.ignore_policy
369 || !policy.has_content_expired(current_time, content.last_access)
370 });
371 }
372
373 if let Some(max_cache_size) = policy.max_cache_size {
375 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
379 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
380 |(mut cache_size, mut items_to_remove), (index, content)| {
381 if content.ignore_policy {
382 return (cache_size, items_to_remove);
384 }
385
386 let remove_item = if items_to_remove.is_empty() {
387 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
389 cache_size = sum;
390 cache_size > max_cache_size
392 } else {
393 true
397 }
398 } else {
399 true
401 };
402
403 if remove_item {
404 items_to_remove.push(index);
405 }
406
407 (cache_size, items_to_remove)
408 },
409 );
410
411 for index in items_to_remove {
414 inner.media.remove(index);
415 }
416 }
417
418 inner.last_media_cleanup_time = current_time;
419
420 Ok(())
421 }
422
423 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
424 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::{MemoryMediaStore, Result};
    use crate::{
        media_store_inner_integration_tests, media_store_integration_tests,
        media_store_integration_tests_time,
    };

    /// Builds a fresh, empty in-memory store.
    ///
    /// NOTE(review): presumably called by the integration-test macros below;
    /// confirm against the macro definitions.
    async fn get_media_store() -> Result<MemoryMediaStore> {
        let store = MemoryMediaStore::new();
        Ok(store)
    }

    // Shared integration test suites, instantiated for the in-memory store.
    media_store_inner_integration_tests!();
    media_store_integration_tests!();
    media_store_integration_tests_time!();
}