matrix_sdk_base/media/store/
memory_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    num::NonZeroUsize,
18    sync::{Arc, RwLock as StdRwLock},
19};
20
21use async_trait::async_trait;
22use matrix_sdk_common::{
23    cross_process_lock::memory_store_helper::try_take_leased_lock, ring_buffer::RingBuffer,
24};
25use ruma::{
26    MxcUri, OwnedMxcUri,
27    time::{Instant, SystemTime},
28};
29
30use super::Result;
31use crate::media::{
32    MediaRequestParameters, UniqueKey as _,
33    store::{
34        IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
35        MediaStoreError, MediaStoreInner,
36    },
37};
38
39/// In-memory, non-persistent implementation of the `MediaStore`.
40///
41/// Default if no other is configured at startup.
42#[derive(Debug, Clone)]
43pub struct MemoryMediaStore {
44    inner: Arc<StdRwLock<MemoryMediaStoreInner>>,
45    media_service: MediaService,
46}
47
48#[derive(Debug)]
49struct MemoryMediaStoreInner {
50    media: RingBuffer<MediaContent>,
51    leases: HashMap<String, (String, Instant)>,
52    media_retention_policy: Option<MediaRetentionPolicy>,
53    last_media_cleanup_time: SystemTime,
54}
55
56/// A media content in the `MemoryStore`.
57#[derive(Debug)]
58struct MediaContent {
59    /// The URI of the content.
60    uri: OwnedMxcUri,
61
62    /// The unique key of the content.
63    key: String,
64
65    /// The bytes of the content.
66    data: Vec<u8>,
67
68    /// Whether we should ignore the [`MediaRetentionPolicy`] for this content.
69    ignore_policy: bool,
70
71    /// The time of the last access of the content.
72    last_access: SystemTime,
73}
74
75const NUMBER_OF_MEDIAS: NonZeroUsize = NonZeroUsize::new(20).unwrap();
76
77impl Default for MemoryMediaStore {
78    fn default() -> Self {
79        // Given that the store is empty, we won't need to clean it up right away.
80        let last_media_cleanup_time = SystemTime::now();
81        let media_service = MediaService::new();
82        media_service.restore(None, Some(last_media_cleanup_time));
83
84        Self {
85            inner: Arc::new(StdRwLock::new(MemoryMediaStoreInner {
86                media: RingBuffer::new(NUMBER_OF_MEDIAS),
87                leases: Default::default(),
88                media_retention_policy: None,
89                last_media_cleanup_time,
90            })),
91            media_service,
92        }
93    }
94}
95
96impl MemoryMediaStore {
97    /// Create a new empty MemoryMediaStore
98    pub fn new() -> Self {
99        Self::default()
100    }
101}
102
103#[cfg_attr(target_family = "wasm", async_trait(?Send))]
104#[cfg_attr(not(target_family = "wasm"), async_trait)]
105impl MediaStore for MemoryMediaStore {
106    type Error = MediaStoreError;
107
108    async fn try_take_leased_lock(
109        &self,
110        lease_duration_ms: u32,
111        key: &str,
112        holder: &str,
113    ) -> Result<bool, Self::Error> {
114        let mut inner = self.inner.write().unwrap();
115
116        Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
117    }
118
119    async fn add_media_content(
120        &self,
121        request: &MediaRequestParameters,
122        data: Vec<u8>,
123        ignore_policy: IgnoreMediaRetentionPolicy,
124    ) -> Result<(), Self::Error> {
125        self.media_service.add_media_content(self, request, data, ignore_policy).await
126    }
127
128    async fn replace_media_key(
129        &self,
130        from: &MediaRequestParameters,
131        to: &MediaRequestParameters,
132    ) -> Result<(), Self::Error> {
133        let expected_key = from.unique_key();
134
135        let mut inner = self.inner.write().unwrap();
136
137        if let Some(media_content) =
138            inner.media.iter_mut().find(|media_content| media_content.key == expected_key)
139        {
140            media_content.uri = to.uri().to_owned();
141            media_content.key = to.unique_key();
142        }
143
144        Ok(())
145    }
146
147    async fn get_media_content(
148        &self,
149        request: &MediaRequestParameters,
150    ) -> Result<Option<Vec<u8>>, Self::Error> {
151        self.media_service.get_media_content(self, request).await
152    }
153
154    async fn remove_media_content(
155        &self,
156        request: &MediaRequestParameters,
157    ) -> Result<(), Self::Error> {
158        let expected_key = request.unique_key();
159
160        let mut inner = self.inner.write().unwrap();
161
162        let Some(index) =
163            inner.media.iter().position(|media_content| media_content.key == expected_key)
164        else {
165            return Ok(());
166        };
167
168        inner.media.remove(index);
169
170        Ok(())
171    }
172
173    async fn get_media_content_for_uri(
174        &self,
175        uri: &MxcUri,
176    ) -> Result<Option<Vec<u8>>, Self::Error> {
177        self.media_service.get_media_content_for_uri(self, uri).await
178    }
179
180    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<(), Self::Error> {
181        let mut inner = self.inner.write().unwrap();
182
183        let positions = inner
184            .media
185            .iter()
186            .enumerate()
187            .filter_map(|(position, media_content)| (media_content.uri == uri).then_some(position))
188            .collect::<Vec<_>>();
189
190        // Iterate in reverse-order so that positions stay valid after first removals.
191        for position in positions.into_iter().rev() {
192            inner.media.remove(position);
193        }
194
195        Ok(())
196    }
197
198    async fn set_media_retention_policy(
199        &self,
200        policy: MediaRetentionPolicy,
201    ) -> Result<(), Self::Error> {
202        self.media_service.set_media_retention_policy(self, policy).await
203    }
204
205    fn media_retention_policy(&self) -> MediaRetentionPolicy {
206        self.media_service.media_retention_policy()
207    }
208
209    async fn set_ignore_media_retention_policy(
210        &self,
211        request: &MediaRequestParameters,
212        ignore_policy: IgnoreMediaRetentionPolicy,
213    ) -> Result<(), Self::Error> {
214        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
215    }
216
217    async fn clean(&self) -> Result<(), Self::Error> {
218        self.media_service.clean(self).await
219    }
220}
221
222#[cfg_attr(target_family = "wasm", async_trait(?Send))]
223#[cfg_attr(not(target_family = "wasm"), async_trait)]
224impl MediaStoreInner for MemoryMediaStore {
225    type Error = MediaStoreError;
226
227    async fn media_retention_policy_inner(
228        &self,
229    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
230        Ok(self.inner.read().unwrap().media_retention_policy)
231    }
232
233    async fn set_media_retention_policy_inner(
234        &self,
235        policy: MediaRetentionPolicy,
236    ) -> Result<(), Self::Error> {
237        self.inner.write().unwrap().media_retention_policy = Some(policy);
238        Ok(())
239    }
240
241    async fn add_media_content_inner(
242        &self,
243        request: &MediaRequestParameters,
244        data: Vec<u8>,
245        last_access: SystemTime,
246        policy: MediaRetentionPolicy,
247        ignore_policy: IgnoreMediaRetentionPolicy,
248    ) -> Result<(), Self::Error> {
249        // Avoid duplication. Let's try to remove it first.
250        self.remove_media_content(request).await?;
251
252        let ignore_policy = ignore_policy.is_yes();
253
254        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
255            // Do not store it.
256            return Ok(());
257        }
258
259        // Now, let's add it.
260        let mut inner = self.inner.write().unwrap();
261        inner.media.push(MediaContent {
262            uri: request.uri().to_owned(),
263            key: request.unique_key(),
264            data,
265            ignore_policy,
266            last_access,
267        });
268
269        Ok(())
270    }
271
272    async fn set_ignore_media_retention_policy_inner(
273        &self,
274        request: &MediaRequestParameters,
275        ignore_policy: IgnoreMediaRetentionPolicy,
276    ) -> Result<(), Self::Error> {
277        let mut inner = self.inner.write().unwrap();
278        let expected_key = request.unique_key();
279
280        if let Some(media_content) = inner.media.iter_mut().find(|media| media.key == expected_key)
281        {
282            media_content.ignore_policy = ignore_policy.is_yes();
283        }
284
285        Ok(())
286    }
287
288    async fn get_media_content_inner(
289        &self,
290        request: &MediaRequestParameters,
291        current_time: SystemTime,
292    ) -> Result<Option<Vec<u8>>, Self::Error> {
293        let mut inner = self.inner.write().unwrap();
294        let expected_key = request.unique_key();
295
296        // First get the content out of the buffer, we are going to put it back at the
297        // end.
298        let Some(index) = inner.media.iter().position(|media| media.key == expected_key) else {
299            return Ok(None);
300        };
301        let Some(mut content) = inner.media.remove(index) else {
302            return Ok(None);
303        };
304
305        // Clone the data.
306        let data = content.data.clone();
307
308        // Update the last access time.
309        content.last_access = current_time;
310
311        // Put it back in the buffer.
312        inner.media.push(content);
313
314        Ok(Some(data))
315    }
316
317    async fn get_media_content_for_uri_inner(
318        &self,
319        expected_uri: &MxcUri,
320        current_time: SystemTime,
321    ) -> Result<Option<Vec<u8>>, Self::Error> {
322        let mut inner = self.inner.write().unwrap();
323
324        // First get the content out of the buffer, we are going to put it back at the
325        // end.
326        let Some(index) = inner.media.iter().position(|media| media.uri == expected_uri) else {
327            return Ok(None);
328        };
329        let Some(mut content) = inner.media.remove(index) else {
330            return Ok(None);
331        };
332
333        // Clone the data.
334        let data = content.data.clone();
335
336        // Update the last access time.
337        content.last_access = current_time;
338
339        // Put it back in the buffer.
340        inner.media.push(content);
341
342        Ok(Some(data))
343    }
344
345    async fn clean_inner(
346        &self,
347        policy: MediaRetentionPolicy,
348        current_time: SystemTime,
349    ) -> Result<(), Self::Error> {
350        if !policy.has_limitations() {
351            // We can safely skip all the checks.
352            return Ok(());
353        }
354
355        let mut inner = self.inner.write().unwrap();
356
357        // First, check media content that exceed the max filesize.
358        if policy.computed_max_file_size().is_some() {
359            inner.media.retain(|content| {
360                content.ignore_policy || !policy.exceeds_max_file_size(content.data.len() as u64)
361            });
362        }
363
364        // Then, clean up expired media content.
365        if policy.last_access_expiry.is_some() {
366            inner.media.retain(|content| {
367                content.ignore_policy
368                    || !policy.has_content_expired(current_time, content.last_access)
369            });
370        }
371
372        // Finally, if the cache size is too big, remove old items until it fits.
373        if let Some(max_cache_size) = policy.max_cache_size {
374            // Reverse the iterator because in case the cache size is overflowing, we want
375            // to count the number of old items to remove. Items are sorted by last access
376            // and old items are at the start.
377            let (_, items_to_remove) = inner.media.iter().enumerate().rev().fold(
378                (0u64, Vec::with_capacity(NUMBER_OF_MEDIAS.into())),
379                |(mut cache_size, mut items_to_remove), (index, content)| {
380                    if content.ignore_policy {
381                        // Do not count it.
382                        return (cache_size, items_to_remove);
383                    }
384
385                    let remove_item = if items_to_remove.is_empty() {
386                        // We have not reached the max cache size yet.
387                        if let Some(sum) = cache_size.checked_add(content.data.len() as u64) {
388                            cache_size = sum;
389                            // Start removing items if we have exceeded the max cache size.
390                            cache_size > max_cache_size
391                        } else {
392                            // The cache size is overflowing, remove the remaining items, since the
393                            // max cache size cannot be bigger than
394                            // usize::MAX.
395                            true
396                        }
397                    } else {
398                        // We have reached the max cache size already, just remove it.
399                        true
400                    };
401
402                    if remove_item {
403                        items_to_remove.push(index);
404                    }
405
406                    (cache_size, items_to_remove)
407                },
408            );
409
410            // The indexes are already in reverse order so we can just iterate in that order
411            // to remove them starting by the end.
412            for index in items_to_remove {
413                inner.media.remove(index);
414            }
415        }
416
417        inner.last_media_cleanup_time = current_time;
418
419        Ok(())
420    }
421
422    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
423        Ok(Some(self.inner.read().unwrap().last_media_cleanup_time))
424    }
425}
426
427#[cfg(test)]
428mod tests {
429    use super::{MemoryMediaStore, Result};
430    use crate::{
431        media_store_inner_integration_tests, media_store_integration_tests,
432        media_store_integration_tests_time,
433    };
434
435    async fn get_media_store() -> Result<MemoryMediaStore> {
436        Ok(MemoryMediaStore::new())
437    }
438
439    media_store_inner_integration_tests!();
440    media_store_integration_tests!();
441    media_store_integration_tests_time!();
442}