matrix_sdk_sqlite/
media_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`MediaStore`].
16
17use std::{fmt, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22    media::{
23        store::{
24            IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
25            MediaStoreInner,
26        },
27        MediaRequestParameters, UniqueKey,
28    },
29    timer,
30};
31use matrix_sdk_store_encryption::StoreCipher;
32use ruma::{time::SystemTime, MilliSecondsSinceUnixEpoch, MxcUri};
33use rusqlite::{params_from_iter, OptionalExtension};
34use tokio::{
35    fs,
36    sync::{Mutex, OwnedMutexGuard},
37};
38use tracing::{debug, instrument, trace};
39
40use crate::{
41    error::{Error, Result},
42    utils::{
43        repeat_vars, time_to_timestamp, EncryptableStore, SqliteAsyncConnExt,
44        SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
45    },
46    OpenStoreError, Secret, SqliteStoreConfig,
47};
48
mod keys {
    // Names of the entries kept in the key-value store.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Names of the tables.
    pub const MEDIA: &str = "media";
}
57
/// The file name of the SQLite database, joined to the path of the store.
const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3";

/// The most recent version of the database.
///
/// It is compared against the version stored in the SQLite database to decide
/// whether a migration has to run. Every new SQL migration must bump this
/// number and come with a matching step in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 1;
67
68/// An SQLite-based media store.
69#[derive(Clone)]
70pub struct SqliteMediaStore {
71    store_cipher: Option<Arc<StoreCipher>>,
72
73    /// The pool of connections.
74    pool: SqlitePool,
75
76    /// We make the difference between connections for read operations, and for
77    /// write operations. We keep a single connection apart from write
78    /// operations. All other connections are used for read operations. The
79    /// lock is used to ensure there is one owner at a time.
80    write_connection: Arc<Mutex<SqliteAsyncConn>>,
81
82    media_service: MediaService,
83}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for SqliteMediaStore {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("SqliteMediaStore").finish_non_exhaustive()
89    }
90}
91
92impl EncryptableStore for SqliteMediaStore {
93    fn get_cypher(&self) -> Option<&StoreCipher> {
94        self.store_cipher.as_deref()
95    }
96}
97
98impl SqliteMediaStore {
99    /// Open the SQLite-based media store at the given path using the
100    /// given passphrase to encrypt private data.
101    pub async fn open(
102        path: impl AsRef<Path>,
103        passphrase: Option<&str>,
104    ) -> Result<Self, OpenStoreError> {
105        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
106    }
107
108    /// Open the SQLite-based media store at the given path using the given
109    /// key to encrypt private data.
110    pub async fn open_with_key(
111        path: impl AsRef<Path>,
112        key: Option<&[u8; 32]>,
113    ) -> Result<Self, OpenStoreError> {
114        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
115    }
116
117    /// Open the SQLite-based media store with the config open config.
118    #[instrument(skip(config), fields(path = ?config.path))]
119    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
120        debug!(?config);
121
122        let _timer = timer!("open_with_config");
123
124        let SqliteStoreConfig { path, secret, pool_config, runtime_config } = config;
125
126        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
127
128        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
129        config.pool = Some(pool_config);
130
131        let pool = config.create_pool(Runtime::Tokio1)?;
132
133        let this = Self::open_with_pool(pool, secret).await?;
134        this.write().await?.apply_runtime_config(runtime_config).await?;
135
136        Ok(this)
137    }
138
139    /// Open an SQLite-based media store using the given SQLite database
140    /// pool. The given passphrase will be used to encrypt private data.
141    async fn open_with_pool(
142        pool: SqlitePool,
143        secret: Option<Secret>,
144    ) -> Result<Self, OpenStoreError> {
145        let conn = pool.get().await?;
146
147        let version = conn.db_version().await?;
148        run_migrations(&conn, version).await?;
149
150        let store_cipher = match secret {
151            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
152            None => None,
153        };
154
155        let media_service = MediaService::new();
156        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
157        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
158        media_service.restore(media_retention_policy, last_media_cleanup_time);
159
160        Ok(Self {
161            store_cipher,
162            pool,
163            // Use `conn` as our selected write connections.
164            write_connection: Arc::new(Mutex::new(conn)),
165            media_service,
166        })
167    }
168
169    // Acquire a connection for executing read operations.
170    #[instrument(skip_all)]
171    async fn read(&self) -> Result<SqliteAsyncConn> {
172        trace!("Taking a `read` connection");
173        let _timer = timer!("connection");
174
175        let connection = self.pool.get().await?;
176
177        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
178        // support must be enabled on a per-connection basis. Execute it every
179        // time we try to get a connection, since we can't guarantee a previous
180        // connection did enable it before.
181        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
182
183        Ok(connection)
184    }
185
186    // Acquire a connection for executing write operations.
187    #[instrument(skip_all)]
188    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
189        trace!("Taking a `write` connection");
190        let _timer = timer!("connection");
191
192        let connection = self.write_connection.clone().lock_owned().await;
193
194        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
195        // support must be enabled on a per-connection basis. Execute it every
196        // time we try to get a connection, since we can't guarantee a previous
197        // connection did enable it before.
198        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
199
200        Ok(connection)
201    }
202}
203
204/// Run migrations for the given version of the database.
205async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
206    if version == 0 {
207        debug!("Creating database");
208    } else if version < DATABASE_VERSION {
209        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
210    } else {
211        return Ok(());
212    }
213
214    // Always enable foreign keys for the current connection.
215    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
216
217    if version < 1 {
218        // First turn on WAL mode, this can't be done in the transaction, it fails with
219        // the error message: "cannot change into wal mode from within a transaction".
220        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
221        conn.with_transaction(|txn| {
222            txn.execute_batch(include_str!("../migrations/media_store/001_init.sql"))?;
223            txn.set_db_version(1)
224        })
225        .await?;
226    }
227
228    Ok(())
229}
230
231#[async_trait]
232impl MediaStore for SqliteMediaStore {
233    type Error = Error;
234
235    #[instrument(skip(self))]
236    async fn try_take_leased_lock(
237        &self,
238        lease_duration_ms: u32,
239        key: &str,
240        holder: &str,
241    ) -> Result<bool> {
242        let _timer = timer!("method");
243
244        let key = key.to_owned();
245        let holder = holder.to_owned();
246
247        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
248        let expiration = now + lease_duration_ms as u64;
249
250        let num_touched = self
251            .write()
252            .await?
253            .with_transaction(move |txn| {
254                txn.execute(
255                    "INSERT INTO lease_locks (key, holder, expiration)
256                    VALUES (?1, ?2, ?3)
257                    ON CONFLICT (key)
258                    DO
259                        UPDATE SET holder = ?2, expiration = ?3
260                        WHERE holder = ?2
261                        OR expiration < ?4
262                ",
263                    (key, holder, expiration, now),
264                )
265            })
266            .await?;
267
268        Ok(num_touched == 1)
269    }
270
271    async fn add_media_content(
272        &self,
273        request: &MediaRequestParameters,
274        content: Vec<u8>,
275        ignore_policy: IgnoreMediaRetentionPolicy,
276    ) -> Result<()> {
277        let _timer = timer!("method");
278
279        self.media_service.add_media_content(self, request, content, ignore_policy).await
280    }
281
282    #[instrument(skip_all)]
283    async fn replace_media_key(
284        &self,
285        from: &MediaRequestParameters,
286        to: &MediaRequestParameters,
287    ) -> Result<(), Self::Error> {
288        let _timer = timer!("method");
289
290        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
291        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
292
293        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
294        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
295
296        let conn = self.write().await?;
297        conn.execute(
298            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
299            (new_uri, new_format, prev_uri, prev_format),
300        )
301        .await?;
302
303        Ok(())
304    }
305
306    #[instrument(skip_all)]
307    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
308        let _timer = timer!("method");
309
310        self.media_service.get_media_content(self, request).await
311    }
312
313    #[instrument(skip_all)]
314    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
315        let _timer = timer!("method");
316
317        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
318        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
319
320        let conn = self.write().await?;
321        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
322
323        Ok(())
324    }
325
326    #[instrument(skip(self))]
327    async fn get_media_content_for_uri(
328        &self,
329        uri: &MxcUri,
330    ) -> Result<Option<Vec<u8>>, Self::Error> {
331        let _timer = timer!("method");
332
333        self.media_service.get_media_content_for_uri(self, uri).await
334    }
335
336    #[instrument(skip(self))]
337    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
338        let _timer = timer!("method");
339
340        let uri = self.encode_key(keys::MEDIA, uri);
341
342        let conn = self.write().await?;
343        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
344
345        Ok(())
346    }
347
348    #[instrument(skip_all)]
349    async fn set_media_retention_policy(
350        &self,
351        policy: MediaRetentionPolicy,
352    ) -> Result<(), Self::Error> {
353        let _timer = timer!("method");
354
355        self.media_service.set_media_retention_policy(self, policy).await
356    }
357
358    #[instrument(skip_all)]
359    fn media_retention_policy(&self) -> MediaRetentionPolicy {
360        let _timer = timer!("method");
361
362        self.media_service.media_retention_policy()
363    }
364
365    #[instrument(skip_all)]
366    async fn set_ignore_media_retention_policy(
367        &self,
368        request: &MediaRequestParameters,
369        ignore_policy: IgnoreMediaRetentionPolicy,
370    ) -> Result<(), Self::Error> {
371        let _timer = timer!("method");
372
373        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
374    }
375
376    #[instrument(skip_all)]
377    async fn clean(&self) -> Result<(), Self::Error> {
378        let _timer = timer!("method");
379
380        self.media_service.clean(self).await
381    }
382}
383
384#[cfg_attr(target_family = "wasm", async_trait(?Send))]
385#[cfg_attr(not(target_family = "wasm"), async_trait)]
386impl MediaStoreInner for SqliteMediaStore {
387    type Error = Error;
388
389    async fn media_retention_policy_inner(
390        &self,
391    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
392        let conn = self.read().await?;
393        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
394    }
395
396    async fn set_media_retention_policy_inner(
397        &self,
398        policy: MediaRetentionPolicy,
399    ) -> Result<(), Self::Error> {
400        let conn = self.write().await?;
401        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
402        Ok(())
403    }
404
405    async fn add_media_content_inner(
406        &self,
407        request: &MediaRequestParameters,
408        data: Vec<u8>,
409        last_access: SystemTime,
410        policy: MediaRetentionPolicy,
411        ignore_policy: IgnoreMediaRetentionPolicy,
412    ) -> Result<(), Self::Error> {
413        let ignore_policy = ignore_policy.is_yes();
414        let data = self.encode_value(data)?;
415
416        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
417            return Ok(());
418        }
419
420        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
421        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
422        let timestamp = time_to_timestamp(last_access);
423
424        let conn = self.write().await?;
425        conn.execute(
426            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
427            (uri, format, data, timestamp, ignore_policy),
428        )
429        .await?;
430
431        Ok(())
432    }
433
434    async fn set_ignore_media_retention_policy_inner(
435        &self,
436        request: &MediaRequestParameters,
437        ignore_policy: IgnoreMediaRetentionPolicy,
438    ) -> Result<(), Self::Error> {
439        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
440        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
441        let ignore_policy = ignore_policy.is_yes();
442
443        let conn = self.write().await?;
444        conn.execute(
445            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
446            (ignore_policy, uri, format),
447        )
448        .await?;
449
450        Ok(())
451    }
452
453    async fn get_media_content_inner(
454        &self,
455        request: &MediaRequestParameters,
456        current_time: SystemTime,
457    ) -> Result<Option<Vec<u8>>, Self::Error> {
458        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
459        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
460        let timestamp = time_to_timestamp(current_time);
461
462        let conn = self.write().await?;
463        let data = conn
464            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
465                // Update the last access.
466                // We need to do this first so the transaction is in write mode right away.
467                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
468                txn.execute(
469                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
470                    (timestamp, &uri, &format),
471                )?;
472
473                txn.query_row::<Vec<u8>, _, _>(
474                    "SELECT data FROM media WHERE uri = ? AND format = ?",
475                    (&uri, &format),
476                    |row| row.get(0),
477                )
478                .optional()
479            })
480            .await?;
481
482        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
483    }
484
485    async fn get_media_content_for_uri_inner(
486        &self,
487        uri: &MxcUri,
488        current_time: SystemTime,
489    ) -> Result<Option<Vec<u8>>, Self::Error> {
490        let uri = self.encode_key(keys::MEDIA, uri);
491        let timestamp = time_to_timestamp(current_time);
492
493        let conn = self.write().await?;
494        let data = conn
495            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
496                // Update the last access.
497                // We need to do this first so the transaction is in write mode right away.
498                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
499                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
500
501                txn.query_row::<Vec<u8>, _, _>(
502                    "SELECT data FROM media WHERE uri = ?",
503                    (&uri,),
504                    |row| row.get(0),
505                )
506                .optional()
507            })
508            .await?;
509
510        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
511    }
512
513    async fn clean_inner(
514        &self,
515        policy: MediaRetentionPolicy,
516        current_time: SystemTime,
517    ) -> Result<(), Self::Error> {
518        if !policy.has_limitations() {
519            // We can safely skip all the checks.
520            return Ok(());
521        }
522
523        let conn = self.write().await?;
524        let removed = conn
525            .with_transaction::<_, Error, _>(move |txn| {
526                let mut removed = false;
527
528                // First, check media content that exceed the max filesize.
529                if let Some(max_file_size) = policy.computed_max_file_size() {
530                    let count = txn.execute(
531                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
532                        (max_file_size,),
533                    )?;
534
535                    if count > 0 {
536                        removed = true;
537                    }
538                }
539
540                // Then, clean up expired media content.
541                if let Some(last_access_expiry) = policy.last_access_expiry {
542                    let current_timestamp = time_to_timestamp(current_time);
543                    let expiry_secs = last_access_expiry.as_secs();
544                    let count = txn.execute(
545                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
546                        (current_timestamp, expiry_secs),
547                    )?;
548
549                    if count > 0 {
550                        removed = true;
551                    }
552                }
553
554                // Finally, if the cache size is too big, remove old items until it fits.
555                if let Some(max_cache_size) = policy.max_cache_size {
556                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
557                    // during the conversion of the result.
558                    let cache_size = txn
559                        .query_row(
560                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
561                            (),
562                            |row| {
563                                // `sum()` returns `NULL` if there are no rows.
564                                row.get::<_, Option<u64>>(0)
565                            },
566                        )?
567                        .unwrap_or_default();
568
569                    // If the cache size is overflowing or bigger than max cache size, clean up.
570                    if cache_size > max_cache_size {
571                        // Get the sizes of the media contents ordered by last access.
572                        let mut cached_stmt = txn.prepare_cached(
573                            "SELECT rowid, length(data) FROM media \
574                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
575                        )?;
576                        let content_sizes = cached_stmt
577                            .query(())?
578                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
579
580                        let mut accumulated_items_size = 0u64;
581                        let mut limit_reached = false;
582                        let mut rows_to_remove = Vec::new();
583
584                        for result in content_sizes {
585                            let (row_id, size) = match result {
586                                Ok(content_size) => content_size,
587                                Err(error) => {
588                                    return Err(error.into());
589                                }
590                            };
591
592                            if limit_reached {
593                                rows_to_remove.push(row_id);
594                                continue;
595                            }
596
597                            match accumulated_items_size.checked_add(size) {
598                                Some(acc) if acc > max_cache_size => {
599                                    // We can stop accumulating.
600                                    limit_reached = true;
601                                    rows_to_remove.push(row_id);
602                                }
603                                Some(acc) => accumulated_items_size = acc,
604                                None => {
605                                    // The accumulated size is overflowing but the setting cannot be
606                                    // bigger than usize::MAX, we can stop accumulating.
607                                    limit_reached = true;
608                                    rows_to_remove.push(row_id);
609                                }
610                            }
611                        }
612
613                        if !rows_to_remove.is_empty() {
614                            removed = true;
615                        }
616
617                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
618                            let sql_params = repeat_vars(row_ids.len());
619                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
620                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
621                            Ok(Vec::<()>::new())
622                        })?;
623                    }
624                }
625
626                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
627
628                Ok(removed)
629            })
630            .await?;
631
632        // If we removed media, defragment the database and free space on the
633        // filesystem.
634        if removed {
635            conn.vacuum().await?;
636        }
637
638        Ok(())
639    }
640
641    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
642        let conn = self.read().await?;
643        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
644    }
645}
646
647#[cfg(test)]
648mod tests {
649    use std::{
650        path::PathBuf,
651        sync::atomic::{AtomicU32, Ordering::SeqCst},
652        time::Duration,
653    };
654
655    use matrix_sdk_base::{
656        media::{
657            store::{IgnoreMediaRetentionPolicy, MediaStore, MediaStoreError},
658            MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
659        },
660        media_store_inner_integration_tests, media_store_integration_tests,
661        media_store_integration_tests_time,
662    };
663    use matrix_sdk_test::async_test;
664    use once_cell::sync::Lazy;
665    use ruma::{events::room::MediaSource, media::Method, mxc_uri, uint};
666    use tempfile::{tempdir, TempDir};
667
668    use super::SqliteMediaStore;
669    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
670
671    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
672    static NUM: AtomicU32 = AtomicU32::new(0);
673
674    fn new_media_store_workspace() -> PathBuf {
675        let name = NUM.fetch_add(1, SeqCst).to_string();
676        TMP_DIR.path().join(name)
677    }
678
679    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
680        let tmpdir_path = new_media_store_workspace();
681
682        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
683
684        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
685    }
686
687    media_store_integration_tests!();
688    media_store_integration_tests_time!();
689    media_store_inner_integration_tests!();
690
691    async fn get_media_store_content_sorted_by_last_access(
692        media_store: &SqliteMediaStore,
693    ) -> Vec<Vec<u8>> {
694        let sqlite_db = media_store.read().await.expect("accessing sqlite db failed");
695        sqlite_db
696            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
697                stmt.query(())?.mapped(|row| row.get(0)).collect()
698            })
699            .await
700            .expect("querying media cache content by last access failed")
701    }
702
703    #[async_test]
704    async fn test_pool_size() {
705        let tmpdir_path = new_media_store_workspace();
706        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
707
708        let store = SqliteMediaStore::open_with_config(store_open_config).await.unwrap();
709
710        assert_eq!(store.pool.status().max_size, 42);
711    }
712
713    #[async_test]
714    async fn test_last_access() {
715        let media_store = get_media_store().await.expect("creating media cache failed");
716        let uri = mxc_uri!("mxc://localhost/media");
717        let file_request = MediaRequestParameters {
718            source: MediaSource::Plain(uri.to_owned()),
719            format: MediaFormat::File,
720        };
721        let thumbnail_request = MediaRequestParameters {
722            source: MediaSource::Plain(uri.to_owned()),
723            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
724                Method::Crop,
725                uint!(100),
726                uint!(100),
727            )),
728        };
729
730        let content: Vec<u8> = "hello world".into();
731        let thumbnail_content: Vec<u8> = "hello…".into();
732
733        // Add the media.
734        media_store
735            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
736            .await
737            .expect("adding file failed");
738
739        // Since the precision of the timestamp is in seconds, wait so the timestamps
740        // differ.
741        tokio::time::sleep(Duration::from_secs(3)).await;
742
743        media_store
744            .add_media_content(
745                &thumbnail_request,
746                thumbnail_content.clone(),
747                IgnoreMediaRetentionPolicy::No,
748            )
749            .await
750            .expect("adding thumbnail failed");
751
752        // File's last access is older than thumbnail.
753        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
754
755        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
756        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
757        assert_eq!(contents[1], content, "file is not second-to-last access");
758
759        // Since the precision of the timestamp is in seconds, wait so the timestamps
760        // differ.
761        tokio::time::sleep(Duration::from_secs(3)).await;
762
763        // Access the file so its last access is more recent.
764        let _ = media_store
765            .get_media_content(&file_request)
766            .await
767            .expect("getting file failed")
768            .expect("file is missing");
769
770        // File's last access is more recent than thumbnail.
771        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
772
773        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
774        assert_eq!(contents[0], content, "file is not last access");
775        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
776    }
777}
778
779#[cfg(test)]
780mod encrypted_tests {
781    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
782
783    use matrix_sdk_base::{
784        media::store::MediaStoreError, media_store_inner_integration_tests,
785        media_store_integration_tests, media_store_integration_tests_time,
786    };
787    use once_cell::sync::Lazy;
788    use tempfile::{tempdir, TempDir};
789
790    use super::SqliteMediaStore;
791
792    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
793    static NUM: AtomicU32 = AtomicU32::new(0);
794
795    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
796        let name = NUM.fetch_add(1, SeqCst).to_string();
797        let tmpdir_path = TMP_DIR.path().join(name);
798
799        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
800
801        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
802            .await
803            .unwrap())
804    }
805
806    media_store_integration_tests!();
807    media_store_integration_tests_time!();
808    media_store_inner_integration_tests!();
809}