1use std::sync::Arc;
16
17use matrix_sdk_common::{
18 SendOutsideWasm, SyncOutsideWasm,
19 executor::{JoinHandle, spawn},
20 locks::Mutex,
21};
22use ruma::{MxcUri, time::SystemTime};
23use tokio::sync::Mutex as AsyncMutex;
24use tracing::error;
25
26use super::{MediaRetentionPolicy, MediaStoreInner};
27use crate::media::MediaRequestParameters;
28
29#[derive(Debug)]
34pub struct MediaService<Time: TimeProvider = DefaultTimeProvider> {
35 inner: Arc<MediaServiceInner<Time>>,
36}
37
38#[derive(Debug)]
39struct MediaServiceInner<Time: TimeProvider = DefaultTimeProvider> {
40 time_provider: Time,
42
43 policy: Mutex<MediaRetentionPolicy>,
45
46 cleanup_guard: AsyncMutex<()>,
48
49 last_media_cleanup_time: Mutex<Option<SystemTime>>,
51
52 automatic_media_cleanup_join_handle: Mutex<Option<JoinHandle<()>>>,
57}
58
59impl MediaService {
60 pub fn new() -> Self {
65 Self::default()
66 }
67}
68
69impl Default for MediaService {
70 fn default() -> Self {
71 Self::with_time_provider(DefaultTimeProvider)
72 }
73}
74
75impl<Time> MediaService<Time>
76where
77 Time: TimeProvider + 'static,
78{
79 fn with_time_provider(time_provider: Time) -> Self {
82 let inner = MediaServiceInner {
83 time_provider,
84 policy: Mutex::new(MediaRetentionPolicy::empty()),
85 cleanup_guard: AsyncMutex::new(()),
86 last_media_cleanup_time: Mutex::new(None),
87 automatic_media_cleanup_join_handle: Mutex::new(None),
88 };
89
90 Self { inner: Arc::new(inner) }
91 }
92
93 pub fn restore(
102 &self,
103 policy: Option<MediaRetentionPolicy>,
104 last_media_cleanup_time: Option<SystemTime>,
105 ) {
106 if let Some(policy) = policy {
107 *self.inner.policy.lock() = policy;
108 }
109
110 if let Some(time) = last_media_cleanup_time {
111 *self.inner.last_media_cleanup_time.lock() = Some(time);
112 }
113 }
114
115 fn now(&self) -> SystemTime {
117 self.inner.time_provider.now()
118 }
119
120 pub async fn set_media_retention_policy<Store: MediaStoreInner + 'static>(
128 &self,
129 store: &Store,
130 policy: MediaRetentionPolicy,
131 ) -> Result<(), Store::Error> {
132 store.set_media_retention_policy_inner(policy).await?;
133
134 *self.inner.policy.lock() = policy;
135
136 self.maybe_spawn_automatic_media_cache_cleanup(store, self.now());
137
138 Ok(())
139 }
140
141 pub fn media_retention_policy(&self) -> MediaRetentionPolicy {
143 *self.inner.policy.lock()
144 }
145
146 pub async fn add_media_content<Store: MediaStoreInner + 'static>(
159 &self,
160 store: &Store,
161 request: &MediaRequestParameters,
162 content: Vec<u8>,
163 ignore_policy: IgnoreMediaRetentionPolicy,
164 ) -> Result<(), Store::Error> {
165 let policy = self.media_retention_policy();
166
167 if ignore_policy == IgnoreMediaRetentionPolicy::No
168 && policy.exceeds_max_file_size(content.len() as u64)
169 {
170 return Ok(());
172 }
173
174 let current_time = self.now();
175 store
176 .add_media_content_inner(request, content, current_time, policy, ignore_policy)
177 .await?;
178
179 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
180
181 Ok(())
182 }
183
184 pub async fn set_ignore_media_retention_policy<Store: MediaStoreInner>(
198 &self,
199 store: &Store,
200 request: &MediaRequestParameters,
201 ignore_policy: IgnoreMediaRetentionPolicy,
202 ) -> Result<(), Store::Error> {
203 store.set_ignore_media_retention_policy_inner(request, ignore_policy).await
204 }
205
206 pub async fn get_media_content<Store: MediaStoreInner + 'static>(
214 &self,
215 store: &Store,
216 request: &MediaRequestParameters,
217 ) -> Result<Option<Vec<u8>>, Store::Error> {
218 let current_time = self.now();
219 let content = store.get_media_content_inner(request, current_time).await?;
220
221 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
222
223 Ok(content)
224 }
225
226 pub async fn get_media_content_for_uri<Store: MediaStoreInner + 'static>(
235 &self,
236 store: &Store,
237 uri: &MxcUri,
238 ) -> Result<Option<Vec<u8>>, Store::Error> {
239 let current_time = self.now();
240 let content = store.get_media_content_for_uri_inner(uri, current_time).await?;
241
242 self.maybe_spawn_automatic_media_cache_cleanup(store, current_time);
243
244 Ok(content)
245 }
246
247 pub async fn clean<Store: MediaStoreInner>(&self, store: &Store) -> Result<(), Store::Error> {
255 self.clean_inner(store, self.now()).await
256 }
257
258 async fn clean_inner<Store: MediaStoreInner>(
259 &self,
260 store: &Store,
261 current_time: SystemTime,
262 ) -> Result<(), Store::Error> {
263 let Ok(_guard) = self.inner.cleanup_guard.try_lock() else {
264 return Ok(());
266 };
267
268 let policy = self.media_retention_policy();
269
270 if !policy.has_limitations() {
271 return Ok(());
273 }
274
275 store.clean_inner(policy, current_time).await?;
276
277 *self.inner.last_media_cleanup_time.lock() = Some(current_time);
278
279 Ok(())
280 }
281
282 fn maybe_spawn_automatic_media_cache_cleanup<Store: MediaStoreInner + 'static>(
290 &self,
291 store: &Store,
292 current_time: SystemTime,
293 ) {
294 let mut join_handle = self.inner.automatic_media_cleanup_join_handle.lock();
295
296 if join_handle.as_ref().is_some_and(|join_handle| !join_handle.is_finished()) {
297 return;
299 }
300
301 let policy = self.media_retention_policy();
302 if policy.cleanup_frequency.is_none() || !policy.has_limitations() {
303 return;
305 }
306
307 let last_media_cleanup_time = *self.inner.last_media_cleanup_time.lock();
308 if last_media_cleanup_time.is_some_and(|last_cleanup_time| {
309 !policy.should_clean_up(current_time, last_cleanup_time)
310 }) {
311 return;
313 }
314
315 let this = self.clone();
316 let store = store.clone();
317
318 let handle = spawn(async move {
319 if let Err(error) = this.clean_inner(&store, current_time).await {
320 error!("Failed to run automatic media cache cleanup: {error}");
321 }
322 });
323
324 *join_handle = Some(handle);
325 }
326}
327
328impl<Time> Clone for MediaService<Time>
329where
330 Time: TimeProvider,
331{
332 fn clone(&self) -> Self {
333 Self { inner: self.inner.clone() }
334 }
335}
336
337impl<Time> Drop for MediaServiceInner<Time>
338where
339 Time: TimeProvider,
340{
341 fn drop(&mut self) {
342 if let Some(join_handle) = self.automatic_media_cleanup_join_handle.lock().take() {
343 join_handle.abort();
344 }
345 }
346}
347
/// Whether a piece of media is exempt from the media retention policy.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IgnoreMediaRetentionPolicy {
    /// The retention policy is ignored for this media.
    Yes,

    /// The retention policy applies to this media.
    No,
}

impl IgnoreMediaRetentionPolicy {
    /// Returns `true` for [`IgnoreMediaRetentionPolicy::Yes`], `false` otherwise.
    pub fn is_yes(self) -> bool {
        self == Self::Yes
    }
}
382
383pub trait TimeProvider: SendOutsideWasm + SyncOutsideWasm {
386 fn now(&self) -> SystemTime;
388}
389
390#[derive(Debug)]
392pub struct DefaultTimeProvider;
393
394impl TimeProvider for DefaultTimeProvider {
395 fn now(&self) -> SystemTime {
396 SystemTime::now()
397 }
398}
399
400#[cfg(test)]
401mod tests {
402 use std::{
403 fmt,
404 sync::{Arc, MutexGuard},
405 };
406
407 use async_trait::async_trait;
408 use matrix_sdk_common::locks::Mutex;
409 use matrix_sdk_test::async_test;
410 use ruma::{
411 MxcUri, OwnedMxcUri,
412 events::room::MediaSource,
413 mxc_uri,
414 time::{Duration, SystemTime},
415 };
416
417 use super::{
418 IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStoreInner,
419 TimeProvider,
420 };
421 use crate::media::{MediaFormat, MediaRequestParameters, UniqueKey, store::MediaStoreError};
422
423 #[derive(Debug, Default, Clone)]
424 struct MockMediaStoreInner {
425 inner: Arc<Mutex<MockMediaStoreInnerInner>>,
426 }
427
428 impl MockMediaStoreInner {
429 fn accessed(&self) -> bool {
431 self.inner.lock().accessed
432 }
433
434 fn reset_accessed(&self) {
436 self.inner.lock().accessed = false;
437 }
438
439 fn inner(&self) -> MutexGuard<'_, MockMediaStoreInnerInner> {
444 let mut inner = self.inner.lock();
445 inner.accessed = true;
446 inner
447 }
448 }
449
450 #[derive(Debug, Default)]
451 struct MockMediaStoreInnerInner {
452 accessed: bool,
456
457 media_retention_policy: Option<MediaRetentionPolicy>,
459
460 media_list: Vec<MediaContent>,
462
463 cleanup_time: Option<SystemTime>,
465 }
466
467 #[derive(Debug, Clone)]
468 struct MediaContent {
469 key: String,
471
472 uri: OwnedMxcUri,
474
475 content: Vec<u8>,
477
478 ignore_policy: bool,
481
482 last_access: SystemTime,
484 }
485
486 #[derive(Debug)]
487 struct MockMediaStoreInnerError;
488
489 impl fmt::Display for MockMediaStoreInnerError {
490 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
491 write!(f, "MockMediaStoreInnerError")
492 }
493 }
494
495 impl std::error::Error for MockMediaStoreInnerError {}
496
497 impl From<MockMediaStoreInnerError> for MediaStoreError {
498 fn from(value: MockMediaStoreInnerError) -> Self {
499 Self::backend(value)
500 }
501 }
502
503 #[cfg_attr(target_family = "wasm", async_trait(?Send))]
504 #[cfg_attr(not(target_family = "wasm"), async_trait)]
505 impl MediaStoreInner for MockMediaStoreInner {
506 type Error = MockMediaStoreInnerError;
507
508 async fn media_retention_policy_inner(
509 &self,
510 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
511 Ok(self.inner().media_retention_policy)
512 }
513
514 async fn set_media_retention_policy_inner(
515 &self,
516 policy: MediaRetentionPolicy,
517 ) -> Result<(), Self::Error> {
518 self.inner().media_retention_policy = Some(policy);
519 Ok(())
520 }
521
522 async fn add_media_content_inner(
523 &self,
524 request: &MediaRequestParameters,
525 content: Vec<u8>,
526 current_time: SystemTime,
527 policy: MediaRetentionPolicy,
528 ignore_policy: IgnoreMediaRetentionPolicy,
529 ) -> Result<(), Self::Error> {
530 let ignore_policy = ignore_policy.is_yes();
531
532 if !ignore_policy && policy.exceeds_max_file_size(content.len() as u64) {
533 return Ok(());
534 }
535
536 let mut inner = self.inner();
537 let key = request.unique_key();
538
539 if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
540 let media_content = &mut inner.media_list[pos];
541 media_content.content = content;
542 media_content.last_access = current_time;
543 media_content.ignore_policy = ignore_policy;
544 } else {
545 inner.media_list.push(MediaContent {
546 key,
547 uri: request.uri().to_owned(),
548 content,
549 ignore_policy,
550 last_access: current_time,
551 });
552 }
553
554 Ok(())
555 }
556
557 async fn set_ignore_media_retention_policy_inner(
558 &self,
559 request: &MediaRequestParameters,
560 ignore_policy: IgnoreMediaRetentionPolicy,
561 ) -> Result<(), Self::Error> {
562 let key = request.unique_key();
563 let mut inner = self.inner();
564
565 if let Some(pos) = inner.media_list.iter().position(|content| content.key == key) {
566 inner.media_list[pos].ignore_policy = ignore_policy.is_yes();
567 }
568
569 Ok(())
570 }
571
572 async fn get_media_content_inner(
573 &self,
574 request: &MediaRequestParameters,
575 current_time: SystemTime,
576 ) -> Result<Option<Vec<u8>>, Self::Error> {
577 let key = request.unique_key();
578 let mut inner = self.inner();
579
580 let Some(media_content) =
581 inner.media_list.iter_mut().find(|content| content.key == key)
582 else {
583 return Ok(None);
584 };
585
586 media_content.last_access = current_time;
587
588 Ok(Some(media_content.content.clone()))
589 }
590
591 async fn get_media_content_for_uri_inner(
592 &self,
593 uri: &MxcUri,
594 current_time: SystemTime,
595 ) -> Result<Option<Vec<u8>>, Self::Error> {
596 let mut inner = self.inner();
597
598 let Some(media_content) =
599 inner.media_list.iter_mut().find(|content| content.uri == uri)
600 else {
601 return Ok(None);
602 };
603
604 media_content.last_access = current_time;
605
606 Ok(Some(media_content.content.clone()))
607 }
608
609 async fn clean_inner(
610 &self,
611 _policy: MediaRetentionPolicy,
612 current_time: SystemTime,
613 ) -> Result<(), Self::Error> {
614 self.inner().cleanup_time = Some(current_time);
617
618 Ok(())
619 }
620
621 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
622 Ok(self.inner().cleanup_time)
623 }
624 }
625
626 #[derive(Debug)]
627 struct MockTimeProvider {
628 now: Mutex<SystemTime>,
629 }
630
631 impl MockTimeProvider {
632 fn new(now: SystemTime) -> Self {
634 Self { now: Mutex::new(now) }
635 }
636
637 fn set_now(&self, now: SystemTime) {
639 *self.now.lock() = now;
640 }
641 }
642
643 impl TimeProvider for MockTimeProvider {
644 fn now(&self) -> SystemTime {
645 *self.now.lock()
646 }
647 }
648
649 #[async_test]
650 async fn test_media_service_empty_policy() {
651 let content = b"some text content";
652 let uri = mxc_uri!("mxc://server.local/AbcDe1234");
653 let request = MediaRequestParameters {
654 source: MediaSource::Plain(uri.to_owned()),
655 format: MediaFormat::File,
656 };
657
658 let now = SystemTime::UNIX_EPOCH;
659
660 let store = MockMediaStoreInner::default();
661 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
662
663 assert!(!service.media_retention_policy().has_limitations());
665 service.restore(None, None);
666 assert!(!service.media_retention_policy().has_limitations());
667 assert!(!store.accessed());
668
669 service
671 .add_media_content(&store, &request, content.to_vec(), IgnoreMediaRetentionPolicy::No)
672 .await
673 .unwrap();
674 assert!(store.accessed());
675
676 let media_content = store.inner().media_list[0].clone();
677 assert_eq!(media_content.uri, uri);
678 assert_eq!(media_content.content, content);
679 assert!(!media_content.ignore_policy);
680 assert_eq!(media_content.last_access, now);
681
682 let now = now + Duration::from_secs(60);
683 service.inner.time_provider.set_now(now);
684 store.reset_accessed();
685
686 let loaded_content = service.get_media_content(&store, &request).await.unwrap();
688 assert!(store.accessed());
689 assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
690
691 let media = store.inner().media_list[0].clone();
693 assert_eq!(media.last_access, now);
694
695 let now = now + Duration::from_secs(60);
696 service.inner.time_provider.set_now(now);
697 store.reset_accessed();
698
699 let loaded_content = service.get_media_content_for_uri(&store, uri).await.unwrap();
701 assert!(store.accessed());
702 assert_eq!(loaded_content.as_deref(), Some(content.as_slice()));
703
704 let media = store.inner().media_list[0].clone();
706 assert_eq!(media.last_access, now);
707
708 service
710 .set_ignore_media_retention_policy(&store, &request, IgnoreMediaRetentionPolicy::Yes)
711 .await
712 .unwrap();
713 assert!(store.accessed());
714
715 let media_content = store.inner().media_list[0].clone();
716 assert!(media_content.ignore_policy);
717
718 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
720 store.reset_accessed();
721
722 service.clean(&store).await.unwrap();
723 assert!(!store.accessed());
724 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
725 }
726
727 #[async_test]
728 async fn test_media_service_non_empty_policy() {
729 let small_content = b"some text content";
731 let small_uri = mxc_uri!("mxc://server.local/small");
732 let small_request = MediaRequestParameters {
733 source: MediaSource::Plain(small_uri.to_owned()),
734 format: MediaFormat::File,
735 };
736
737 let big_content = b"some much much larger text content";
739 let big_uri = mxc_uri!("mxc://server.local/big");
740 let big_request = MediaRequestParameters {
741 source: MediaSource::Plain(big_uri.to_owned()),
742 format: MediaFormat::File,
743 };
744
745 let policy = MediaRetentionPolicy { max_file_size: Some(32), ..Default::default() };
747
748 let now = SystemTime::UNIX_EPOCH;
749
750 let store = MockMediaStoreInner::default();
751 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
752
753 service.restore(Some(MediaRetentionPolicy::default()), None);
755 assert_eq!(service.media_retention_policy(), MediaRetentionPolicy::default());
756 assert!(!store.accessed());
757
758 service.set_media_retention_policy(&store, policy).await.unwrap();
760 assert!(store.accessed());
761 assert_eq!(service.media_retention_policy(), policy);
762 assert_eq!(store.inner().media_retention_policy, Some(policy));
763
764 store.reset_accessed();
765
766 service
769 .add_media_content(
770 &store,
771 &small_request,
772 small_content.to_vec(),
773 IgnoreMediaRetentionPolicy::No,
774 )
775 .await
776 .unwrap();
777 assert!(store.accessed());
778
779 let media_content = store.inner().media_list[0].clone();
780 assert_eq!(media_content.uri, small_uri);
781 assert_eq!(media_content.content, small_content);
782 assert!(!media_content.ignore_policy);
783 assert_eq!(media_content.last_access, now);
784
785 let now = now + Duration::from_secs(60);
786 service.inner.time_provider.set_now(now);
787 store.reset_accessed();
788
789 let loaded_content = service.get_media_content(&store, &small_request).await.unwrap();
791 assert!(store.accessed());
792 assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
793
794 let media = store.inner().media_list[0].clone();
796 assert_eq!(media.last_access, now);
797
798 let now = now + Duration::from_secs(60);
799 service.inner.time_provider.set_now(now);
800 store.reset_accessed();
801
802 let loaded_content = service.get_media_content_for_uri(&store, small_uri).await.unwrap();
804 assert!(store.accessed());
805 assert_eq!(loaded_content.as_deref(), Some(small_content.as_slice()));
806
807 let media = store.inner().media_list[0].clone();
809 assert_eq!(media.last_access, now);
810
811 let now = now + Duration::from_secs(60);
812 service.inner.time_provider.set_now(now);
813 store.reset_accessed();
814
815 service
817 .add_media_content(
818 &store,
819 &big_request,
820 big_content.to_vec(),
821 IgnoreMediaRetentionPolicy::No,
822 )
823 .await
824 .unwrap();
825 assert!(!store.accessed());
826 assert_eq!(store.inner().media_list.len(), 1);
827
828 store.reset_accessed();
829
830 let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
831 assert!(store.accessed());
832 assert_eq!(loaded_content, None);
833
834 store.reset_accessed();
835
836 let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
837 assert!(store.accessed());
838 assert_eq!(loaded_content, None);
839
840 service
842 .add_media_content(
843 &store,
844 &big_request,
845 big_content.to_vec(),
846 IgnoreMediaRetentionPolicy::Yes,
847 )
848 .await
849 .unwrap();
850 assert!(store.accessed());
851 assert_eq!(store.inner().media_list.len(), 2);
852
853 store.reset_accessed();
854
855 let loaded_content = service.get_media_content(&store, &big_request).await.unwrap();
857 assert!(store.accessed());
858 assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
859
860 let media = store.inner().media_list[1].clone();
862 assert_eq!(media.last_access, now);
863
864 let now = now + Duration::from_secs(60);
865 service.inner.time_provider.set_now(now);
866 store.reset_accessed();
867
868 let loaded_content = service.get_media_content_for_uri(&store, big_uri).await.unwrap();
870 assert!(store.accessed());
871 assert_eq!(loaded_content.as_deref(), Some(big_content.as_slice()));
872
873 let media = store.inner().media_list[1].clone();
875 assert_eq!(media.last_access, now);
876
877 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), None);
879
880 let now = now + Duration::from_secs(60);
881 service.inner.time_provider.set_now(now);
882 store.reset_accessed();
883
884 service.clean(&store).await.unwrap();
885 assert!(store.accessed());
886 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
887 }
888
889 #[async_test]
890 async fn test_media_service_automatic_cleanup() {
891 let content = vec![0; 64];
893
894 let uri_1 = mxc_uri!("mxc://localhost/media-1");
895 let request_1 = MediaRequestParameters {
896 source: MediaSource::Plain(uri_1.to_owned()),
897 format: MediaFormat::File,
898 };
899 let uri_2 = mxc_uri!("mxc://localhost/media-2");
900 let request_2 = MediaRequestParameters {
901 source: MediaSource::Plain(uri_2.to_owned()),
902 format: MediaFormat::File,
903 };
904
905 let now = SystemTime::UNIX_EPOCH;
906
907 let store = MockMediaStoreInner::default();
908 let service = MediaService::with_time_provider(MockTimeProvider::new(now));
909
910 let policy = MediaRetentionPolicy::empty();
912 service.set_media_retention_policy(&store, policy).await.unwrap();
913
914 service
916 .add_media_content(&store, &request_1, content.clone(), IgnoreMediaRetentionPolicy::No)
917 .await
918 .unwrap();
919 service
920 .add_media_content(&store, &request_2, content, IgnoreMediaRetentionPolicy::No)
921 .await
922 .unwrap();
923 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
924
925 let now = now + Duration::from_secs(60);
927 service.inner.time_provider.set_now(now);
928 service.maybe_spawn_automatic_media_cache_cleanup(&store, now);
929
930 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
932
933 let policy = MediaRetentionPolicy::empty()
935 .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)));
936 let now = now + Duration::from_secs(60);
937 service.inner.time_provider.set_now(now);
938 service.set_media_retention_policy(&store, policy).await.unwrap();
939
940 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
942
943 let policy = MediaRetentionPolicy::empty()
945 .with_cleanup_frequency(Some(Duration::from_secs(60 * 60)))
946 .with_max_file_size(Some(512));
947 let now = now + Duration::from_secs(60);
948 service.inner.time_provider.set_now(now);
949 service.set_media_retention_policy(&store, policy).await.unwrap();
950
951 let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
953 join_handle.await.unwrap();
954
955 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
956
957 let now = now + Duration::from_secs(60);
960 service.inner.time_provider.set_now(now);
961 service.get_media_content(&store, &request_1).await.unwrap();
962
963 assert!(service.inner.automatic_media_cleanup_join_handle.lock().is_none());
964
965 let now = now + Duration::from_secs(2 * 60 * 60);
967 service.inner.time_provider.set_now(now);
968 service.get_media_content_for_uri(&store, uri_1).await.unwrap();
969
970 let join_handle = service.inner.automatic_media_cleanup_join_handle.lock().take().unwrap();
971 join_handle.await.unwrap();
972
973 assert_eq!(store.last_media_cleanup_time_inner().await.unwrap(), Some(now));
974 }
975}