Skip to main content

matrix_sdk_base/media/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    cross_process_lock::{
24        CrossProcessLockGeneration,
25        memory_store_helper::{Lease, try_take_leased_lock},
26    },
27    ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33    MediaRequestParameters, UniqueKey as _,
34    store::{
35        IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36        MediaStoreError, MediaStoreInner,
37    },
38};
39
40/// In-memory, non-persistent implementation of the `MediaStore`.
41///
42/// Default if no other is configured at startup.
43#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45    inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46    media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51    media: RingBuffer<MediaContent>,
52    leases: HashMap<String, Lease>,
53    media_retention_policy: Option<MediaRetentionPolicy>,
54    last_media_cleanup_time: SystemTime,
55}
56
57/// A media content in the `MemoryStore`.
58#[derive(Debug)]
59struct MediaContent {
60    /// The URI of the content.
61    uri: OwnedMxcUri,
62
63    /// The unique key of the content.
64    key: String,
65
66    /// The bytes of the content.
67    data: Vec<u8>,
68
69    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
70    ignore_policy: bool,
71
72    /// The time of the last access of the content.
73    last_access: SystemTime,
74}
75
76const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
77
78impl Default for MemoryMediaStore {
79    fn default() -> Self {
80        // Given that the store is empty, we won't need to clean it up right away.
81        let last_media_cleanup_time = SystemTime::now();
82        let media_service = MediaService::new();
83        media_service.restore(None, Some(last_media_cleanup_time));
84
85        Self {
86            inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87                media: RingBuffer::new(NUMBER_OF_MEDIAS),
88                leases: Default::default(),
89                media_retention_policy: None,
90                last_media_cleanup_time,
91            })),
92            media_service,
93        }
94    }
95}
96
97impl MemoryMediaStore {
98    /// Create a new empty MemoryMediaStore
99    pub fn new() -> Self {
100        Self::default()
101    }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107    type Error = MediaStoreError;
108
109    async fn try_take_leased_lock(
110        &self,
111        lease_duration_ms: u32,
112        key: &str,
113        holder: &str,
114    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115        let mut inner = self.inner.write().unwrap();
116
117        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118    }
119
120    async fn add_media_content(
121        &self,
122        request: &MediaRequestParameters,
123        data: Vec<u8>,
124        ignore_policy: IgnoreMediaRetentionPolicy,
125    ) -> Result<(), Self::Error> {
126        self.media_service.add_media_content(self, request, data, ignore_policy).await
127    }
128
129    async fn replace_media_key(
130        &self,
131        from: &MediaRequestParameters,
132        to: &MediaRequestParameters,
133    ) -> Result<(), Self::Error> {
134        let expected_key = from.unique_key();
135
136        let mut inner = self.inner.write().unwrap();
137
138        if let Some(media_content) =
139            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140        {
141            media_content.uri = to.uri().to_owned();
142            media_content.key = to.unique_key();
143        }
144
145        Ok(())
146    }
147
148    async fn get_media_content(
149        &self,
150        request: &MediaRequestParameters,
151    ) -> Result<Option<Vec<u8>>, Self::Error> {
152        self.media_service.get_media_content(self, request).await
153    }
154
155    async fn remove_media_content(
156        &self,
157        request: &MediaRequestParameters,
158    ) -> Result<(), Self::Error> {
159        let expected_key = request.unique_key();
160
161        let mut inner = self.inner.write().unwrap();
162
163        let Some(index) =
164            inner.media.iter().position(|media_content| media_content.key == expected_key)
165        else {
166            return Ok(());
167        };
168
169        inner.media.remove(index);
170
171        Ok(())
172    }
173
174    async fn get_media_content_for_uri(
175        &self,
176        uri: &MxcUri,
177    ) -> Result<Option<Vec<u8>>, Self::Error> {
178        self.media_service.get_media_content_for_uri(self, uri).await
179    }
180
181    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182        let mut inner = self.inner.write().unwrap();
183
184        let positions = inner
185            .media
186            .iter()
187            .enumerate()
188            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189            .collect::<Vec<_>>();
190
191        // Iterate in reverse-order so that positions stay valid after first removals.
192        for position in positions.into_iter().rev() {
193            inner.media.remove(position);
194        }
195
196        Ok(())
197    }
198
199    async fn set_media_retention_policy(
200        &self,
201        policy: MediaRetentionPolicy,
202    ) -> Result<(), Self::Error> {
203        self.media_service.set_media_retention_policy(self, policy).await
204    }
205
206    fn media_retention_policy(&self) -> MediaRetentionPolicy {
207        self.media_service.media_retention_policy()
208    }
209
210    async fn set_ignore_media_retention_policy(
211        &self,
212        request: &MediaRequestParameters,
213        ignore_policy: IgnoreMediaRetentionPolicy,
214    ) -> Result<(), Self::Error> {
215        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216    }
217
218    async fn clean(&self) -> Result<(), Self::Error> {
219        self.media_service.clean(self).await
220    }
221
222    async fn optimize(&self) -> Result<(), Self::Error> {
223        Ok(())
224    }
225
226    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
227        Ok(None)
228    }
229
230    async fn close(&self) -> Result<(), Self::Error> {
231        Ok(())
232    }
233
234    async fn reopen(&self) -> Result<(), Self::Error> {
235        Ok(())
236    }
237}
238
239#[cfg_attr(target_family = "wasm", async_trait(?Send))]
240#[cfg_attr(not(target_family = "wasm"), async_trait)]
241impl MediaStoreInner for MemoryMediaStore {
242    type Error = MediaStoreError;
243
244    async fn media_retention_policy_inner(
245        &self,
246    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
247        Ok(self.inner.read().unwrap().media_retention_policy)
248    }
249
250    async fn set_media_retention_policy_inner(
251        &self,
252        policy: MediaRetentionPolicy,
253    ) -> Result<(), Self::Error> {
254        self.inner.write().unwrap().media_retention_policy = Some(policy);
255        Ok(())
256    }
257
258    async fn add_media_content_inner(
259        &self,
260        request: &MediaRequestParameters,
261        data: Vec<u8>,
262        last_access: SystemTime,
263        policy: MediaRetentionPolicy,
264        ignore_policy: IgnoreMediaRetentionPolicy,
265    ) -> Result<(), Self::Error> {
266        // Avoid duplication. Let's try to remove it first.
267        self.remove_media_content(request).await?;
268
269        let ignore_policy = ignore_policy.is_yes();
270
271        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
272            // Do not store it.
273            return Ok(());
274        }
275
276        // Now, let's add it.
277        let mut inner = self.inner.write().unwrap();
278        inner.media.push(MediaContent {
279            uri: request.uri().to_owned(),
280            key: request.unique_key(),
281            data,
282            ignore_policy,
283            last_access,
284        });
285
286        Ok(())
287    }
288
289    async fn set_ignore_media_retention_policy_inner(
290        &self,
291        request: &MediaRequestParameters,
292        ignore_policy: IgnoreMediaRetentionPolicy,
293    ) -> Result<(), Self::Error> {
294        let mut inner = self.inner.write().unwrap();
295        let expected_key = request.unique_key();
296
297        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
298        {
299            media_content.ignore_policy = ignore_policy.is_yes();
300        }
301
302        Ok(())
303    }
304
305    async fn get_media_content_inner(
306        &self,
307        request: &MediaRequestParameters,
308        current_time: SystemTime,
309    ) -> Result<Option<Vec<u8>>, Self::Error> {
310        let mut inner = self.inner.write().unwrap();
311        let expected_key = request.unique_key();
312
313        // First get the content out of the buffer, we are going to put it back at the
314        // end.
315        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
316            return Ok(None);
317        };
318        let Some(mut content) = inner.media.remove(index) else {
319            return Ok(None);
320        };
321
322        // Clone the data.
323        let data = content.data.clone();
324
325        // Update the last access time.
326        content.last_access = current_time;
327
328        // Put it back in the buffer.
329        inner.media.push(content);
330
331        Ok(Some(data))
332    }
333
334    async fn get_media_content_for_uri_inner(
335        &self,
336        expected_uri: &MxcUri,
337        current_time: SystemTime,
338    ) -> Result<Option<Vec<u8>>, Self::Error> {
339        let mut inner = self.inner.write().unwrap();
340
341        // First get the content out of the buffer, we are going to put it back at the
342        // end.
343        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
344            return Ok(None);
345        };
346        let Some(mut content) = inner.media.remove(index) else {
347            return Ok(None);
348        };
349
350        // Clone the data.
351        let data = content.data.clone();
352
353        // Update the last access time.
354        content.last_access = current_time;
355
356        // Put it back in the buffer.
357        inner.media.push(content);
358
359        Ok(Some(data))
360    }
361
362    async fn clean_inner(
363        &self,
364        policy: MediaRetentionPolicy,
365        current_time: SystemTime,
366    ) -> Result<(), Self::Error> {
367        if !policy.has_limitations() {
368            // We can safely skip all the checks.
369            return Ok(());
370        }
371
372        let mut inner = self.inner.write().unwrap();
373
374        // First, check media content that exceed the max filesize.
375        if policy.computed_max_file_size().is_some() {
376            inner.media.retain(|content| {
377                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
378            });
379        }
380
381        // Then, clean up expired media content.
382        if policy.last_access_expiry.is_some() {
383            inner.media.retain(|content| {
384                content.ignore_policy
385                    || !policy.has_content_expired(current_time, content.last_access)
386            });
387        }
388
389        // Finally, if the cache size is too big, remove old items until it fits.
390        if let Some(max_cache_size) = policy.max_cache_size {
391            // Reverse the iterator because in case the cache size is overflowing, we want
392            // to count the number of old items to remove. Items are sorted by last access
393            // and old items are at the start.
394            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
395                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
396                |(mut cache_size, mut items_to_remove), (index, content)| {
397                    if content.ignore_policy {
398                        // Do not count it.
399                        return (cache_size, items_to_remove);
400                    }
401
402                    let remove_item = if items_to_remove.is_empty() {
403                        // We have not reached the max cache size yet.
404                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
405                            cache_size = sum;
406                            // Start removing items if we have exceeded the max cache size.
407                            cache_size > max_cache_size
408                        } else {
409                            // The cache size is overflowing, remove the remaining items, since the
410                            // max cache size cannot be bigger than
411                            // usize::MAX.
412                            true
413                        }
414                    } else {
415                        // We have reached the max cache size already, just remove it.
416                        true
417                    };
418
419                    if remove_item {
420                        items_to_remove.push(index);
421                    }
422
423                    (cache_size, items_to_remove)
424                },
425            );
426
427            // The indexes are already in reverse order so we can just iterate in that order
428            // to remove them starting by the end.
429            for index in items_to_remove {
430                inner.media.remove(index);
431            }
432        }
433
434        inner.last_media_cleanup_time = current_time;
435
436        Ok(())
437    }
438
439    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
440        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
441    }
442}
443
444#[cfg(test)]
445mod tests {
446    use super::{MemoryMediaStore, Result};
447    use crate::{
448        media_store_inner_integration_tests, media_store_integration_tests,
449        media_store_integration_tests_time,
450    };
451
452    async fn get_media_store() -> Result<MemoryMediaStore> {
453        Ok(MemoryMediaStore::new())
454    }
455
456    media_store_inner_integration_tests!();
457    media_store_integration_tests!();
458    media_store_integration_tests_time!();
459}