matrix_sdk_base/event_cache/store/media/
media_service.rs

1// Copyright 2025 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19    executor::{spawn, JoinHandle},
20    locks::Mutex,
21    AsyncTraitDeps, SendOutsideWasm, SyncOutsideWasm,
22};
23use ruma::{time::SystemTime, MxcUri};
24use tokio::sync::Mutex as AsyncMutex;
25use tracing::error;
26
27use super::MediaRetentionPolicy;
28use crate::{event_cache::store::EventCacheStoreError, media::MediaRequestParameters};
29
30/// API for implementors of [`EventCacheStore`] to manage their media through
31/// their implementation of [`EventCacheStoreMedia`].
32///
33/// [`EventCacheStore`]: crate::event_cache::store::EventCacheStore
34#[derive(Debug)]
35pub struct MediaService<Time: TimeProvider = DefaultTimeProvider> {
36    inner: Arc<MediaServiceInner<Time>>,
37}
38
39#[derive(Debug)]
40struct MediaServiceInner<Time: TimeProvider = DefaultTimeProvider> {
41    /// The time provider.
42    time_provider: Time,
43
44    /// The current [`MediaRetentionPolicy`].
45    policy: Mutex<MediaRetentionPolicy>,
46
47    /// A mutex to ensure a single cleanup is running at a time.
48    cleanup_guard: AsyncMutex<()>,
49
50    /// The time of the last media cache cleanup.
51    last_media_cleanup_time: Mutex<Option<SystemTime>>,
52
53    /// The [`JoinHandle`] for an automatic media cleanup task.
54    ///
55    /// Used to ensure that only one automatic cleanup is running at a time, and
56    /// to stop the cleanup when the [`MediaServiceInner`] is dropped.
57    automatic_media_cleanup_join_handle: Mutex<Option<JoinHandle<()>>>,
58}
59
60impl MediaService {
61    /// Construct a new default `MediaService`.
62    ///
63    /// [`MediaService::restore()`] should be called after constructing the
64    /// `MediaService` to restore its previous state.
65    pub fn new() -> Self {
66        Self::default()
67    }
68}
69
70impl Default for MediaService {
71    fn default() -> Self {
72        Self::with_time_provider(DefaultTimeProvider)
73    }
74}
75
76impl<Time> MediaService<Time>
77where
78    Time: TimeProvider + 'static,
79{
80    /// Construct a new `MediaService` with the given `TimeProvider` and an
81    /// empty `MediaRetentionPolicy`.
82    fn with_time_provider(time_provider: Time) -> Self {
83        let inner = MediaServiceInner {
84            time_provider,
85            policy: Mutex::new(MediaRetentionPolicy::empty()),
86            cleanup_guard: AsyncMutex::new(()),
87            last_media_cleanup_time: Mutex::new(None),
88            automatic_media_cleanup_join_handle: Mutex::new(None),
89        };
90
91        Self { inner: Arc::new(inner) }
92    }
93
94    /// Restore the previous state of the [`MediaRetentionPolicy`] from data
95    /// that was persisted in the store.
96    ///
97    /// This should be called immediately after constructing the `MediaService`.
98    ///
99    /// # Arguments
100    ///
101    /// * `policy` - The `MediaRetentionPolicy` that was persisted in the store.
102    pub fn restore(
103        &self,
104        policy: Option<MediaRetentionPolicy>,
105        last_media_cleanup_time: Option<SystemTime>,
106    ) {
107        if let Some(policy) = policy {
108            *self.inner.policy.lock() = policy;
109        }
110
111        if let Some(time) = last_media_cleanup_time {
112            *self.inner.last_media_cleanup_time.lock() = Some(time);
113        }
114    }
115
116    /// Get the current time from the inner [`TimeProvider`].
117    fn now(&self) -> SystemTime {
118        self.inner.time_provider.now()
119    }
120
121    /// Set the `MediaRetentionPolicy` of this service.
122    ///
123    /// # Arguments
124    ///
125    /// * `store` - The `EventCacheStoreMedia`.
126    ///
127    /// * `policy` - The `MediaRetentionPolicy` to use.
128    pub async fn set_media_retention_policy<Store: EventCacheStoreMedia + 'static>(
129        &self,
130        store: &Store,
131        policy: MediaRetentionPolicy,
132    ) -> Result<(), Store::Error> {
133        store.set_media_retention_policy_inner(policy).await?;
134
135        *self.inner.policy.lock() = policy;
136
137        self.maybe_spawn_automatic_media_cache_cleanup(store, self.now());
138
139        Ok(())
140    }
141
142    /// Get the `MediaRetentionPolicy` of this service.
143    pub fn media_retention_policy(&self) -> MediaRetentionPolicy {
144        *self.inner.policy.lock()
145    }
146
147    /// Add a media file's content in the media store.
148    ///
149    /// # Arguments
150    ///
151    /// * `store` - The `EventCacheStoreMedia`.
152    ///
153    /// * `request` - The `MediaRequestParameters` of the file.
154    ///
155    /// * `content` - The content of the file.
156    ///
157    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
158    ///   ignored.
159    pub async fn add_media_content<Store: EventCacheStoreMedia + 'static>(
160        &self,
161        store: &Store,
162        request: &MediaRequestParameters,
163        content: Vec<u8>,
164        ignore_policy: IgnoreMediaRetentionPolicy,
165    ) -> Result<(), Store::Error> {
166        let policy = self.media_retention_policy();
167
168        if ignore_policy == IgnoreMediaRetentionPolicy::No
169            && policy.exceeds_max_file_size(content.len() as u64)
170        {
171            // We do not cache the content.
172            return Ok(());
173        }
174
175        let current_time = self.now();
176        store
177            .add_media_content_inner(request, content, current_time, policy, ignore_policy)
178            .await?;
179
180        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
181
182        Ok(())
183    }
184
185    /// Set whether the current [`MediaRetentionPolicy`] should be ignored for
186    /// the media.
187    ///
188    /// The change will be taken into account in the next cleanup.
189    ///
190    /// # Arguments
191    ///
192    /// * `store` - The `EventCacheStoreMedia`.
193    ///
194    /// * `request` - The `MediaRequestParameters` of the file.
195    ///
196    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
197    ///   ignored.
198    pub async fn set_ignore_media_retention_policy<Store: EventCacheStoreMedia>(
199        &self,
200        store: &Store,
201        request: &MediaRequestParameters,
202        ignore_policy: IgnoreMediaRetentionPolicy,
203    ) -> Result<(), Store::Error> {
204        store.set_ignore_media_retention_policy_inner(request, ignore_policy).await
205    }
206
207    /// Get a media file's content out of the media store.
208    ///
209    /// # Arguments
210    ///
211    /// * `store` - The `EventCacheStoreMedia`.
212    ///
213    /// * `request` - The `MediaRequestParameters` of the file.
214    pub async fn get_media_content<Store: EventCacheStoreMedia + 'static>(
215        &self,
216        store: &Store,
217        request: &MediaRequestParameters,
218    ) -> Result<Option<Vec<u8>>, Store::Error> {
219        let current_time = self.now();
220        let content = store.get_media_content_inner(request, current_time).await?;
221
222        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
223
224        Ok(content)
225    }
226
227    /// Get a media file's content associated to an `MxcUri` from the
228    /// media store.
229    ///
230    /// # Arguments
231    ///
232    /// * `store` - The `EventCacheStoreMedia`.
233    ///
234    /// * `uri` - The `MxcUri` of the media file.
235    pub async fn get_media_content_for_uri<Store: EventCacheStoreMedia + 'static>(
236        &self,
237        store: &Store,
238        uri: &MxcUri,
239    ) -> Result<Option<Vec<u8>>, Store::Error> {
240        let current_time = self.now();
241        let content = store.get_media_content_for_uri_inner(uri, current_time).await?;
242
243        self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
244
245        Ok(content)
246    }
247
248    /// Clean up the media cache with the current `MediaRetentionPolicy`.
249    ///
250    /// If there is already an ongoing cleanup, this is a noop.
251    ///
252    /// # Arguments
253    ///
254    /// * `store` - The `EventCacheStoreMedia`.
255    pub async fn clean_up_media_cache<Store: EventCacheStoreMedia>(
256        &self,
257        store: &Store,
258    ) -> Result<(), Store::Error> {
259        self.clean_up_media_cache_inner(store, self.now()).await
260    }
261
262    async fn clean_up_media_cache_inner<Store: EventCacheStoreMedia>(
263        &self,
264        store: &Store,
265        current_time: SystemTime,
266    ) -> Result<(), Store::Error> {
267        let Ok(_guard) = self.inner.cleanup_guard.try_lock() else {
268            // There is another ongoing cleanup.
269            return Ok(());
270        };
271
272        let policy = self.media_retention_policy();
273
274        if !policy.has_limitations() {
275            // No need to call the backend.
276            return Ok(());
277        }
278
279        store.clean_up_media_cache_inner(policy, current_time).await?;
280
281        *self.inner.last_media_cleanup_time.lock() = Some(current_time);
282
283        Ok(())
284    }
285
286    /// Spawn an automatic media cache cleanup, according to the media retention
287    /// policy.
288    ///
289    /// A cleanup will be spawned if:
290    /// * The media retention policy's `cleanup_frequency` is set and enough
291    ///   time has passed since the last cleanup.
292    /// * No other cleanup is running,
293    fn maybe_spawn_automatic_media_cache_cleanup<Store: EventCacheStoreMedia + 'static>(
294        &self,
295        store: &Store,
296        current_time: SystemTime,
297    ) {
298        let mut join_handle = self.inner.automatic_media_cleanup_join_handle.lock();
299
300        if join_handle.as_ref().is_some_and(|join_handle| !join_handle.is_finished()) {
301            // There is an ongoing automatic media cache cleanup.
302            return;
303        }
304
305        let policy = self.media_retention_policy();
306        if policy.cleanup_frequency.is_none() || !policy.has_limitations() {
307            // Automatic cleanups are disabled or have no effect.
308            return;
309        }
310
311        let last_media_cleanup_time = *self.inner.last_media_cleanup_time.lock();
312        if last_media_cleanup_time.is_some_and(|last_cleanup_time| {
313            !policy.should_clean_up(current_time, last_cleanup_time)
314        }) {
315            // It is not time to clean up.
316            return;
317        }
318
319        let this = self.clone();
320        let store = store.clone();
321
322        let handle = spawn(async move {
323            if let Err(error) = this.clean_up_media_cache_inner(&store, current_time).await {
324                error!("Failed to run automatic media cache cleanup: {error}");
325            }
326        });
327
328        *join_handle = Some(handle);
329    }
330}
331
332impl<Time> Clone for MediaService<Time>
333where
334    Time: TimeProvider,
335{
336    fn clone(&self) -> Self {
337        Self { inner: self.inner.clone() }
338    }
339}
340
341impl<Time> Drop for MediaServiceInner<Time>
342where
343    Time: TimeProvider,
344{
345    fn drop(&mut self) {
346        if let Some(join_handle) = self.automatic_media_cleanup_join_handle.lock().take() {
347            join_handle.abort();
348        }
349    }
350}
351
352/// An abstract trait that can be used to implement different store backends
353/// for the media cache of the SDK.
354///
355/// The main purposes of this trait are to be able to centralize where we handle
356/// [`MediaRetentionPolicy`] by wrapping this in a [`MediaService`], and to
357/// simplify the implementation of tests by being able to have complete control
358/// over the `SystemTime`s provided to the store.
359#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
360#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
361pub trait EventCacheStoreMedia: AsyncTraitDeps + Clone {
362    /// The error type used by this media cache store.
363    type Error: fmt::Debug + fmt::Display + Into<EventCacheStoreError>;
364
365    /// The persisted media retention policy in the media cache.
366    async fn media_retention_policy_inner(
367        &self,
368    ) -> Result<Option<MediaRetentionPolicy>, Self::Error>;
369
370    /// Persist the media retention policy in the media cache.
371    ///
372    /// # Arguments
373    ///
374    /// * `policy` - The `MediaRetentionPolicy` to persist.
375    async fn set_media_retention_policy_inner(
376        &self,
377        policy: MediaRetentionPolicy,
378    ) -> Result<(), Self::Error>;
379
380    /// Add a media file's content in the media cache.
381    ///
382    /// # Arguments
383    ///
384    /// * `request` - The `MediaRequestParameters` of the file.
385    ///
386    /// * `content` - The content of the file.
387    ///
388    /// * `current_time` - The current time, to set the last access time of the
389    ///   media.
390    ///
391    /// * `policy` - The media retention policy, to check whether the media is
392    ///   too big to be cached.
393    ///
394    /// * `ignore_policy` - Whether the `MediaRetentionPolicy` should be ignored
395    ///   for this media. This setting should be persisted alongside the media
396    ///   and taken into account whenever the policy is used.
397    async fn add_media_content_inner(
398        &self,
399        request: &MediaRequestParameters,
400        content: Vec<u8>,
401        current_time: SystemTime,
402        policy: MediaRetentionPolicy,
403        ignore_policy: IgnoreMediaRetentionPolicy,
404    ) -> Result<(), Self::Error>;
405
406    /// Set whether the current [`MediaRetentionPolicy`] should be ignored for
407    /// the media.
408    ///
409    /// If the media of the given request is not found, this should be a noop.
410    ///
411    /// The change will be taken into account in the next cleanup.
412    ///
413    /// # Arguments
414    ///
415    /// * `request` - The `MediaRequestParameters` of the file.
416    ///
417    /// * `ignore_policy` - Whether the current `MediaRetentionPolicy` should be
418    ///   ignored.
419    async fn set_ignore_media_retention_policy_inner(
420        &self,
421        request: &MediaRequestParameters,
422        ignore_policy: IgnoreMediaRetentionPolicy,
423    ) -> Result<(), Self::Error>;
424
425    /// Get a media file's content out of the media cache.
426    ///
427    /// # Arguments
428    ///
429    /// * `request` - The `MediaRequestParameters` of the file.
430    ///
431    /// * `current_time` - The current time, to update the last access time of
432    ///   the media.
433    async fn get_media_content_inner(
434        &self,
435        request: &MediaRequestParameters,
436        current_time: SystemTime,
437    ) -> Result<Option<Vec<u8>>, Self::Error>;
438
439    /// Get a media file's content associated to an `MxcUri` from the
440    /// media store.
441    ///
442    /// # Arguments
443    ///
444    /// * `uri` - The `MxcUri` of the media file.
445    ///
446    /// * `current_time` - The current time, to update the last access time of
447    ///   the media.
448    async fn get_media_content_for_uri_inner(
449        &self,
450        uri: &MxcUri,
451        current_time: SystemTime,
452    ) -> Result<Option<Vec<u8>>, Self::Error>;
453
454    /// Clean up the media cache with the given policy.
455    ///
456    /// For the integration tests, it is expected that content that does not
457    /// pass the last access expiry and max file size criteria will be
458    /// removed first. After that, the remaining cache size should be
459    /// computed to compare against the max cache size criteria.
460    ///
461    /// # Arguments
462    ///
463    /// * `policy` - The media retention policy to use for the cleanup. The
464    ///   `cleanup_frequency` will be ignored.
465    ///
466    /// * `current_time` - The current time, to be used to check for expired
467    ///   content and to be stored as the time of the last media cache cleanup.
468    async fn clean_up_media_cache_inner(
469        &self,
470        policy: MediaRetentionPolicy,
471        current_time: SystemTime,
472    ) -> Result<(), Self::Error>;
473
474    /// The time of the last media cache cleanup.
475    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error>;
476}
477
/// Tells whether the [`MediaRetentionPolicy`] should be ignored for the
/// content that is being processed.
///
/// Some media cache actions do nothing when the policy filters out the
/// content they process. That can break features of the SDK which expect to
/// be able to persist every media file in the store and get it back when the
/// client is restored, like the send queue.
///
/// Use [`IgnoreMediaRetentionPolicy::is_yes()`] to convert this to a boolean.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IgnoreMediaRetentionPolicy {
    /// Ignore the media retention policy: the current action will not be a
    /// noop.
    ///
    /// Media content in this state must NOT be used when applying a
    /// `MediaRetentionPolicy`, whatever the criteria: maximum file size,
    /// maximum cache size or last access expiry.
    ///
    /// This state is supposed to be transient, and to only be used internally
    /// by the SDK.
    Yes,

    /// Respect the media retention policy: the current action might be a
    /// noop.
    No,
}

impl IgnoreMediaRetentionPolicy {
    /// Returns `true` for [`IgnoreMediaRetentionPolicy::Yes`] and `false` for
    /// [`IgnoreMediaRetentionPolicy::No`].
    pub fn is_yes(self) -> bool {
        match self {
            Self::Yes => true,
            Self::No => false,
        }
    }
}
512
513/// An abstract trait to provide the current `SystemTime` for the
514/// [`MediaService`].
515pub trait TimeProvider: SendOutsideWasm + SyncOutsideWasm {
516    /// The current time.
517    fn now(&self) -> SystemTime;
518}
519
520/// The default time provider, that calls `ruma::time::SystemTime::now()`.
521#[derive(Debug)]
522pub struct DefaultTimeProvider;
523
524impl TimeProvider for DefaultTimeProvider {
525    fn now(&self) -> SystemTime {
526        SystemTime::now()
527    }
528}
529
530#[cfg(test)]
531mod tests {
532    use std::{
533        fmt,
534        sync::{Arc, MutexGuard},
535    };
536
537    use async_trait::async_trait;
538    use matrix_sdk_common::locks::Mutex;
539    use matrix_sdk_test::async_test;
540    use ruma::{
541        events::room::MediaSource,
542        mxc_uri,
543        time::{Duration, SystemTime},
544        MxcUri, OwnedMxcUri,
545    };
546
547    use super::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService, TimeProvider};
548    use crate::{
549        event_cache::store::{media::MediaRetentionPolicy, EventCacheStoreError},
550        media::{MediaFormat, MediaRequestParameters, UniqueKey},
551    };
552
553    #[derive(Debug, Default, Clone)]
554    struct MockEventCacheStoreMedia {
555        inner: Arc<Mutex<MockEventCacheStoreMediaInner>>,
556    }
557
558    impl MockEventCacheStoreMedia {
559        /// Whether the store was accessed.
560        fn accessed(&self) -> bool {
561            self.inner.lock().accessed
562        }
563
564        /// Reset the `accessed` boolean.
565        fn reset_accessed(&self) {
566            self.inner.lock().accessed = false;
567        }
568
569        /// Access the inner store.
570        ///
571        /// Should be called for every access to the inner store as it also sets
572        /// the `accessed` boolean.
573        fn inner(&self) -> MutexGuard<'_, MockEventCacheStoreMediaInner> {
574            let mut inner = self.inner.lock();
575            inner.accessed = true;
576            inner
577        }
578    }
579
580    #[derive(Debug, Default)]
581    struct MockEventCacheStoreMediaInner {
582        /// Whether this store was accessed.
583        ///
584        /// Must be set to `true` for any operation that unlocks the store.
585        accessed: bool,
586
587        /// The persisted media retention policy.
588        media_retention_policy: Option<MediaRetentionPolicy>,
589
590        /// The list of media content.
591        media_list: Vec<MediaContent>,
592
593        /// The time of the last cleanup.
594        cleanup_time: Option<SystemTime>,
595    }
596
597    #[derive(Debug, Clone)]
598    struct MediaContent {
599        /// The unique key for the media content.
600        key: String,
601
602        /// The original URI of the media content.
603        uri: OwnedMxcUri,
604
605        /// The media content.
606        content: Vec<u8>,
607
608        /// Whether the `MediaRetentionPolicy` should be ignored for this media
609        /// content;
610        ignore_policy: bool,
611
612        /// The time of the last access of the media content.
613        last_access: SystemTime,
614    }
615
616    #[derive(Debug)]
617    struct MockEventCacheStoreMediaError;
618
619    impl fmt::Display for MockEventCacheStoreMediaError {
620        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621            write!(f, "MockEventCacheStoreMediaError")
622        }
623    }
624
625    impl std::error::Error for MockEventCacheStoreMediaError {}
626
627    impl From<MockEventCacheStoreMediaError> for EventCacheStoreError {
628        fn from(value: MockEventCacheStoreMediaError) -> Self {
629            Self::backend(value)
630        }
631    }
632
633    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
634    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
635    impl EventCacheStoreMedia for MockEventCacheStoreMedia {
636        type Error = MockEventCacheStoreMediaError;
637
638        async fn media_retention_policy_inner(
639            &self,
640        ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
641            Ok(self.inner().media_retention_policy)
642        }
643
644        async fn set_media_retention_policy_inner(
645            &self,
646            policy: MediaRetentionPolicy,
647        ) -> Result<(), Self::Error> {
648            self.inner().media_retention_policy = Some(policy);
649            Ok(())
650        }
651
652        async fn add_media_content_inner(
653            &self,
654            request: &MediaRequestParameters,
655            content: Vec<u8>,
656            current_time: SystemTime,
657            policy: MediaRetentionPolicy,
658            ignore_policy: IgnoreMediaRetentionPolicy,
659        ) -> Result<(), Self::Error> {
660            let ignore_policy = ignore_policy.is_yes();
661
662            if !ignore_policy && policy.exceeds_max_file_size(content.len() as u64) {
663                return Ok(());
664            }
665
666            let mut inner = self.inner();
667            let key = request.unique_key();
668
669            if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
670                let media_content = &mut inner.media_list[pos];
671                media_content.content = content;
672                media_content.last_access = current_time;
673                media_content.ignore_policy = ignore_policy;
674            } else {
675                inner.media_list.push(MediaContent {
676                    key,
677                    uri: request.uri().to_owned(),
678                    content,
679                    ignore_policy,
680                    last_access: current_time,
681                });
682            }
683
684            Ok(())
685        }
686
687        async fn set_ignore_media_retention_policy_inner(
688            &self,
689            request: &MediaRequestParameters,
690            ignore_policy: IgnoreMediaRetentionPolicy,
691        ) -> Result<(), Self::Error> {
692            let key = request.unique_key();
693            let mut inner = self.inner();
694
695            if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
696                inner.media_list[pos].ignore_policy = ignore_policy.is_yes();
697            }
698
699            Ok(())
700        }
701
702        async fn get_media_content_inner(
703            &self,
704            request: &MediaRequestParameters,
705            current_time: SystemTime,
706        ) -> Result<Option<Vec<u8>>, Self::Error> {
707            let key = request.unique_key();
708            let mut inner = self.inner();
709
710            let Some(media_content) =
711                inner.media_list.iter_mut().find(|content| content.key == key)
712            else {
713                return Ok(None);
714            };
715
716            media_content.last_access = current_time;
717
718            Ok(Some(media_content.content.clone()))
719        }
720
721        async fn get_media_content_for_uri_inner(
722            &self,
723            uri: &MxcUri,
724            current_time: SystemTime,
725        ) -> Result<Option<Vec<u8>>, Self::Error> {
726            let mut inner = self.inner();
727
728            let Some(media_content) =
729                inner.media_list.iter_mut().find(|content| content.uri == uri)
730            else {
731                return Ok(None);
732            };
733
734            media_content.last_access = current_time;
735
736            Ok(Some(media_content.content.clone()))
737        }
738
739        async fn clean_up_media_cache_inner(
740            &self,
741            _policy: MediaRetentionPolicy,
742            current_time: SystemTime,
743        ) -> Result<(), Self::Error> {
744            // This is mostly a noop. We don't care about this test implementation, only
745            // whether this method was called with the right time.
746            self.inner().cleanup_time = Some(current_time);
747
748            Ok(())
749        }
750
751        async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
752            Ok(self.inner().cleanup_time)
753        }
754    }
755
756    #[derive(Debug)]
757    struct MockTimeProvider {
758        now: Mutex<SystemTime>,
759    }
760
761    impl MockTimeProvider {
762        /// Construct a `MockTimeProvider` with the given current time.
763        fn new(now: SystemTime) -> Self {
764            Self { now: Mutex::new(now) }
765        }
766
767        /// Set the current time.
768        fn set_now(&self, now: SystemTime) {
769            *self.now.lock() = now;
770        }
771    }
772
773    impl TimeProvider for MockTimeProvider {
774        fn now(&self) -> SystemTime {
775            *self.now.lock()
776        }
777    }
778
779    #[async_test]
780    async fn test_media_service_empty_policy() {
781        let content = b"some text content";
782        let uri = mxc_uri!("mxc://server.local/AbcDe1234");
783        let request = MediaRequestParameters {
784            source: MediaSource::Plain(uri.to_owned()),
785            format: MediaFormat::File,
786        };
787
788        let now = SystemTime::UNIX_EPOCH;
789
790        let store = MockEventCacheStoreMedia::default();
791        let service = MediaService::with_time_provider(MockTimeProvider::new(now));
792
793        // By default an empty policy is used.
794        assert!(!service.media_retention_policy().has_limitations());
795        service.restore(None, None);
796        assert!(!service.media_retention_policy().has_limitations());
797        assert!(!store.accessed());
798
799        // Add media.
800        service
801            .add_media_content(&store, &request, content.to_vec(), IgnoreMediaRetentionPolicy::No)
802            .await
803            .unwrap();
804        assert!(store.accessed());
805
806        let media_content = store.inner().media_list[0].clone();
807        assert_eq!(media_content.uri, uri);
808        assert_eq!(media_content.content, content);
809        assert!(!media_content.ignore_policy);
810        assert_eq!(media_content.last_access, now);
811
812        let now = now + Duration::from_secs(60);
813        service.inner.time_provider.set_now(now);
814        store.reset_accessed();
815
816        // Get media from request.
817        let loaded_content = service.get_media_content(&store, &request).await.unwrap();
818        assert!(store.accessed());
819        assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
820
821        // The last access time was updated.
822        let media = store.inner().media_list[0].clone();
823        assert_eq!(media.last_access, now);
824
825        let now = now + Duration::from_secs(60);
826        service.inner.time_provider.set_now(now);
827        store.reset_accessed();
828
829        // Get media from URI.
830        let loaded_content = service.get_media_content_for_uri(&store, uri).await.unwrap();
831        assert!(store.accessed());
832        assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
833
834        // The last access time was updated.
835        let media = store.inner().media_list[0].clone();
836        assert_eq!(media.last_access, now);
837
838        // Update ignore_policy.
839        service
840            .set_ignore_media_retention_policy(&store, &request, IgnoreMediaRetentionPolicy::Yes)
841            .await
842            .unwrap();
843        assert!(store.accessed());
844
845        let media_content = store.inner().media_list[0].clone();
846        assert!(media_content.ignore_policy);
847
848        // Try a cleanup. With the empty policy the store should not be accessed.
849        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
850        store.reset_accessed();
851
852        service.clean_up_media_cache(&store).await.unwrap();
853        assert!(!store.accessed());
854        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
855    }
856
857    #[async_test]
858    async fn test_media_service_non_empty_policy() {
859        // Content of less than 32 bytes.
860        let small_content = b"some text content";
861        let small_uri = mxc_uri!("mxc://server.local/small");
862        let small_request = MediaRequestParameters {
863            source: MediaSource::Plain(small_uri.to_owned()),
864            format: MediaFormat::File,
865        };
866
867        // Content of more than 32 bytes.
868        let big_content = b"some much much larger text content";
869        let big_uri = mxc_uri!("mxc://server.local/big");
870        let big_request = MediaRequestParameters {
871            source: MediaSource::Plain(big_uri.to_owned()),
872            format: MediaFormat::File,
873        };
874
875        // Limit the file size to 32 bytes in the retention policy.
876        let policy = MediaRetentionPolicy { max_file_size: Some(32), ..Default::default() };
877
878        let now = SystemTime::UNIX_EPOCH;
879
880        let store = MockEventCacheStoreMedia::default();
881        let service = MediaService::with_time_provider(MockTimeProvider::new(now));
882
883        // Check that restoring the policy works.
884        service.restore(Some(MediaRetentionPolicy::default()), None);
885        assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
886        assert!(!store.accessed());
887
888        // Set the media retention policy.
889        service.set_media_retention_policy(&store, policy).await.unwrap();
890        assert!(store.accessed());
891        assert_eq!(service.media_retention_policy(), policy);
892        assert_eq!(store.inner().media_retention_policy, Some(policy));
893
894        store.reset_accessed();
895
896        // Add small media, it should work because its size is lower than the max file
897        // size.
898        service
899            .add_media_content(
900                &store,
901                &small_request,
902                small_content.to_vec(),
903                IgnoreMediaRetentionPolicy::No,
904            )
905            .await
906            .unwrap();
907        assert!(store.accessed());
908
909        let media_content = store.inner().media_list[0].clone();
910        assert_eq!(media_content.uri, small_uri);
911        assert_eq!(media_content.content, small_content);
912        assert!(!media_content.ignore_policy);
913        assert_eq!(media_content.last_access, now);
914
915        let now = now + Duration::from_secs(60);
916        service.inner.time_provider.set_now(now);
917        store.reset_accessed();
918
919        // Get media from request.
920        let loaded_content = service.get_media_content(&store, &small_request).await.unwrap();
921        assert!(store.accessed());
922        assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
923
924        // The last access time was updated.
925        let media = store.inner().media_list[0].clone();
926        assert_eq!(media.last_access, now);
927
928        let now = now + Duration::from_secs(60);
929        service.inner.time_provider.set_now(now);
930        store.reset_accessed();
931
932        // Get media from URI.
933        let loaded_content = service.get_media_content_for_uri(&store, small_uri).await.unwrap();
934        assert!(store.accessed());
935        assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
936
937        // The last access time was updated.
938        let media = store.inner().media_list[0].clone();
939        assert_eq!(media.last_access, now);
940
941        let now = now + Duration::from_secs(60);
942        service.inner.time_provider.set_now(now);
943        store.reset_accessed();
944
945        // Add big media, it will not work because it is bigger than the max file size.
946        service
947            .add_media_content(
948                &store,
949                &big_request,
950                big_content.to_vec(),
951                IgnoreMediaRetentionPolicy::No,
952            )
953            .await
954            .unwrap();
955        assert!(!store.accessed());
956        assert_eq!(store.inner().media_list.len(), 1);
957
958        store.reset_accessed();
959
960        let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
961        assert!(store.accessed());
962        assert_eq!(loaded_content, None);
963
964        store.reset_accessed();
965
966        let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
967        assert!(store.accessed());
968        assert_eq!(loaded_content, None);
969
970        // Add big media, but this time ignore the policy.
971        service
972            .add_media_content(
973                &store,
974                &big_request,
975                big_content.to_vec(),
976                IgnoreMediaRetentionPolicy::Yes,
977            )
978            .await
979            .unwrap();
980        assert!(store.accessed());
981        assert_eq!(store.inner().media_list.len(), 2);
982
983        store.reset_accessed();
984
985        // Get media from request.
986        let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
987        assert!(store.accessed());
988        assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
989
990        // The last access time was updated.
991        let media = store.inner().media_list[1].clone();
992        assert_eq!(media.last_access, now);
993
994        let now = now + Duration::from_secs(60);
995        service.inner.time_provider.set_now(now);
996        store.reset_accessed();
997
998        // Get media from URI.
999        let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
1000        assert!(store.accessed());
1001        assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
1002
1003        // The last access time was updated.
1004        let media = store.inner().media_list[1].clone();
1005        assert_eq!(media.last_access, now);
1006
1007        // Try a cleanup, the store should be accessed.
1008        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
1009
1010        let now = now + Duration::from_secs(60);
1011        service.inner.time_provider.set_now(now);
1012        store.reset_accessed();
1013
1014        service.clean_up_media_cache(&store).await.unwrap();
1015        assert!(store.accessed());
1016        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1017    }
1018
1019    #[async_test]
1020    async fn test_media_service_automatic_cleanup() {
1021        // 64 bytes content.
1022        let content = vec![0; 64];
1023
1024        let uri_1 = mxc_uri!("mxc://localhost/media-1");
1025        let request_1 = MediaRequestParameters {
1026            source: MediaSource::Plain(uri_1.to_owned()),
1027            format: MediaFormat::File,
1028        };
1029        let uri_2 = mxc_uri!("mxc://localhost/media-2");
1030        let request_2 = MediaRequestParameters {
1031            source: MediaSource::Plain(uri_2.to_owned()),
1032            format: MediaFormat::File,
1033        };
1034
1035        let now = SystemTime::UNIX_EPOCH;
1036
1037        let store = MockEventCacheStoreMedia::default();
1038        let service = MediaService::with_time_provider(MockTimeProvider::new(now));
1039
1040        // Set an empty policy.
1041        let policy = MediaRetentionPolicy::empty();
1042        service.set_media_retention_policy(&store, policy).await.unwrap();
1043
1044        // Add the contents.
1045        service
1046            .add_media_content(&store, &request_1, content.clone(), IgnoreMediaRetentionPolicy::No)
1047            .await
1048            .unwrap();
1049        service
1050            .add_media_content(&store, &request_2, content, IgnoreMediaRetentionPolicy::No)
1051            .await
1052            .unwrap();
1053        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1054
1055        // Try to launch an automatic cleanup.
1056        let now = now + Duration::from_secs(60);
1057        service.inner.time_provider.set_now(now);
1058        service.maybe_spawn_automatic_media_cache_cleanup(&store, now);
1059
1060        // No cleanup was spawned since automatic cleanups are disabled.
1061        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1062
1063        // Set a policy with automatic cleanup every hour.
1064        let policy = MediaRetentionPolicy::empty()
1065            .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)));
1066        let now = now + Duration::from_secs(60);
1067        service.inner.time_provider.set_now(now);
1068        service.set_media_retention_policy(&store, policy).await.unwrap();
1069
1070        // No cleanup was spawned since the policy has no limitations.
1071        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1072
1073        // Set a policy with automatic cleanup every hour and a max file size.
1074        let policy = MediaRetentionPolicy::empty()
1075            .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)))
1076            .with_max_file_size(Some(512));
1077        let now = now + Duration::from_secs(60);
1078        service.inner.time_provider.set_now(now);
1079        service.set_media_retention_policy(&store, policy).await.unwrap();
1080
1081        // A cleanup was spawned since there was no last_media_cleanup_time.
1082        let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
1083        join_handle.await.unwrap();
1084
1085        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1086
1087        // Try again one minute in the future, nothing is spawned because we need to
1088        // wait for one hour.
1089        let now = now + Duration::from_secs(60);
1090        service.inner.time_provider.set_now(now);
1091        service.get_media_content(&store, &request_1).await.unwrap();
1092
1093        assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1094
1095        // Try again 2 hours in the future, another cleanup is spawned.
1096        let now = now + Duration::from_secs(2 * 60 * 60);
1097        service.inner.time_provider.set_now(now);
1098        service.get_media_content_for_uri(&store, uri_1).await.unwrap();
1099
1100        let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
1101        join_handle.await.unwrap();
1102
1103        assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1104    }
1105}