matrix_sdk_base/media/store/
media_service.rs

1// Copyright 2025 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use matrix_sdk_common::{
18    SendOutsideWasm, SyncOutsideWasm,
19    executor::{JoinHandle, spawn},
20    locks::Mutex,
21};
22use ruma::{MxcUri, time::SystemTime};
23use tokio::sync::Mutex as AsyncMutex;
24use tracing::error;
25
26use super::{MediaRetentionPolicy, MediaStoreInner};
27use crate::media::MediaRequestParameters;
28
29/// API for implementors of [`MediaStore`] to manage their media through
30/// their implementation of [`MediaStoreInner`].
31///
32/// [`MediaStore`]: crate::media::store::MediaStore
33#[derive(Debug)]
34pub struct MediaService<Time: TimeProvider = DefaultTimeProvider> {
35    inner: Arc<MediaServiceInner<Time>>,
36}
37
38#[derive(Debug)]
39struct MediaServiceInner<Time: TimeProvider = DefaultTimeProvider> {
40    /// The time provider.
41    time_provider: Time,
42
43    /// The current [`MediaRetentionPolicy`].
44    policy: Mutex<MediaRetentionPolicy>,
45
46    /// A mutex to ensure a single cleanup is running at a time.
47    cleanup_guard: AsyncMutex<()>,
48
49    /// The time of the last media cache cleanup.
50    last_media_cleanup_time: Mutex<Option<SystemTime>>,
51
52    /// The [`JoinHandle`] for an automatic media cleanup task.
53    ///
54    /// Used to ensure that only one automatic cleanup is running at a time, and
55    /// to stop the cleanup when the [`MediaServiceInner`] is dropped.
56    automatic_media_cleanup_join_handle: Mutex<Option<JoinHandle<()>>>,
57}
58
59impl MediaService {
60    /// Construct a new default `MediaService`.
61    ///
62    /// [`MediaService::restore()`] should be called after constructing the
63    /// `MediaService` to restore its previous state.
64    pub fn new() -> Self {
65        Self::default()
66    }
67}
68
69impl Default for MediaService {
70    fn default() -> Self {
71        Self::with_time_provider(DefaultTimeProvider)
72    }
73}
74
75impl<Time> MediaService<Time>
76where
77    Time: TimeProvider + 'static,
78{
79    /// Construct a new `MediaService` with the given `TimeProvider` and an
80    /// empty `MediaRetentionPolicy`.
81    fn with_time_provider(time_provider: Time) -> Self {
82        let inner = MediaServiceInner {
83            time_provider,
84            policy: Mutex::new(MediaRetentionPolicy::empty()),
85            cleanup_guard: AsyncMutex::new(()),
86            last_media_cleanup_time: Mutex::new(None),
87            automatic_media_cleanup_join_handle: Mutex::new(None),
88        };
89
90        Self { inner: Arc::new(inner) }
91    }
92
93    /// Restore the previous state of the [`MediaRetentionPolicy`] from data
94    /// that was persisted in the store.
95    ///
96    /// This should be called immediately after constructing the `MediaService`.
97    ///
98    /// # Arguments
99    ///
100    /// * `policy` - The `MediaRetentionPolicy` that was persisted in the store.
101    pub fn restore(
102        &self,
103        policy: Option<MediaRetentionPolicy>,
104        last_media_cleanup_time: Option<SystemTime>,
105    ) {
106        if let Some(policy) = policy {
107            *self.inner.policy.lock() = policy;
108        }
109
110        if let Some(time) = last_media_cleanup_time {
111            *self.inner.last_media_cleanup_time.lock() = Some(time);
112        }
113    }
114
115    /// Get the current time from the inner [`TimeProvider`].
116    fn now(&self) -> SystemTime {
117        self.inner.time_provider.now()
118    }
119
120    /// Set the `MediaRetentionPolicy` of this service.
121    ///
122    /// # Arguments
123    ///
124    /// * `store` - The `MediaStoreInner`.
125    ///
126    /// * `policy` - The `MediaRetentionPolicy` to use.
127    pub async fn set_media_retention_policy<Store: MediaStoreInner + 'static>(
128        &self,
129        store: &Store,
130        policy: MediaRetentionPolicy,
131    ) -> Result<(), Store::Error> {
132        store.set_media_retention_policy_inner(policy).await?;
133
134        *self.inner.policy.lock() = policy;
135
136        self.maybe_spawn_automatic_media_cache_cleanup(store, self.now());
137
138        Ok(())
139    }
140
141    /// Get the `MediaRetentionPolicy` of this service.
142    pub fn media_retention_policy(&self) -> MediaRetentionPolicy {
143        *self.inner.policy.lock()
144    }
145
146    /// Add a media file's content in the media store.
147    ///
148    /// # Arguments
149    ///
150    /// * `store` - The `MediaStoreInner`.
151    ///
152    /// * `request` - The `MediaRequestParameters` of the file.
153    ///
154    /// * `content` - The content of the file.
155    ///
156    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
157    ///   ignored.
158    pub async fn add_media_content<Store: MediaStoreInner + 'static>(
159        &self,
160        store: &Store,
161        request: &MediaRequestParameters,
162        content: Vec<u8>,
163        ignore_policy: IgnoreMediaRetentionPolicy,
164    ) -> Result<(), Store::Error> {
165        let policy = self.media_retention_policy();
166
167        if ignore_policy == IgnoreMediaRetentionPolicy::No
168            && policy.exceeds_max_file_size(content.len() as u64)
169        {
170            // We do not cache the content.
171            return Ok(());
172        }
173
174        let current_time = self.now();
175        store
176            .add_media_content_inner(request, content, current_time, policy, ignore_policy)
177            .await?;
178
179        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
180
181        Ok(())
182    }
183
184    /// Set whether the current [`MediaRetentionPolicy`] should be ignored for
185    /// the media.
186    ///
187    /// The change will be taken into account in the next cleanup.
188    ///
189    /// # Arguments
190    ///
191    /// * `store` - The `MediaStoreInner`.
192    ///
193    /// * `request` - The `MediaRequestParameters` of the file.
194    ///
195    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
196    ///   ignored.
197    pub async fn set_ignore_media_retention_policy<Store: MediaStoreInner>(
198        &self,
199        store: &Store,
200        request: &MediaRequestParameters,
201        ignore_policy: IgnoreMediaRetentionPolicy,
202    ) -> Result<(), Store::Error> {
203        store.set_ignore_media_retention_policy_inner(request, ignore_policy).await
204    }
205
206    /// Get a media file's content out of the media store.
207    ///
208    /// # Arguments
209    ///
210    /// * `store` - The `MediaStoreInner`.
211    ///
212    /// * `request` - The `MediaRequestParameters` of the file.
213    pub async fn get_media_content<Store: MediaStoreInner + 'static>(
214        &self,
215        store: &Store,
216        request: &MediaRequestParameters,
217    ) -> Result<Option<Vec<u8>>, Store::Error> {
218        let current_time = self.now();
219        let content = store.get_media_content_inner(request, current_time).await?;
220
221        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
222
223        Ok(content)
224    }
225
226    /// Get a media file's content associated to an `MxcUri` from the
227    /// media store.
228    ///
229    /// # Arguments
230    ///
231    /// * `store` - The `MediaStoreInner`.
232    ///
233    /// * `uri` - The `MxcUri` of the media file.
234    pub async fn get_media_content_for_uri<Store: MediaStoreInner + 'static>(
235        &self,
236        store: &Store,
237        uri: &MxcUri,
238    ) -> Result<Option<Vec<u8>>, Store::Error> {
239        let current_time = self.now();
240        let content = store.get_media_content_for_uri_inner(uri, current_time).await?;
241
242        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
243
244        Ok(content)
245    }
246
247    /// Clean up the media cache with the current `MediaRetentionPolicy`.
248    ///
249    /// If there is already an ongoing cleanup, this is a noop.
250    ///
251    /// # Arguments
252    ///
253    /// * `store` - The `MediaStoreInner`.
254    pub async fn clean<Store: MediaStoreInner>(&self, store: &Store) -> Result<(), Store::Error> {
255        self.clean_inner(store, self.now()).await
256    }
257
258    async fn clean_inner<Store: MediaStoreInner>(
259        &self,
260        store: &Store,
261        current_time: SystemTime,
262    ) -> Result<(), Store::Error> {
263        let Ok(_guard) = self.inner.cleanup_guard.try_lock() else {
264            // There is another ongoing cleanup.
265            return Ok(());
266        };
267
268        let policy = self.media_retention_policy();
269
270        if !policy.has_limitations() {
271            // No need to call the backend.
272            return Ok(());
273        }
274
275        store.clean_inner(policy, current_time).await?;
276
277        *self.inner.last_media_cleanup_time.lock() = Some(current_time);
278
279        Ok(())
280    }
281
282    /// Spawn an automatic media cache cleanup, according to the media retention
283    /// policy.
284    ///
285    /// A cleanup will be spawned if:
286    /// * The media retention policy's `cleanup_frequency` is set and enough
287    ///   time has passed since the last cleanup.
288    /// * No other cleanup is running,
289    fn maybe_spawn_automatic_media_cache_cleanup<Store: MediaStoreInner + 'static>(
290        &self,
291        store: &Store,
292        current_time: SystemTime,
293    ) {
294        let mut join_handle = self.inner.automatic_media_cleanup_join_handle.lock();
295
296        if join_handle.as_ref().is_some_and(|join_handle| !join_handle.is_finished()) {
297            // There is an ongoing automatic media cache cleanup.
298            return;
299        }
300
301        let policy = self.media_retention_policy();
302        if policy.cleanup_frequency.is_none() || !policy.has_limitations() {
303            // Automatic cleanups are disabled or have no effect.
304            return;
305        }
306
307        let last_media_cleanup_time = *self.inner.last_media_cleanup_time.lock();
308        if last_media_cleanup_time.is_some_and(|last_cleanup_time| {
309            !policy.should_clean_up(current_time, last_cleanup_time)
310        }) {
311            // It is not time to clean up.
312            return;
313        }
314
315        let this = self.clone();
316        let store = store.clone();
317
318        let handle = spawn(async move {
319            if let Err(error) = this.clean_inner(&store, current_time).await {
320                error!("Failed to run automatic media cache cleanup: {error}");
321            }
322        });
323
324        *join_handle = Some(handle);
325    }
326}
327
328impl<Time> Clone for MediaService<Time>
329where
330    Time: TimeProvider,
331{
332    fn clone(&self) -> Self {
333        Self { inner: self.inner.clone() }
334    }
335}
336
337impl<Time> Drop for MediaServiceInner<Time>
338where
339    Time: TimeProvider,
340{
341    fn drop(&mut self) {
342        if let Some(join_handle) = self.automatic_media_cleanup_join_handle.lock().take() {
343            join_handle.abort();
344        }
345    }
346}
347
/// Whether the [`MediaRetentionPolicy`] should be ignored for the current
/// content.
///
/// Some actions on the media cache are noops when the processed media content
/// is filtered out by the policy. This can break features of the SDK that
/// expect to be able to persist all media files in the store to restore them
/// when the client is restored, like the send queue.
///
/// Use [`IgnoreMediaRetentionPolicy::is_yes()`] to convert this to a boolean.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IgnoreMediaRetentionPolicy {
    /// The media retention policy is ignored, so the current action will not
    /// be a noop.
    ///
    /// Media content in this state must NOT be taken into account when
    /// applying a `MediaRetentionPolicy`. This is true for ANY criteria, like
    /// the maximum file size, the maximum cache size or the last access
    /// expiry.
    ///
    /// This state is meant to be transient, and to only be used internally by
    /// the SDK.
    Yes,

    /// The media retention policy is respected, so the current action might
    /// be a noop.
    No,
}

impl IgnoreMediaRetentionPolicy {
    /// Returns `true` for the [`IgnoreMediaRetentionPolicy::Yes`] variant.
    pub fn is_yes(self) -> bool {
        self == Self::Yes
    }
}
382
383/// An abstract trait to provide the current `SystemTime` for the
384/// [`MediaService`].
385pub trait TimeProvider: SendOutsideWasm + SyncOutsideWasm {
386    /// The current time.
387    fn now(&self) -> SystemTime;
388}
389
/// The default time provider, that calls `ruma::time::SystemTime::now()`.
///
/// This is the provider used when a [`MediaService`] is built with
/// `MediaService::new()` or `MediaService::default()`.
#[derive(Debug)]
pub struct DefaultTimeProvider;

impl TimeProvider for DefaultTimeProvider {
    /// Return the time reported by `SystemTime::now()`.
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}
399
400#[cfg(test)]
401mod tests {
402    use std::{
403        fmt,
404        sync::{Arc, MutexGuard},
405    };
406
407    use async_trait::async_trait;
408    use matrix_sdk_common::locks::Mutex;
409    use matrix_sdk_test::async_test;
410    use ruma::{
411        MxcUri, OwnedMxcUri,
412        events::room::MediaSource,
413        mxc_uri,
414        time::{Duration, SystemTime},
415    };
416
417    use super::{
418        IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStoreInner,
419        TimeProvider,
420    };
421    use crate::media::{MediaFormat, MediaRequestParameters, UniqueKey, store::MediaStoreError};
422
423    #[derive(Debug, Default, Clone)]
424    struct MockMediaStoreInner {
425        inner: Arc<Mutex<MockMediaStoreInnerInner>>,
426    }
427
428    impl MockMediaStoreInner {
429        /// Whether the store was accessed.
430        fn accessed(&self) -> bool {
431            self.inner.lock().accessed
432        }
433
434        /// Reset the `accessed` boolean.
435        fn reset_accessed(&self) {
436            self.inner.lock().accessed = false;
437        }
438
439        /// Access the inner store.
440        ///
441        /// Should be called for every access to the inner store as it also sets
442        /// the `accessed` boolean.
443        fn inner(&self) -> MutexGuard<'_, MockMediaStoreInnerInner> {
444            let mut inner = self.inner.lock();
445            inner.accessed = true;
446            inner
447        }
448    }
449
    /// The state of a [`MockMediaStoreInner`].
    #[derive(Debug, Default)]
    struct MockMediaStoreInnerInner {
        /// Whether this store was accessed.
        ///
        /// Must be set to `true` for any operation that unlocks the store.
        accessed: bool,

        /// The persisted media retention policy.
        media_retention_policy: Option<MediaRetentionPolicy>,

        /// The list of media content.
        media_list: Vec<MediaContent>,

        /// The time of the last cleanup.
        cleanup_time: Option<SystemTime>,
    }
466
    /// A media file stored in the [`MockMediaStoreInner`].
    #[derive(Debug, Clone)]
    struct MediaContent {
        /// The unique key for the media content.
        key: String,

        /// The original URI of the media content.
        uri: OwnedMxcUri,

        /// The media content.
        content: Vec<u8>,

        /// Whether the `MediaRetentionPolicy` should be ignored for this media
        /// content.
        ignore_policy: bool,

        /// The time of the last access of the media content.
        last_access: SystemTime,
    }
485
486    #[derive(Debug)]
487    struct MockMediaStoreInnerError;
488
489    impl fmt::Display for MockMediaStoreInnerError {
490        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
491            write!(f, "MockMediaStoreInnerError")
492        }
493    }
494
495    impl std::error::Error for MockMediaStoreInnerError {}
496
497    impl From<MockMediaStoreInnerError> for MediaStoreError {
498        fn from(value: MockMediaStoreInnerError) -> Self {
499            Self::backend(value)
500        }
501    }
502
503    #[cfg_attr(target_family = "wasm", async_trait(?Send))]
504    #[cfg_attr(not(target_family = "wasm"), async_trait)]
505    impl MediaStoreInner for MockMediaStoreInner {
506        type Error = MockMediaStoreInnerError;
507
508        async fn media_retention_policy_inner(
509            &self,
510        ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
511            Ok(self.inner().media_retention_policy)
512        }
513
514        async fn set_media_retention_policy_inner(
515            &self,
516            policy: MediaRetentionPolicy,
517        ) -> Result<(), Self::Error> {
518            self.inner().media_retention_policy = Some(policy);
519            Ok(())
520        }
521
522        async fn add_media_content_inner(
523            &self,
524            request: &MediaRequestParameters,
525            content: Vec<u8>,
526            current_time: SystemTime,
527            policy: MediaRetentionPolicy,
528            ignore_policy: IgnoreMediaRetentionPolicy,
529        ) -> Result<(), Self::Error> {
530            let ignore_policy = ignore_policy.is_yes();
531
532            if !ignore_policy && policy.exceeds_max_file_size(content.len() as u64) {
533                return Ok(());
534            }
535
536            let mut inner = self.inner();
537            let key = request.unique_key();
538
539            if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
540                let media_content = &mut inner.media_list[pos];
541                media_content.content = content;
542                media_content.last_access = current_time;
543                media_content.ignore_policy = ignore_policy;
544            } else {
545                inner.media_list.push(MediaContent {
546                    key,
547                    uri: request.uri().to_owned(),
548                    content,
549                    ignore_policy,
550                    last_access: current_time,
551                });
552            }
553
554            Ok(())
555        }
556
557        async fn set_ignore_media_retention_policy_inner(
558            &self,
559            request: &MediaRequestParameters,
560            ignore_policy: IgnoreMediaRetentionPolicy,
561        ) -> Result<(), Self::Error> {
562            let key = request.unique_key();
563            let mut inner = self.inner();
564
565            if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
566                inner.media_list[pos].ignore_policy = ignore_policy.is_yes();
567            }
568
569            Ok(())
570        }
571
572        async fn get_media_content_inner(
573            &self,
574            request: &MediaRequestParameters,
575            current_time: SystemTime,
576        ) -> Result<Option<Vec<u8>>, Self::Error> {
577            let key = request.unique_key();
578            let mut inner = self.inner();
579
580            let Some(media_content) =
581                inner.media_list.iter_mut().find(|content| content.key == key)
582            else {
583                return Ok(None);
584            };
585
586            media_content.last_access = current_time;
587
588            Ok(Some(media_content.content.clone()))
589        }
590
591        async fn get_media_content_for_uri_inner(
592            &self,
593            uri: &MxcUri,
594            current_time: SystemTime,
595        ) -> Result<Option<Vec<u8>>, Self::Error> {
596            let mut inner = self.inner();
597
598            let Some(media_content) =
599                inner.media_list.iter_mut().find(|content| content.uri == uri)
600            else {
601                return Ok(None);
602            };
603
604            media_content.last_access = current_time;
605
606            Ok(Some(media_content.content.clone()))
607        }
608
609        async fn clean_inner(
610            &self,
611            _policy: MediaRetentionPolicy,
612            current_time: SystemTime,
613        ) -> Result<(), Self::Error> {
614            // This is mostly a noop. We don't care about this test implementation, only
615            // whether this method was called with the right time.
616            self.inner().cleanup_time = Some(current_time);
617
618            Ok(())
619        }
620
621        async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
622            Ok(self.inner().cleanup_time)
623        }
624    }
625
626    #[derive(Debug)]
627    struct MockTimeProvider {
628        now: Mutex<SystemTime>,
629    }
630
631    impl MockTimeProvider {
632        /// Construct a `MockTimeProvider` with the given current time.
633        fn new(now: SystemTime) -> Self {
634            Self { now: Mutex::new(now) }
635        }
636
637        /// Set the current time.
638        fn set_now(&self, now: SystemTime) {
639            *self.now.lock() = now;
640        }
641    }
642
643    impl TimeProvider for MockTimeProvider {
644        fn now(&self) -> SystemTime {
645            *self.now.lock()
646        }
647    }
648
    /// Check how the `MediaService` drives the store when the media retention
    /// policy is empty: media can be added and read, the last access time
    /// follows the mocked time, and a cleanup does not reach the store.
    #[async_test]
    async fn test_media_service_empty_policy() {
        let content = b"some text content";
        let uri = mxc_uri!("mxc://server.local/AbcDe1234");
        let request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };

        let now = SystemTime::UNIX_EPOCH;

        let store = MockMediaStoreInner::default();
        let service = MediaService::with_time_provider(MockTimeProvider::new(now));

        // By default an empty policy is used.
        assert!(!service.media_retention_policy().has_limitations());
        service.restore(None, None);
        assert!(!service.media_retention_policy().has_limitations());
        assert!(!store.accessed());

        // Add media.
        service
            .add_media_content(&store, &request, content.to_vec(), IgnoreMediaRetentionPolicy::No)
            .await
            .unwrap();
        assert!(store.accessed());

        let media_content = store.inner().media_list[0].clone();
        assert_eq!(media_content.uri, uri);
        assert_eq!(media_content.content, content);
        assert!(!media_content.ignore_policy);
        assert_eq!(media_content.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Get media from request.
        let loaded_content = service.get_media_content(&store, &request).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[0].clone();
        assert_eq!(media.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Get media from URI.
        let loaded_content = service.get_media_content_for_uri(&store, uri).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[0].clone();
        assert_eq!(media.last_access, now);

        // Update ignore_policy.
        service
            .set_ignore_media_retention_policy(&store, &request, IgnoreMediaRetentionPolicy::Yes)
            .await
            .unwrap();
        // NOTE(review): `accessed` was not reset since the lookup by URI above,
        // so this assertion would also pass if the call did not reach the
        // store.
        assert!(store.accessed());

        let media_content = store.inner().media_list[0].clone();
        assert!(media_content.ignore_policy);

        // Try a cleanup. With the empty policy the store should not be accessed.
        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
        store.reset_accessed();

        service.clean(&store).await.unwrap();
        assert!(!store.accessed());
        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
    }
726
    /// Check how the `MediaService` drives the store when the media retention
    /// policy has a maximum file size: small media is stored, big media is
    /// only stored when the policy is ignored, and a cleanup reaches the
    /// store.
    #[async_test]
    async fn test_media_service_non_empty_policy() {
        // Content of less than 32 bytes.
        let small_content = b"some text content";
        let small_uri = mxc_uri!("mxc://server.local/small");
        let small_request = MediaRequestParameters {
            source: MediaSource::Plain(small_uri.to_owned()),
            format: MediaFormat::File,
        };

        // Content of more than 32 bytes.
        let big_content = b"some much much larger text content";
        let big_uri = mxc_uri!("mxc://server.local/big");
        let big_request = MediaRequestParameters {
            source: MediaSource::Plain(big_uri.to_owned()),
            format: MediaFormat::File,
        };

        // Limit the file size to 32 bytes in the retention policy.
        let policy = MediaRetentionPolicy { max_file_size: Some(32), ..Default::default() };

        let now = SystemTime::UNIX_EPOCH;

        let store = MockMediaStoreInner::default();
        let service = MediaService::with_time_provider(MockTimeProvider::new(now));

        // Check that restoring the policy works.
        service.restore(Some(MediaRetentionPolicy::default()), None);
        assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
        assert!(!store.accessed());

        // Set the media retention policy.
        service.set_media_retention_policy(&store, policy).await.unwrap();
        assert!(store.accessed());
        assert_eq!(service.media_retention_policy(), policy);
        assert_eq!(store.inner().media_retention_policy, Some(policy));

        store.reset_accessed();

        // Add small media, it should work because its size is lower than the max file
        // size.
        service
            .add_media_content(
                &store,
                &small_request,
                small_content.to_vec(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .unwrap();
        assert!(store.accessed());

        let media_content = store.inner().media_list[0].clone();
        assert_eq!(media_content.uri, small_uri);
        assert_eq!(media_content.content, small_content);
        assert!(!media_content.ignore_policy);
        assert_eq!(media_content.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Get media from request.
        let loaded_content = service.get_media_content(&store, &small_request).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[0].clone();
        assert_eq!(media.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Get media from URI.
        let loaded_content = service.get_media_content_for_uri(&store, small_uri).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[0].clone();
        assert_eq!(media.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Add big media, it will not work because it is bigger than the max file size.
        service
            .add_media_content(
                &store,
                &big_request,
                big_content.to_vec(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .unwrap();
        // The service filtered the content out before calling the store.
        assert!(!store.accessed());
        assert_eq!(store.inner().media_list.len(), 1);

        store.reset_accessed();

        let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content, None);

        store.reset_accessed();

        let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content, None);

        // Add big media, but this time ignore the policy.
        service
            .add_media_content(
                &store,
                &big_request,
                big_content.to_vec(),
                IgnoreMediaRetentionPolicy::Yes,
            )
            .await
            .unwrap();
        // NOTE(review): `accessed` was not reset since the lookup by URI above,
        // so this assertion would also pass if the call did not reach the
        // store; the length check below is what shows the media was added.
        assert!(store.accessed());
        assert_eq!(store.inner().media_list.len(), 2);

        store.reset_accessed();

        // Get media from request.
        let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[1].clone();
        assert_eq!(media.last_access, now);

        // Move the mocked time forward by one minute.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        // Get media from URI.
        let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
        assert!(store.accessed());
        assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));

        // The last access time was updated.
        let media = store.inner().media_list[1].clone();
        assert_eq!(media.last_access, now);

        // Try a cleanup, the store should be accessed.
        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);

        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        store.reset_accessed();

        service.clean(&store).await.unwrap();
        assert!(store.accessed());
        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
    }
888
    /// Check when the `MediaService` spawns an automatic cleanup: never
    /// without a cleanup frequency or without limitations in the policy, and
    /// otherwise only when there was no previous cleanup or the previous one
    /// is old enough.
    #[async_test]
    async fn test_media_service_automatic_cleanup() {
        // 64 bytes content.
        let content = vec![0; 64];

        let uri_1 = mxc_uri!("mxc://localhost/media-1");
        let request_1 = MediaRequestParameters {
            source: MediaSource::Plain(uri_1.to_owned()),
            format: MediaFormat::File,
        };
        let uri_2 = mxc_uri!("mxc://localhost/media-2");
        let request_2 = MediaRequestParameters {
            source: MediaSource::Plain(uri_2.to_owned()),
            format: MediaFormat::File,
        };

        let now = SystemTime::UNIX_EPOCH;

        let store = MockMediaStoreInner::default();
        let service = MediaService::with_time_provider(MockTimeProvider::new(now));

        // Set an empty policy.
        let policy = MediaRetentionPolicy::empty();
        service.set_media_retention_policy(&store, policy).await.unwrap();

        // Add the contents.
        service
            .add_media_content(&store, &request_1, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .unwrap();
        service
            .add_media_content(&store, &request_2, content, IgnoreMediaRetentionPolicy::No)
            .await
            .unwrap();
        // None of the calls above stored a handle to an automatic cleanup.
        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());

        // Try to launch an automatic cleanup.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        service.maybe_spawn_automatic_media_cache_cleanup(&store, now);

        // No cleanup was spawned since automatic cleanups are disabled.
        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());

        // Set a policy with automatic cleanup every hour.
        let policy = MediaRetentionPolicy::empty()
            .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)));
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        service.set_media_retention_policy(&store, policy).await.unwrap();

        // No cleanup was spawned since the policy has no limitations.
        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());

        // Set a policy with automatic cleanup every hour and a max file size.
        let policy = MediaRetentionPolicy::empty()
            .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)))
            .with_max_file_size(Some(512));
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        service.set_media_retention_policy(&store, policy).await.unwrap();

        // A cleanup was spawned since there was no last_media_cleanup_time.
        // Taking the handle out of the service leaves `None` behind, which the
        // next assertion on the handle relies on.
        let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
        join_handle.await.unwrap();

        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));

        // Try again one minute in the future, nothing is spawned because we need to
        // wait for one hour.
        let now = now + Duration::from_secs(60);
        service.inner.time_provider.set_now(now);
        service.get_media_content(&store, &request_1).await.unwrap();

        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());

        // Try again 2 hours in the future, another cleanup is spawned.
        let now = now + Duration::from_secs(2 * 60 * 60);
        service.inner.time_provider.set_now(now);
        service.get_media_content_for_uri(&store, uri_1).await.unwrap();

        let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
        join_handle.await.unwrap();

        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
    }
975}