matrix_sdk_base/media/store/
memory_store.rs1use std::{
16 collections::HashMap,
17 num::NonZeroUsize,
18 sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23 cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer,
24};
25use ruma::{
26 MxcUri, OwnedMxcUri,
27 time::{Instant, SystemTime},
28};
29
30use super::Result;
31use crate::media::{
32 MediaRequestParameters, UniqueKey as _,
33 store::{
34 IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
35 MediaStoreError, MediaStoreInner,
36 },
37};
38
/// A media store that keeps everything in memory.
///
/// The mutable state lives behind an `Arc<RwLock<..>>`, so clones of this
/// value share the same `inner` state.
#[derive(Debug, Clone)]
pub struct MemoryMediaStore {
    /// Shared, lock-protected state (media entries, leases, policy, cleanup
    /// time).
    inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
    /// Service that the `MediaStore` methods delegate to for content
    /// insertion, lookups, policy handling and cleanups.
    media_service: MediaService,
}
47
/// The lock-protected state of a `MemoryMediaStore`.
#[derive(Debug)]
struct MemoryMediaStoreInner {
    /// The stored media entries; created with a capacity of
    /// `NUMBER_OF_MEDIAS`.
    media: RingBuffer<MediaContent>,
    /// Lease bookkeeping handed to the `try_take_leased_lock` helper.
    ///
    /// NOTE(review): presumably maps a lock key to `(holder, expiration)` —
    /// confirm against the helper.
    leases: HashMap<String, (String, Instant)>,
    /// The retention policy last stored through
    /// `set_media_retention_policy_inner`, `None` until one is set.
    media_retention_policy: Option<MediaRetentionPolicy>,
    /// Set when the store is created and overwritten by `clean_inner` each
    /// time a cleanup runs to completion.
    last_media_cleanup_time: SystemTime,
}
55
/// A single media entry held in the memory store.
#[derive(Debug)]
struct MediaContent {
    /// The MXC URI of the request the entry was stored for.
    uri: OwnedMxcUri,

    /// The unique key of the request the entry was stored for; used for
    /// per-request lookups.
    key: String,

    /// The raw bytes of the media.
    data: Vec<u8>,

    /// When `true`, the cleanup in `clean_inner` never removes this entry.
    ignore_policy: bool,

    /// The time supplied at insertion, refreshed every time the entry is
    /// read.
    last_access: SystemTime,
}
74
/// Capacity given to the media ring buffer (and to the scratch vector used
/// while cleaning up).
const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
76
77impl Default for MemoryMediaStore {
78 fn default() -> Self {
79 let last_media_cleanup_time = SystemTime::now();
81 let media_service = MediaService::new();
82 media_service.restore(None, Some(last_media_cleanup_time));
83
84 Self {
85 inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
86 media: RingBuffer::new(NUMBER_OF_MEDIAS),
87 leases: Default::default(),
88 media_retention_policy: None,
89 last_media_cleanup_time,
90 })),
91 media_service,
92 }
93 }
94}
95
96impl MemoryMediaStore {
97 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103#[cfg_attr(target_family = "wasm", async_trait(?Send))]
104#[cfg_attr(not(target_family = "wasm"), async_trait)]
105impl MediaStore for MemoryMediaStore {
106 type Error = MediaStoreError;
107
108 async fn try_take_leased_lock(
109 &self,
110 lease_duration_ms: u32,
111 key: &str,
112 holder: &str,
113 ) -> Result<bool, Self::Error> {
114 let mut inner = self.inner.write().unwrap();
115
116 Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
117 }
118
119 async fn add_media_content(
120 &self,
121 request: &MediaRequestParameters,
122 data: Vec<u8>,
123 ignore_policy: IgnoreMediaRetentionPolicy,
124 ) -> Result<(), Self::Error> {
125 self.media_service.add_media_content(self, request, data, ignore_policy).await
126 }
127
128 async fn replace_media_key(
129 &self,
130 from: &MediaRequestParameters,
131 to: &MediaRequestParameters,
132 ) -> Result<(), Self::Error> {
133 let expected_key = from.unique_key();
134
135 let mut inner = self.inner.write().unwrap();
136
137 if let Some(media_content) =
138 inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
139 {
140 media_content.uri = to.uri().to_owned();
141 media_content.key = to.unique_key();
142 }
143
144 Ok(())
145 }
146
147 async fn get_media_content(
148 &self,
149 request: &MediaRequestParameters,
150 ) -> Result<Option<Vec<u8>>, Self::Error> {
151 self.media_service.get_media_content(self, request).await
152 }
153
154 async fn remove_media_content(
155 &self,
156 request: &MediaRequestParameters,
157 ) -> Result<(), Self::Error> {
158 let expected_key = request.unique_key();
159
160 let mut inner = self.inner.write().unwrap();
161
162 let Some(index) =
163 inner.media.iter().position(|media_content| media_content.key == expected_key)
164 else {
165 return Ok(());
166 };
167
168 inner.media.remove(index);
169
170 Ok(())
171 }
172
173 async fn get_media_content_for_uri(
174 &self,
175 uri: &MxcUri,
176 ) -> Result<Option<Vec<u8>>, Self::Error> {
177 self.media_service.get_media_content_for_uri(self, uri).await
178 }
179
180 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
181 let mut inner = self.inner.write().unwrap();
182
183 let positions = inner
184 .media
185 .iter()
186 .enumerate()
187 .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
188 .collect::<Vec<_>>();
189
190 for position in positions.into_iter().rev() {
192 inner.media.remove(position);
193 }
194
195 Ok(())
196 }
197
198 async fn set_media_retention_policy(
199 &self,
200 policy: MediaRetentionPolicy,
201 ) -> Result<(), Self::Error> {
202 self.media_service.set_media_retention_policy(self, policy).await
203 }
204
205 fn media_retention_policy(&self) -> MediaRetentionPolicy {
206 self.media_service.media_retention_policy()
207 }
208
209 async fn set_ignore_media_retention_policy(
210 &self,
211 request: &MediaRequestParameters,
212 ignore_policy: IgnoreMediaRetentionPolicy,
213 ) -> Result<(), Self::Error> {
214 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
215 }
216
217 async fn clean(&self) -> Result<(), Self::Error> {
218 self.media_service.clean(self).await
219 }
220}
221
222#[cfg_attr(target_family = "wasm", async_trait(?Send))]
223#[cfg_attr(not(target_family = "wasm"), async_trait)]
224impl MediaStoreInner for MemoryMediaStore {
225 type Error = MediaStoreError;
226
227 async fn media_retention_policy_inner(
228 &self,
229 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
230 Ok(self.inner.read().unwrap().media_retention_policy)
231 }
232
233 async fn set_media_retention_policy_inner(
234 &self,
235 policy: MediaRetentionPolicy,
236 ) -> Result<(), Self::Error> {
237 self.inner.write().unwrap().media_retention_policy = Some(policy);
238 Ok(())
239 }
240
241 async fn add_media_content_inner(
242 &self,
243 request: &MediaRequestParameters,
244 data: Vec<u8>,
245 last_access: SystemTime,
246 policy: MediaRetentionPolicy,
247 ignore_policy: IgnoreMediaRetentionPolicy,
248 ) -> Result<(), Self::Error> {
249 self.remove_media_content(request).await?;
251
252 let ignore_policy = ignore_policy.is_yes();
253
254 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
255 return Ok(());
257 }
258
259 let mut inner = self.inner.write().unwrap();
261 inner.media.push(MediaContent {
262 uri: request.uri().to_owned(),
263 key: request.unique_key(),
264 data,
265 ignore_policy,
266 last_access,
267 });
268
269 Ok(())
270 }
271
272 async fn set_ignore_media_retention_policy_inner(
273 &self,
274 request: &MediaRequestParameters,
275 ignore_policy: IgnoreMediaRetentionPolicy,
276 ) -> Result<(), Self::Error> {
277 let mut inner = self.inner.write().unwrap();
278 let expected_key = request.unique_key();
279
280 if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
281 {
282 media_content.ignore_policy = ignore_policy.is_yes();
283 }
284
285 Ok(())
286 }
287
288 async fn get_media_content_inner(
289 &self,
290 request: &MediaRequestParameters,
291 current_time: SystemTime,
292 ) -> Result<Option<Vec<u8>>, Self::Error> {
293 let mut inner = self.inner.write().unwrap();
294 let expected_key = request.unique_key();
295
296 let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
299 return Ok(None);
300 };
301 let Some(mut content) = inner.media.remove(index) else {
302 return Ok(None);
303 };
304
305 let data = content.data.clone();
307
308 content.last_access = current_time;
310
311 inner.media.push(content);
313
314 Ok(Some(data))
315 }
316
317 async fn get_media_content_for_uri_inner(
318 &self,
319 expected_uri: &MxcUri,
320 current_time: SystemTime,
321 ) -> Result<Option<Vec<u8>>, Self::Error> {
322 let mut inner = self.inner.write().unwrap();
323
324 let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
327 return Ok(None);
328 };
329 let Some(mut content) = inner.media.remove(index) else {
330 return Ok(None);
331 };
332
333 let data = content.data.clone();
335
336 content.last_access = current_time;
338
339 inner.media.push(content);
341
342 Ok(Some(data))
343 }
344
345 async fn clean_inner(
346 &self,
347 policy: MediaRetentionPolicy,
348 current_time: SystemTime,
349 ) -> Result<(), Self::Error> {
350 if !policy.has_limitations() {
351 return Ok(());
353 }
354
355 let mut inner = self.inner.write().unwrap();
356
357 if policy.computed_max_file_size().is_some() {
359 inner.media.retain(|content| {
360 content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
361 });
362 }
363
364 if policy.last_access_expiry.is_some() {
366 inner.media.retain(|content| {
367 content.ignore_policy
368 || !policy.has_content_expired(current_time, content.last_access)
369 });
370 }
371
372 if let Some(max_cache_size) = policy.max_cache_size {
374 let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
378 (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
379 |(mut cache_size, mut items_to_remove), (index, content)| {
380 if content.ignore_policy {
381 return (cache_size, items_to_remove);
383 }
384
385 let remove_item = if items_to_remove.is_empty() {
386 if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
388 cache_size = sum;
389 cache_size > max_cache_size
391 } else {
392 true
396 }
397 } else {
398 true
400 };
401
402 if remove_item {
403 items_to_remove.push(index);
404 }
405
406 (cache_size, items_to_remove)
407 },
408 );
409
410 for index in items_to_remove {
413 inner.media.remove(index);
414 }
415 }
416
417 inner.last_media_cleanup_time = current_time;
418
419 Ok(())
420 }
421
422 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
423 Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
424 }
425}
426
// Runs the crate's shared media-store integration test suites against the
// in-memory store.
#[cfg(test)]
mod tests {
    use super::{MemoryMediaStore, Result};
    use crate::{
        media_store_inner_integration_tests, media_store_integration_tests,
        media_store_integration_tests_time,
    };

    // Builds a fresh, empty in-memory store.
    // NOTE(review): presumably the macros below look this function up by
    // name — confirm against their definitions.
    async fn get_media_store() -> Result<MemoryMediaStore> {
        Ok(MemoryMediaStore::new())
    }

    media_store_inner_integration_tests!();
    media_store_integration_tests!();
    media_store_integration_tests_time!();
}