matrix_sdk_base/media/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    cross_process_lock::{
24        CrossProcessLockGeneration,
25        memory_store_helper::{Lease, try_take_leased_lock},
26    },
27    ring_buffer::RingBuffer,
28};
29use ruma::{MxcUri, OwnedMxcUri, time::SystemTime};
30
31use super::Result;
32use crate::media::{
33    MediaRequestParameters, UniqueKey as _,
34    store::{
35        IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
36        MediaStoreError, MediaStoreInner,
37    },
38};
39
40/// In-memory, non-persistent implementation of the `MediaStore`.
41///
42/// Default if no other is configured at startup.
43#[derive(Debug, Clone)]
44pub struct MemoryMediaStore {
45    inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
46    media_service: MediaService,
47}
48
49#[derive(Debug)]
50struct MemoryMediaStoreInner {
51    media: RingBuffer<MediaContent>,
52    leases: HashMap<String, Lease>,
53    media_retention_policy: Option<MediaRetentionPolicy>,
54    last_media_cleanup_time: SystemTime,
55}
56
57/// A media content in the `MemoryStore`.
58#[derive(Debug)]
59struct MediaContent {
60    /// The URI of the content.
61    uri: OwnedMxcUri,
62
63    /// The unique key of the content.
64    key: String,
65
66    /// The bytes of the content.
67    data: Vec<u8>,
68
69    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
70    ignore_policy: bool,
71
72    /// The time of the last access of the content.
73    last_access: SystemTime,
74}
75
76const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
77
78impl Default for MemoryMediaStore {
79    fn default() -> Self {
80        // Given that the store is empty, we won't need to clean it up right away.
81        let last_media_cleanup_time = SystemTime::now();
82        let media_service = MediaService::new();
83        media_service.restore(None, Some(last_media_cleanup_time));
84
85        Self {
86            inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
87                media: RingBuffer::new(NUMBER_OF_MEDIAS),
88                leases: Default::default(),
89                media_retention_policy: None,
90                last_media_cleanup_time,
91            })),
92            media_service,
93        }
94    }
95}
96
97impl MemoryMediaStore {
98    /// Create a new empty MemoryMediaStore
99    pub fn new() -> Self {
100        Self::default()
101    }
102}
103
104#[cfg_attr(target_family = "wasm", async_trait(?Send))]
105#[cfg_attr(not(target_family = "wasm"), async_trait)]
106impl MediaStore for MemoryMediaStore {
107    type Error = MediaStoreError;
108
109    async fn try_take_leased_lock(
110        &self,
111        lease_duration_ms: u32,
112        key: &str,
113        holder: &str,
114    ) -> Result<Option<CrossProcessLockGeneration>, Self::Error> {
115        let mut inner = self.inner.write().unwrap();
116
117        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
118    }
119
120    async fn add_media_content(
121        &self,
122        request: &MediaRequestParameters,
123        data: Vec<u8>,
124        ignore_policy: IgnoreMediaRetentionPolicy,
125    ) -> Result<(), Self::Error> {
126        self.media_service.add_media_content(self, request, data, ignore_policy).await
127    }
128
129    async fn replace_media_key(
130        &self,
131        from: &MediaRequestParameters,
132        to: &MediaRequestParameters,
133    ) -> Result<(), Self::Error> {
134        let expected_key = from.unique_key();
135
136        let mut inner = self.inner.write().unwrap();
137
138        if let Some(media_content) =
139            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
140        {
141            media_content.uri = to.uri().to_owned();
142            media_content.key = to.unique_key();
143        }
144
145        Ok(())
146    }
147
148    async fn get_media_content(
149        &self,
150        request: &MediaRequestParameters,
151    ) -> Result<Option<Vec<u8>>, Self::Error> {
152        self.media_service.get_media_content(self, request).await
153    }
154
155    async fn remove_media_content(
156        &self,
157        request: &MediaRequestParameters,
158    ) -> Result<(), Self::Error> {
159        let expected_key = request.unique_key();
160
161        let mut inner = self.inner.write().unwrap();
162
163        let Some(index) =
164            inner.media.iter().position(|media_content| media_content.key == expected_key)
165        else {
166            return Ok(());
167        };
168
169        inner.media.remove(index);
170
171        Ok(())
172    }
173
174    async fn get_media_content_for_uri(
175        &self,
176        uri: &MxcUri,
177    ) -> Result<Option<Vec<u8>>, Self::Error> {
178        self.media_service.get_media_content_for_uri(self, uri).await
179    }
180
181    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
182        let mut inner = self.inner.write().unwrap();
183
184        let positions = inner
185            .media
186            .iter()
187            .enumerate()
188            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
189            .collect::<Vec<_>>();
190
191        // Iterate in reverse-order so that positions stay valid after first removals.
192        for position in positions.into_iter().rev() {
193            inner.media.remove(position);
194        }
195
196        Ok(())
197    }
198
199    async fn set_media_retention_policy(
200        &self,
201        policy: MediaRetentionPolicy,
202    ) -> Result<(), Self::Error> {
203        self.media_service.set_media_retention_policy(self, policy).await
204    }
205
206    fn media_retention_policy(&self) -> MediaRetentionPolicy {
207        self.media_service.media_retention_policy()
208    }
209
210    async fn set_ignore_media_retention_policy(
211        &self,
212        request: &MediaRequestParameters,
213        ignore_policy: IgnoreMediaRetentionPolicy,
214    ) -> Result<(), Self::Error> {
215        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
216    }
217
218    async fn clean(&self) -> Result<(), Self::Error> {
219        self.media_service.clean(self).await
220    }
221}
222
223#[cfg_attr(target_family = "wasm", async_trait(?Send))]
224#[cfg_attr(not(target_family = "wasm"), async_trait)]
225impl MediaStoreInner for MemoryMediaStore {
226    type Error = MediaStoreError;
227
228    async fn media_retention_policy_inner(
229        &self,
230    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
231        Ok(self.inner.read().unwrap().media_retention_policy)
232    }
233
234    async fn set_media_retention_policy_inner(
235        &self,
236        policy: MediaRetentionPolicy,
237    ) -> Result<(), Self::Error> {
238        self.inner.write().unwrap().media_retention_policy = Some(policy);
239        Ok(())
240    }
241
242    async fn add_media_content_inner(
243        &self,
244        request: &MediaRequestParameters,
245        data: Vec<u8>,
246        last_access: SystemTime,
247        policy: MediaRetentionPolicy,
248        ignore_policy: IgnoreMediaRetentionPolicy,
249    ) -> Result<(), Self::Error> {
250        // Avoid duplication. Let's try to remove it first.
251        self.remove_media_content(request).await?;
252
253        let ignore_policy = ignore_policy.is_yes();
254
255        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
256            // Do not store it.
257            return Ok(());
258        }
259
260        // Now, let's add it.
261        let mut inner = self.inner.write().unwrap();
262        inner.media.push(MediaContent {
263            uri: request.uri().to_owned(),
264            key: request.unique_key(),
265            data,
266            ignore_policy,
267            last_access,
268        });
269
270        Ok(())
271    }
272
273    async fn set_ignore_media_retention_policy_inner(
274        &self,
275        request: &MediaRequestParameters,
276        ignore_policy: IgnoreMediaRetentionPolicy,
277    ) -> Result<(), Self::Error> {
278        let mut inner = self.inner.write().unwrap();
279        let expected_key = request.unique_key();
280
281        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
282        {
283            media_content.ignore_policy = ignore_policy.is_yes();
284        }
285
286        Ok(())
287    }
288
289    async fn get_media_content_inner(
290        &self,
291        request: &MediaRequestParameters,
292        current_time: SystemTime,
293    ) -> Result<Option<Vec<u8>>, Self::Error> {
294        let mut inner = self.inner.write().unwrap();
295        let expected_key = request.unique_key();
296
297        // First get the content out of the buffer, we are going to put it back at the
298        // end.
299        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
300            return Ok(None);
301        };
302        let Some(mut content) = inner.media.remove(index) else {
303            return Ok(None);
304        };
305
306        // Clone the data.
307        let data = content.data.clone();
308
309        // Update the last access time.
310        content.last_access = current_time;
311
312        // Put it back in the buffer.
313        inner.media.push(content);
314
315        Ok(Some(data))
316    }
317
318    async fn get_media_content_for_uri_inner(
319        &self,
320        expected_uri: &MxcUri,
321        current_time: SystemTime,
322    ) -> Result<Option<Vec<u8>>, Self::Error> {
323        let mut inner = self.inner.write().unwrap();
324
325        // First get the content out of the buffer, we are going to put it back at the
326        // end.
327        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
328            return Ok(None);
329        };
330        let Some(mut content) = inner.media.remove(index) else {
331            return Ok(None);
332        };
333
334        // Clone the data.
335        let data = content.data.clone();
336
337        // Update the last access time.
338        content.last_access = current_time;
339
340        // Put it back in the buffer.
341        inner.media.push(content);
342
343        Ok(Some(data))
344    }
345
346    async fn clean_inner(
347        &self,
348        policy: MediaRetentionPolicy,
349        current_time: SystemTime,
350    ) -> Result<(), Self::Error> {
351        if !policy.has_limitations() {
352            // We can safely skip all the checks.
353            return Ok(());
354        }
355
356        let mut inner = self.inner.write().unwrap();
357
358        // First, check media content that exceed the max filesize.
359        if policy.computed_max_file_size().is_some() {
360            inner.media.retain(|content| {
361                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
362            });
363        }
364
365        // Then, clean up expired media content.
366        if policy.last_access_expiry.is_some() {
367            inner.media.retain(|content| {
368                content.ignore_policy
369                    || !policy.has_content_expired(current_time, content.last_access)
370            });
371        }
372
373        // Finally, if the cache size is too big, remove old items until it fits.
374        if let Some(max_cache_size) = policy.max_cache_size {
375            // Reverse the iterator because in case the cache size is overflowing, we want
376            // to count the number of old items to remove. Items are sorted by last access
377            // and old items are at the start.
378            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
379                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
380                |(mut cache_size, mut items_to_remove), (index, content)| {
381                    if content.ignore_policy {
382                        // Do not count it.
383                        return (cache_size, items_to_remove);
384                    }
385
386                    let remove_item = if items_to_remove.is_empty() {
387                        // We have not reached the max cache size yet.
388                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
389                            cache_size = sum;
390                            // Start removing items if we have exceeded the max cache size.
391                            cache_size > max_cache_size
392                        } else {
393                            // The cache size is overflowing, remove the remaining items, since the
394                            // max cache size cannot be bigger than
395                            // usize::MAX.
396                            true
397                        }
398                    } else {
399                        // We have reached the max cache size already, just remove it.
400                        true
401                    };
402
403                    if remove_item {
404                        items_to_remove.push(index);
405                    }
406
407                    (cache_size, items_to_remove)
408                },
409            );
410
411            // The indexes are already in reverse order so we can just iterate in that order
412            // to remove them starting by the end.
413            for index in items_to_remove {
414                inner.media.remove(index);
415            }
416        }
417
418        inner.last_media_cleanup_time = current_time;
419
420        Ok(())
421    }
422
423    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
424        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
425    }
426}
427
428#[cfg(test)]
429mod tests {
430    use super::{MemoryMediaStore, Result};
431    use crate::{
432        media_store_inner_integration_tests, media_store_integration_tests,
433        media_store_integration_tests_time,
434    };
435
436    async fn get_media_store() -> Result<MemoryMediaStore> {
437        Ok(MemoryMediaStore::new())
438    }
439
440    media_store_inner_integration_tests!();
441    media_store_integration_tests!();
442    media_store_integration_tests_time!();
443}