1use std::{fmt, sync::Arc};
16
17use async_trait::async_trait;
18use matrix_sdk_common::{
19 executor::{spawn, JoinHandle},
20 locks::Mutex,
21 AsyncTraitDeps, SendOutsideWasm, SyncOutsideWasm,
22};
23use ruma::{time::SystemTime, MxcUri};
24use tokio::sync::Mutex as AsyncMutex;
25use tracing::error;
26
27use super::MediaRetentionPolicy;
28use crate::{event_cache::store::EventCacheStoreError, media::MediaRequestParameters};
29
30#[derive(Debug)]
35pub struct MediaService<Time: TimeProvider = DefaultTimeProvider> {
36 inner: Arc<MediaServiceInner<Time>>,
37}
38
39#[derive(Debug)]
40struct MediaServiceInner<Time: TimeProvider = DefaultTimeProvider> {
41 time_provider: Time,
43
44 policy: Mutex<MediaRetentionPolicy>,
46
47 cleanup_guard: AsyncMutex<()>,
49
50 last_media_cleanup_time: Mutex<Option<SystemTime>>,
52
53 automatic_media_cleanup_join_handle: Mutex<Option<JoinHandle<()>>>,
58}
59
60impl MediaService {
61 pub fn new() -> Self {
66 Self::default()
67 }
68}
69
70impl Default for MediaService {
71 fn default() -> Self {
72 Self::with_time_provider(DefaultTimeProvider)
73 }
74}
75
76impl<Time> MediaService<Time>
77where
78 Time: TimeProvider + 'static,
79{
80 fn with_time_provider(time_provider: Time) -> Self {
83 let inner = MediaServiceInner {
84 time_provider,
85 policy: Mutex::new(MediaRetentionPolicy::empty()),
86 cleanup_guard: AsyncMutex::new(()),
87 last_media_cleanup_time: Mutex::new(None),
88 automatic_media_cleanup_join_handle: Mutex::new(None),
89 };
90
91 Self { inner: Arc::new(inner) }
92 }
93
94 pub fn restore(
103 &self,
104 policy: Option<MediaRetentionPolicy>,
105 last_media_cleanup_time: Option<SystemTime>,
106 ) {
107 if let Some(policy) = policy {
108 *self.inner.policy.lock() = policy;
109 }
110
111 if let Some(time) = last_media_cleanup_time {
112 *self.inner.last_media_cleanup_time.lock() = Some(time);
113 }
114 }
115
116 fn now(&self) -> SystemTime {
118 self.inner.time_provider.now()
119 }
120
121 pub async fn set_media_retention_policy<Store: EventCacheStoreMedia + 'static>(
129 &self,
130 store: &Store,
131 policy: MediaRetentionPolicy,
132 ) -> Result<(), Store::Error> {
133 store.set_media_retention_policy_inner(policy).await?;
134
135 *self.inner.policy.lock() = policy;
136
137 self.maybe_spawn_automatic_media_cache_cleanup(store, self.now());
138
139 Ok(())
140 }
141
142 pub fn media_retention_policy(&self) -> MediaRetentionPolicy {
144 *self.inner.policy.lock()
145 }
146
147 pub async fn add_media_content<Store: EventCacheStoreMedia + 'static>(
160 &self,
161 store: &Store,
162 request: &MediaRequestParameters,
163 content: Vec<u8>,
164 ignore_policy: IgnoreMediaRetentionPolicy,
165 ) -> Result<(), Store::Error> {
166 let policy = self.media_retention_policy();
167
168 if ignore_policy == IgnoreMediaRetentionPolicy::No
169 && policy.exceeds_max_file_size(content.len() as u64)
170 {
171 return Ok(());
173 }
174
175 let current_time = self.now();
176 store
177 .add_media_content_inner(request, content, current_time, policy, ignore_policy)
178 .await?;
179
180 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
181
182 Ok(())
183 }
184
185 pub async fn set_ignore_media_retention_policy<Store: EventCacheStoreMedia>(
199 &self,
200 store: &Store,
201 request: &MediaRequestParameters,
202 ignore_policy: IgnoreMediaRetentionPolicy,
203 ) -> Result<(), Store::Error> {
204 store.set_ignore_media_retention_policy_inner(request, ignore_policy).await
205 }
206
207 pub async fn get_media_content<Store: EventCacheStoreMedia + 'static>(
215 &self,
216 store: &Store,
217 request: &MediaRequestParameters,
218 ) -> Result<Option<Vec<u8>>, Store::Error> {
219 let current_time = self.now();
220 let content = store.get_media_content_inner(request, current_time).await?;
221
222 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
223
224 Ok(content)
225 }
226
227 pub async fn get_media_content_for_uri<Store: EventCacheStoreMedia + 'static>(
236 &self,
237 store: &Store,
238 uri: &MxcUri,
239 ) -> Result<Option<Vec<u8>>, Store::Error> {
240 let current_time = self.now();
241 let content = store.get_media_content_for_uri_inner(uri, current_time).await?;
242
243 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
244
245 Ok(content)
246 }
247
248 pub async fn clean_up_media_cache<Store: EventCacheStoreMedia>(
256 &self,
257 store: &Store,
258 ) -> Result<(), Store::Error> {
259 self.clean_up_media_cache_inner(store, self.now()).await
260 }
261
262 async fn clean_up_media_cache_inner<Store: EventCacheStoreMedia>(
263 &self,
264 store: &Store,
265 current_time: SystemTime,
266 ) -> Result<(), Store::Error> {
267 let Ok(_guard) = self.inner.cleanup_guard.try_lock() else {
268 return Ok(());
270 };
271
272 let policy = self.media_retention_policy();
273
274 if !policy.has_limitations() {
275 return Ok(());
277 }
278
279 store.clean_up_media_cache_inner(policy, current_time).await?;
280
281 *self.inner.last_media_cleanup_time.lock() = Some(current_time);
282
283 Ok(())
284 }
285
286 fn maybe_spawn_automatic_media_cache_cleanup<Store: EventCacheStoreMedia + 'static>(
294 &self,
295 store: &Store,
296 current_time: SystemTime,
297 ) {
298 let mut join_handle = self.inner.automatic_media_cleanup_join_handle.lock();
299
300 if join_handle.as_ref().is_some_and(|join_handle| !join_handle.is_finished()) {
301 return;
303 }
304
305 let policy = self.media_retention_policy();
306 if policy.cleanup_frequency.is_none() || !policy.has_limitations() {
307 return;
309 }
310
311 let last_media_cleanup_time = *self.inner.last_media_cleanup_time.lock();
312 if last_media_cleanup_time.is_some_and(|last_cleanup_time| {
313 !policy.should_clean_up(current_time, last_cleanup_time)
314 }) {
315 return;
317 }
318
319 let this = self.clone();
320 let store = store.clone();
321
322 let handle = spawn(async move {
323 if let Err(error) = this.clean_up_media_cache_inner(&store, current_time).await {
324 error!("Failed to run automatic media cache cleanup: {error}");
325 }
326 });
327
328 *join_handle = Some(handle);
329 }
330}
331
332impl<Time> Clone for MediaService<Time>
333where
334 Time: TimeProvider,
335{
336 fn clone(&self) -> Self {
337 Self { inner: self.inner.clone() }
338 }
339}
340
341impl<Time> Drop for MediaServiceInner<Time>
342where
343 Time: TimeProvider,
344{
345 fn drop(&mut self) {
346 if let Some(join_handle) = self.automatic_media_cleanup_join_handle.lock().take() {
347 join_handle.abort();
348 }
349 }
350}
351
352#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
360#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
361pub trait EventCacheStoreMedia: AsyncTraitDeps + Clone {
362 type Error: fmt::Debug + fmt::Display + Into<EventCacheStoreError>;
364
365 async fn media_retention_policy_inner(
367 &self,
368 ) -> Result<Option<MediaRetentionPolicy>, Self::Error>;
369
370 async fn set_media_retention_policy_inner(
376 &self,
377 policy: MediaRetentionPolicy,
378 ) -> Result<(), Self::Error>;
379
380 async fn add_media_content_inner(
398 &self,
399 request: &MediaRequestParameters,
400 content: Vec<u8>,
401 current_time: SystemTime,
402 policy: MediaRetentionPolicy,
403 ignore_policy: IgnoreMediaRetentionPolicy,
404 ) -> Result<(), Self::Error>;
405
406 async fn set_ignore_media_retention_policy_inner(
420 &self,
421 request: &MediaRequestParameters,
422 ignore_policy: IgnoreMediaRetentionPolicy,
423 ) -> Result<(), Self::Error>;
424
425 async fn get_media_content_inner(
434 &self,
435 request: &MediaRequestParameters,
436 current_time: SystemTime,
437 ) -> Result<Option<Vec<u8>>, Self::Error>;
438
439 async fn get_media_content_for_uri_inner(
449 &self,
450 uri: &MxcUri,
451 current_time: SystemTime,
452 ) -> Result<Option<Vec<u8>>, Self::Error>;
453
454 async fn clean_up_media_cache_inner(
469 &self,
470 policy: MediaRetentionPolicy,
471 current_time: SystemTime,
472 ) -> Result<(), Self::Error>;
473
474 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error>;
476}
477
/// Whether the media retention policy should be ignored for a given media
/// file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IgnoreMediaRetentionPolicy {
    /// The policy is ignored for this media.
    Yes,

    /// The policy applies to this media.
    No,
}

impl IgnoreMediaRetentionPolicy {
    /// Whether this is the [`IgnoreMediaRetentionPolicy::Yes`] variant.
    pub fn is_yes(self) -> bool {
        self == Self::Yes
    }
}
512
513pub trait TimeProvider: SendOutsideWasm + SyncOutsideWasm {
516 fn now(&self) -> SystemTime;
518}
519
520#[derive(Debug)]
522pub struct DefaultTimeProvider;
523
524impl TimeProvider for DefaultTimeProvider {
525 fn now(&self) -> SystemTime {
526 SystemTime::now()
527 }
528}
529
530#[cfg(test)]
531mod tests {
532 use std::{
533 fmt,
534 sync::{Arc, MutexGuard},
535 };
536
537 use async_trait::async_trait;
538 use matrix_sdk_common::locks::Mutex;
539 use matrix_sdk_test::async_test;
540 use ruma::{
541 events::room::MediaSource,
542 mxc_uri,
543 time::{Duration, SystemTime},
544 MxcUri, OwnedMxcUri,
545 };
546
547 use super::{EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaService, TimeProvider};
548 use crate::{
549 event_cache::store::{media::MediaRetentionPolicy, EventCacheStoreError},
550 media::{MediaFormat, MediaRequestParameters, UniqueKey},
551 };
552
553 #[derive(Debug, Default, Clone)]
554 struct MockEventCacheStoreMedia {
555 inner: Arc<Mutex<MockEventCacheStoreMediaInner>>,
556 }
557
558 impl MockEventCacheStoreMedia {
559 fn accessed(&self) -> bool {
561 self.inner.lock().accessed
562 }
563
564 fn reset_accessed(&self) {
566 self.inner.lock().accessed = false;
567 }
568
569 fn inner(&self) -> MutexGuard<'_, MockEventCacheStoreMediaInner> {
574 let mut inner = self.inner.lock();
575 inner.accessed = true;
576 inner
577 }
578 }
579
580 #[derive(Debug, Default)]
581 struct MockEventCacheStoreMediaInner {
582 accessed: bool,
586
587 media_retention_policy: Option<MediaRetentionPolicy>,
589
590 media_list: Vec<MediaContent>,
592
593 cleanup_time: Option<SystemTime>,
595 }
596
597 #[derive(Debug, Clone)]
598 struct MediaContent {
599 key: String,
601
602 uri: OwnedMxcUri,
604
605 content: Vec<u8>,
607
608 ignore_policy: bool,
611
612 last_access: SystemTime,
614 }
615
616 #[derive(Debug)]
617 struct MockEventCacheStoreMediaError;
618
619 impl fmt::Display for MockEventCacheStoreMediaError {
620 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621 write!(f, "MockEventCacheStoreMediaError")
622 }
623 }
624
625 impl std::error::Error for MockEventCacheStoreMediaError {}
626
627 impl From<MockEventCacheStoreMediaError> for EventCacheStoreError {
628 fn from(value: MockEventCacheStoreMediaError) -> Self {
629 Self::backend(value)
630 }
631 }
632
633 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
634 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
635 impl EventCacheStoreMedia for MockEventCacheStoreMedia {
636 type Error = MockEventCacheStoreMediaError;
637
638 async fn media_retention_policy_inner(
639 &self,
640 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
641 Ok(self.inner().media_retention_policy)
642 }
643
644 async fn set_media_retention_policy_inner(
645 &self,
646 policy: MediaRetentionPolicy,
647 ) -> Result<(), Self::Error> {
648 self.inner().media_retention_policy = Some(policy);
649 Ok(())
650 }
651
652 async fn add_media_content_inner(
653 &self,
654 request: &MediaRequestParameters,
655 content: Vec<u8>,
656 current_time: SystemTime,
657 policy: MediaRetentionPolicy,
658 ignore_policy: IgnoreMediaRetentionPolicy,
659 ) -> Result<(), Self::Error> {
660 let ignore_policy = ignore_policy.is_yes();
661
662 if !ignore_policy && policy.exceeds_max_file_size(content.len() as u64) {
663 return Ok(());
664 }
665
666 let mut inner = self.inner();
667 let key = request.unique_key();
668
669 if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
670 let media_content = &mut inner.media_list[pos];
671 media_content.content = content;
672 media_content.last_access = current_time;
673 media_content.ignore_policy = ignore_policy;
674 } else {
675 inner.media_list.push(MediaContent {
676 key,
677 uri: request.uri().to_owned(),
678 content,
679 ignore_policy,
680 last_access: current_time,
681 });
682 }
683
684 Ok(())
685 }
686
687 async fn set_ignore_media_retention_policy_inner(
688 &self,
689 request: &MediaRequestParameters,
690 ignore_policy: IgnoreMediaRetentionPolicy,
691 ) -> Result<(), Self::Error> {
692 let key = request.unique_key();
693 let mut inner = self.inner();
694
695 if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
696 inner.media_list[pos].ignore_policy = ignore_policy.is_yes();
697 }
698
699 Ok(())
700 }
701
702 async fn get_media_content_inner(
703 &self,
704 request: &MediaRequestParameters,
705 current_time: SystemTime,
706 ) -> Result<Option<Vec<u8>>, Self::Error> {
707 let key = request.unique_key();
708 let mut inner = self.inner();
709
710 let Some(media_content) =
711 inner.media_list.iter_mut().find(|content| content.key == key)
712 else {
713 return Ok(None);
714 };
715
716 media_content.last_access = current_time;
717
718 Ok(Some(media_content.content.clone()))
719 }
720
721 async fn get_media_content_for_uri_inner(
722 &self,
723 uri: &MxcUri,
724 current_time: SystemTime,
725 ) -> Result<Option<Vec<u8>>, Self::Error> {
726 let mut inner = self.inner();
727
728 let Some(media_content) =
729 inner.media_list.iter_mut().find(|content| content.uri == uri)
730 else {
731 return Ok(None);
732 };
733
734 media_content.last_access = current_time;
735
736 Ok(Some(media_content.content.clone()))
737 }
738
739 async fn clean_up_media_cache_inner(
740 &self,
741 _policy: MediaRetentionPolicy,
742 current_time: SystemTime,
743 ) -> Result<(), Self::Error> {
744 self.inner().cleanup_time = Some(current_time);
747
748 Ok(())
749 }
750
751 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
752 Ok(self.inner().cleanup_time)
753 }
754 }
755
756 #[derive(Debug)]
757 struct MockTimeProvider {
758 now: Mutex<SystemTime>,
759 }
760
761 impl MockTimeProvider {
762 fn new(now: SystemTime) -> Self {
764 Self { now: Mutex::new(now) }
765 }
766
767 fn set_now(&self, now: SystemTime) {
769 *self.now.lock() = now;
770 }
771 }
772
773 impl TimeProvider for MockTimeProvider {
774 fn now(&self) -> SystemTime {
775 *self.now.lock()
776 }
777 }
778
779 #[async_test]
780 async fn test_media_service_empty_policy() {
781 let content = b"some text content";
782 let uri = mxc_uri!("mxc://server.local/AbcDe1234");
783 let request = MediaRequestParameters {
784 source: MediaSource::Plain(uri.to_owned()),
785 format: MediaFormat::File,
786 };
787
788 let now = SystemTime::UNIX_EPOCH;
789
790 let store = MockEventCacheStoreMedia::default();
791 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
792
793 assert!(!service.media_retention_policy().has_limitations());
795 service.restore(None, None);
796 assert!(!service.media_retention_policy().has_limitations());
797 assert!(!store.accessed());
798
799 service
801 .add_media_content(&store, &request, content.to_vec(), IgnoreMediaRetentionPolicy::No)
802 .await
803 .unwrap();
804 assert!(store.accessed());
805
806 let media_content = store.inner().media_list[0].clone();
807 assert_eq!(media_content.uri, uri);
808 assert_eq!(media_content.content, content);
809 assert!(!media_content.ignore_policy);
810 assert_eq!(media_content.last_access, now);
811
812 let now = now + Duration::from_secs(60);
813 service.inner.time_provider.set_now(now);
814 store.reset_accessed();
815
816 let loaded_content = service.get_media_content(&store, &request).await.unwrap();
818 assert!(store.accessed());
819 assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
820
821 let media = store.inner().media_list[0].clone();
823 assert_eq!(media.last_access, now);
824
825 let now = now + Duration::from_secs(60);
826 service.inner.time_provider.set_now(now);
827 store.reset_accessed();
828
829 let loaded_content = service.get_media_content_for_uri(&store, uri).await.unwrap();
831 assert!(store.accessed());
832 assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
833
834 let media = store.inner().media_list[0].clone();
836 assert_eq!(media.last_access, now);
837
838 service
840 .set_ignore_media_retention_policy(&store, &request, IgnoreMediaRetentionPolicy::Yes)
841 .await
842 .unwrap();
843 assert!(store.accessed());
844
845 let media_content = store.inner().media_list[0].clone();
846 assert!(media_content.ignore_policy);
847
848 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
850 store.reset_accessed();
851
852 service.clean_up_media_cache(&store).await.unwrap();
853 assert!(!store.accessed());
854 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
855 }
856
857 #[async_test]
858 async fn test_media_service_non_empty_policy() {
859 let small_content = b"some text content";
861 let small_uri = mxc_uri!("mxc://server.local/small");
862 let small_request = MediaRequestParameters {
863 source: MediaSource::Plain(small_uri.to_owned()),
864 format: MediaFormat::File,
865 };
866
867 let big_content = b"some much much larger text content";
869 let big_uri = mxc_uri!("mxc://server.local/big");
870 let big_request = MediaRequestParameters {
871 source: MediaSource::Plain(big_uri.to_owned()),
872 format: MediaFormat::File,
873 };
874
875 let policy = MediaRetentionPolicy { max_file_size: Some(32), ..Default::default() };
877
878 let now = SystemTime::UNIX_EPOCH;
879
880 let store = MockEventCacheStoreMedia::default();
881 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
882
883 service.restore(Some(MediaRetentionPolicy::default()), None);
885 assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
886 assert!(!store.accessed());
887
888 service.set_media_retention_policy(&store, policy).await.unwrap();
890 assert!(store.accessed());
891 assert_eq!(service.media_retention_policy(), policy);
892 assert_eq!(store.inner().media_retention_policy, Some(policy));
893
894 store.reset_accessed();
895
896 service
899 .add_media_content(
900 &store,
901 &small_request,
902 small_content.to_vec(),
903 IgnoreMediaRetentionPolicy::No,
904 )
905 .await
906 .unwrap();
907 assert!(store.accessed());
908
909 let media_content = store.inner().media_list[0].clone();
910 assert_eq!(media_content.uri, small_uri);
911 assert_eq!(media_content.content, small_content);
912 assert!(!media_content.ignore_policy);
913 assert_eq!(media_content.last_access, now);
914
915 let now = now + Duration::from_secs(60);
916 service.inner.time_provider.set_now(now);
917 store.reset_accessed();
918
919 let loaded_content = service.get_media_content(&store, &small_request).await.unwrap();
921 assert!(store.accessed());
922 assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
923
924 let media = store.inner().media_list[0].clone();
926 assert_eq!(media.last_access, now);
927
928 let now = now + Duration::from_secs(60);
929 service.inner.time_provider.set_now(now);
930 store.reset_accessed();
931
932 let loaded_content = service.get_media_content_for_uri(&store, small_uri).await.unwrap();
934 assert!(store.accessed());
935 assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
936
937 let media = store.inner().media_list[0].clone();
939 assert_eq!(media.last_access, now);
940
941 let now = now + Duration::from_secs(60);
942 service.inner.time_provider.set_now(now);
943 store.reset_accessed();
944
945 service
947 .add_media_content(
948 &store,
949 &big_request,
950 big_content.to_vec(),
951 IgnoreMediaRetentionPolicy::No,
952 )
953 .await
954 .unwrap();
955 assert!(!store.accessed());
956 assert_eq!(store.inner().media_list.len(), 1);
957
958 store.reset_accessed();
959
960 let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
961 assert!(store.accessed());
962 assert_eq!(loaded_content, None);
963
964 store.reset_accessed();
965
966 let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
967 assert!(store.accessed());
968 assert_eq!(loaded_content, None);
969
970 service
972 .add_media_content(
973 &store,
974 &big_request,
975 big_content.to_vec(),
976 IgnoreMediaRetentionPolicy::Yes,
977 )
978 .await
979 .unwrap();
980 assert!(store.accessed());
981 assert_eq!(store.inner().media_list.len(), 2);
982
983 store.reset_accessed();
984
985 let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
987 assert!(store.accessed());
988 assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
989
990 let media = store.inner().media_list[1].clone();
992 assert_eq!(media.last_access, now);
993
994 let now = now + Duration::from_secs(60);
995 service.inner.time_provider.set_now(now);
996 store.reset_accessed();
997
998 let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
1000 assert!(store.accessed());
1001 assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
1002
1003 let media = store.inner().media_list[1].clone();
1005 assert_eq!(media.last_access, now);
1006
1007 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
1009
1010 let now = now + Duration::from_secs(60);
1011 service.inner.time_provider.set_now(now);
1012 store.reset_accessed();
1013
1014 service.clean_up_media_cache(&store).await.unwrap();
1015 assert!(store.accessed());
1016 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1017 }
1018
1019 #[async_test]
1020 async fn test_media_service_automatic_cleanup() {
1021 let content = vec![0; 64];
1023
1024 let uri_1 = mxc_uri!("mxc://localhost/media-1");
1025 let request_1 = MediaRequestParameters {
1026 source: MediaSource::Plain(uri_1.to_owned()),
1027 format: MediaFormat::File,
1028 };
1029 let uri_2 = mxc_uri!("mxc://localhost/media-2");
1030 let request_2 = MediaRequestParameters {
1031 source: MediaSource::Plain(uri_2.to_owned()),
1032 format: MediaFormat::File,
1033 };
1034
1035 let now = SystemTime::UNIX_EPOCH;
1036
1037 let store = MockEventCacheStoreMedia::default();
1038 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
1039
1040 let policy = MediaRetentionPolicy::empty();
1042 service.set_media_retention_policy(&store, policy).await.unwrap();
1043
1044 service
1046 .add_media_content(&store, &request_1, content.clone(), IgnoreMediaRetentionPolicy::No)
1047 .await
1048 .unwrap();
1049 service
1050 .add_media_content(&store, &request_2, content, IgnoreMediaRetentionPolicy::No)
1051 .await
1052 .unwrap();
1053 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1054
1055 let now = now + Duration::from_secs(60);
1057 service.inner.time_provider.set_now(now);
1058 service.maybe_spawn_automatic_media_cache_cleanup(&store, now);
1059
1060 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1062
1063 let policy = MediaRetentionPolicy::empty()
1065 .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)));
1066 let now = now + Duration::from_secs(60);
1067 service.inner.time_provider.set_now(now);
1068 service.set_media_retention_policy(&store, policy).await.unwrap();
1069
1070 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1072
1073 let policy = MediaRetentionPolicy::empty()
1075 .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)))
1076 .with_max_file_size(Some(512));
1077 let now = now + Duration::from_secs(60);
1078 service.inner.time_provider.set_now(now);
1079 service.set_media_retention_policy(&store, policy).await.unwrap();
1080
1081 let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
1083 join_handle.await.unwrap();
1084
1085 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1086
1087 let now = now + Duration::from_secs(60);
1090 service.inner.time_provider.set_now(now);
1091 service.get_media_content(&store, &request_1).await.unwrap();
1092
1093 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
1094
1095 let now = now + Duration::from_secs(2 * 60 * 60);
1097 service.inner.time_provider.set_now(now);
1098 service.get_media_content_for_uri(&store, uri_1).await.unwrap();
1099
1100 let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
1101 join_handle.await.unwrap();
1102
1103 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
1104 }
1105}