matrix_sdk_sqlite/
media_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`MediaStore`].
16
17use std::{fmt, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21    cross_process_lock::CrossProcessLockGeneration,
22    media::{
23        MediaRequestParameters, UniqueKey,
24        store::{
25            IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
26            MediaStoreInner,
27        },
28    },
29    timer,
30};
31use matrix_sdk_store_encryption::StoreCipher;
32use ruma::{MilliSecondsSinceUnixEpoch, MxcUri, time::SystemTime};
33use rusqlite::{OptionalExtension, params_from_iter};
34use tokio::{
35    fs,
36    sync::{Mutex, OwnedMutexGuard},
37};
38use tracing::{debug, instrument};
39
40use crate::{
41    OpenStoreError, Secret, SqliteStoreConfig,
42    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
43    error::{Error, Result},
44    utils::{
45        EncryptableStore, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
46        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars, time_to_timestamp,
47    },
48};
49
50mod keys {
51    // Entries in Key-value store
52    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
53    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";
54
55    // Tables
56    pub const MEDIA: &str = "media";
57}
58
59/// The database name.
60const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3";
61
62/// Identifier of the latest database version.
63///
64/// This is used to figure whether the SQLite database requires a migration.
65/// Every new SQL migration should imply a bump of this number, and changes in
66/// the [`run_migrations`] function.
67const DATABASE_VERSION: u8 = 2;
68
69/// An SQLite-based media store.
70#[derive(Clone)]
71pub struct SqliteMediaStore {
72    store_cipher: Option<Arc<StoreCipher>>,
73
74    /// The pool of connections.
75    pool: SqlitePool,
76
77    /// We make the difference between connections for read operations, and for
78    /// write operations. We keep a single connection apart from write
79    /// operations. All other connections are used for read operations. The
80    /// lock is used to ensure there is one owner at a time.
81    write_connection: Arc<Mutex<SqliteAsyncConn>>,
82
83    media_service: MediaService,
84}
85
86#[cfg(not(tarpaulin_include))]
87impl fmt::Debug for SqliteMediaStore {
88    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
89        f.debug_struct("SqliteMediaStore").finish_non_exhaustive()
90    }
91}
92
93impl EncryptableStore for SqliteMediaStore {
94    fn get_cypher(&self) -> Option<&StoreCipher> {
95        self.store_cipher.as_deref()
96    }
97}
98
99impl SqliteMediaStore {
100    /// Open the SQLite-based media store at the given path using the
101    /// given passphrase to encrypt private data.
102    pub async fn open(
103        path: impl AsRef<Path>,
104        passphrase: Option<&str>,
105    ) -> Result<Self, OpenStoreError> {
106        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
107    }
108
109    /// Open the SQLite-based media store at the given path using the given
110    /// key to encrypt private data.
111    pub async fn open_with_key(
112        path: impl AsRef<Path>,
113        key: Option<&[u8; 32]>,
114    ) -> Result<Self, OpenStoreError> {
115        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
116    }
117
118    /// Open the SQLite-based media store with the config open config.
119    #[instrument(skip(config), fields(path = ?config.path))]
120    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
121        debug!(?config);
122
123        let _timer = timer!("open_with_config");
124
125        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
126
127        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
128
129        let this = Self::open_with_pool(pool, config.secret).await?;
130        this.write().await?.apply_runtime_config(config.runtime_config).await?;
131
132        Ok(this)
133    }
134
135    /// Open an SQLite-based media store using the given SQLite database
136    /// pool. The given passphrase will be used to encrypt private data.
137    async fn open_with_pool(
138        pool: SqlitePool,
139        secret: Option<Secret>,
140    ) -> Result<Self, OpenStoreError> {
141        let conn = pool.get().await?;
142
143        let version = conn.db_version().await?;
144        run_migrations(&conn, version).await?;
145
146        conn.wal_checkpoint().await;
147
148        let store_cipher = match secret {
149            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
150            None => None,
151        };
152
153        let media_service = MediaService::new();
154        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
155        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
156        media_service.restore(media_retention_policy, last_media_cleanup_time);
157
158        Ok(Self {
159            store_cipher,
160            pool,
161            // Use `conn` as our selected write connections.
162            write_connection: Arc::new(Mutex::new(conn)),
163            media_service,
164        })
165    }
166
167    // Acquire a connection for executing read operations.
168    #[instrument(skip_all)]
169    async fn read(&self) -> Result<SqliteAsyncConn> {
170        let connection = self.pool.get().await?;
171
172        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
173        // support must be enabled on a per-connection basis. Execute it every
174        // time we try to get a connection, since we can't guarantee a previous
175        // connection did enable it before.
176        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
177
178        Ok(connection)
179    }
180
181    // Acquire a connection for executing write operations.
182    #[instrument(skip_all)]
183    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
184        let connection = self.write_connection.clone().lock_owned().await;
185
186        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
187        // support must be enabled on a per-connection basis. Execute it every
188        // time we try to get a connection, since we can't guarantee a previous
189        // connection did enable it before.
190        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
191
192        Ok(connection)
193    }
194
195    pub async fn vacuum(&self) -> Result<()> {
196        self.write_connection.lock().await.vacuum().await
197    }
198
199    async fn get_db_size(&self) -> Result<Option<usize>> {
200        Ok(Some(self.pool.get().await?.get_db_size().await?))
201    }
202}
203
204/// Run migrations for the given version of the database.
205async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
206    if version == 0 {
207        debug!("Creating database");
208    } else if version < DATABASE_VERSION {
209        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
210    } else {
211        return Ok(());
212    }
213
214    // Always enable foreign keys for the current connection.
215    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
216
217    if version < 1 {
218        // First turn on WAL mode, this can't be done in the transaction, it fails with
219        // the error message: "cannot change into wal mode from within a transaction".
220        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
221        conn.with_transaction(|txn| {
222            txn.execute_batch(include_str!("../migrations/media_store/001_init.sql"))?;
223            txn.set_db_version(1)
224        })
225        .await?;
226    }
227
228    if version < 2 {
229        conn.with_transaction(|txn| {
230            txn.execute_batch(include_str!(
231                "../migrations/media_store/002_lease_locks_with_generation.sql"
232            ))?;
233            txn.set_db_version(2)
234        })
235        .await?;
236    }
237
238    Ok(())
239}
240
241#[async_trait]
242impl MediaStore for SqliteMediaStore {
243    type Error = Error;
244
245    #[instrument(skip(self))]
246    async fn try_take_leased_lock(
247        &self,
248        lease_duration_ms: u32,
249        key: &str,
250        holder: &str,
251    ) -> Result<Option<CrossProcessLockGeneration>> {
252        let key = key.to_owned();
253        let holder = holder.to_owned();
254
255        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
256        let expiration = now + lease_duration_ms as u64;
257
258        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
259        let generation = self
260            .write()
261            .await?
262            .with_transaction(move |txn| {
263                txn.query_row(
264                    "INSERT INTO lease_locks (key, holder, expiration)
265                    VALUES (?1, ?2, ?3)
266                    ON CONFLICT (key)
267                    DO
268                        UPDATE SET
269                            holder = excluded.holder,
270                            expiration = excluded.expiration,
271                            generation =
272                                CASE holder
273                                    WHEN excluded.holder THEN generation
274                                    ELSE generation + 1
275                                END
276                        WHERE
277                            holder = excluded.holder
278                            OR expiration < ?4
279                    RETURNING generation
280                    ",
281                    (key, holder, expiration, now),
282                    |row| row.get(0),
283                )
284                .optional()
285            })
286            .await?;
287
288        Ok(generation)
289    }
290
291    async fn add_media_content(
292        &self,
293        request: &MediaRequestParameters,
294        content: Vec<u8>,
295        ignore_policy: IgnoreMediaRetentionPolicy,
296    ) -> Result<()> {
297        let _timer = timer!("method");
298
299        self.media_service.add_media_content(self, request, content, ignore_policy).await
300    }
301
302    #[instrument(skip_all)]
303    async fn replace_media_key(
304        &self,
305        from: &MediaRequestParameters,
306        to: &MediaRequestParameters,
307    ) -> Result<(), Self::Error> {
308        let _timer = timer!("method");
309
310        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
311        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
312
313        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
314        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
315
316        let conn = self.write().await?;
317        conn.execute(
318            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
319            (new_uri, new_format, prev_uri, prev_format),
320        )
321        .await?;
322
323        Ok(())
324    }
325
326    #[instrument(skip_all)]
327    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
328        let _timer = timer!("method");
329
330        self.media_service.get_media_content(self, request).await
331    }
332
333    #[instrument(skip_all)]
334    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
335        let _timer = timer!("method");
336
337        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
338        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
339
340        let conn = self.write().await?;
341        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
342
343        Ok(())
344    }
345
346    #[instrument(skip(self))]
347    async fn get_media_content_for_uri(
348        &self,
349        uri: &MxcUri,
350    ) -> Result<Option<Vec<u8>>, Self::Error> {
351        let _timer = timer!("method");
352
353        self.media_service.get_media_content_for_uri(self, uri).await
354    }
355
356    #[instrument(skip(self))]
357    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
358        let _timer = timer!("method");
359
360        let uri = self.encode_key(keys::MEDIA, uri);
361
362        let conn = self.write().await?;
363        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
364
365        Ok(())
366    }
367
368    #[instrument(skip_all)]
369    async fn set_media_retention_policy(
370        &self,
371        policy: MediaRetentionPolicy,
372    ) -> Result<(), Self::Error> {
373        let _timer = timer!("method");
374
375        self.media_service.set_media_retention_policy(self, policy).await
376    }
377
378    #[instrument(skip_all)]
379    fn media_retention_policy(&self) -> MediaRetentionPolicy {
380        let _timer = timer!("method");
381
382        self.media_service.media_retention_policy()
383    }
384
385    #[instrument(skip_all)]
386    async fn set_ignore_media_retention_policy(
387        &self,
388        request: &MediaRequestParameters,
389        ignore_policy: IgnoreMediaRetentionPolicy,
390    ) -> Result<(), Self::Error> {
391        let _timer = timer!("method");
392
393        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
394    }
395
396    #[instrument(skip_all)]
397    async fn clean(&self) -> Result<(), Self::Error> {
398        let _timer = timer!("method");
399
400        self.media_service.clean(self).await
401    }
402
403    async fn optimize(&self) -> Result<(), Self::Error> {
404        Ok(self.vacuum().await?)
405    }
406
407    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
408        self.get_db_size().await
409    }
410}
411
412#[cfg_attr(target_family = "wasm", async_trait(?Send))]
413#[cfg_attr(not(target_family = "wasm"), async_trait)]
414impl MediaStoreInner for SqliteMediaStore {
415    type Error = Error;
416
417    async fn media_retention_policy_inner(
418        &self,
419    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
420        let conn = self.read().await?;
421        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
422    }
423
424    async fn set_media_retention_policy_inner(
425        &self,
426        policy: MediaRetentionPolicy,
427    ) -> Result<(), Self::Error> {
428        let conn = self.write().await?;
429        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
430        Ok(())
431    }
432
433    async fn add_media_content_inner(
434        &self,
435        request: &MediaRequestParameters,
436        data: Vec<u8>,
437        last_access: SystemTime,
438        policy: MediaRetentionPolicy,
439        ignore_policy: IgnoreMediaRetentionPolicy,
440    ) -> Result<(), Self::Error> {
441        let ignore_policy = ignore_policy.is_yes();
442        let data = self.encode_value(data)?;
443
444        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
445            return Ok(());
446        }
447
448        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
449        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
450        let timestamp = time_to_timestamp(last_access);
451
452        let conn = self.write().await?;
453        conn.execute(
454            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
455            (uri, format, data, timestamp, ignore_policy),
456        )
457        .await?;
458
459        Ok(())
460    }
461
462    async fn set_ignore_media_retention_policy_inner(
463        &self,
464        request: &MediaRequestParameters,
465        ignore_policy: IgnoreMediaRetentionPolicy,
466    ) -> Result<(), Self::Error> {
467        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
468        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
469        let ignore_policy = ignore_policy.is_yes();
470
471        let conn = self.write().await?;
472        conn.execute(
473            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
474            (ignore_policy, uri, format),
475        )
476        .await?;
477
478        Ok(())
479    }
480
481    async fn get_media_content_inner(
482        &self,
483        request: &MediaRequestParameters,
484        current_time: SystemTime,
485    ) -> Result<Option<Vec<u8>>, Self::Error> {
486        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
487        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
488        let timestamp = time_to_timestamp(current_time);
489
490        let conn = self.write().await?;
491        let data = conn
492            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
493                // Update the last access.
494                // We need to do this first so the transaction is in write mode right away.
495                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
496                txn.execute(
497                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
498                    (timestamp, &uri, &format),
499                )?;
500
501                txn.query_row::<Vec<u8>, _, _>(
502                    "SELECT data FROM media WHERE uri = ? AND format = ?",
503                    (&uri, &format),
504                    |row| row.get(0),
505                )
506                .optional()
507            })
508            .await?;
509
510        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
511    }
512
513    async fn get_media_content_for_uri_inner(
514        &self,
515        uri: &MxcUri,
516        current_time: SystemTime,
517    ) -> Result<Option<Vec<u8>>, Self::Error> {
518        let uri = self.encode_key(keys::MEDIA, uri);
519        let timestamp = time_to_timestamp(current_time);
520
521        let conn = self.write().await?;
522        let data = conn
523            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
524                // Update the last access.
525                // We need to do this first so the transaction is in write mode right away.
526                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
527                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
528
529                txn.query_row::<Vec<u8>, _, _>(
530                    "SELECT data FROM media WHERE uri = ?",
531                    (&uri,),
532                    |row| row.get(0),
533                )
534                .optional()
535            })
536            .await?;
537
538        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
539    }
540
541    async fn clean_inner(
542        &self,
543        policy: MediaRetentionPolicy,
544        current_time: SystemTime,
545    ) -> Result<(), Self::Error> {
546        if !policy.has_limitations() {
547            // We can safely skip all the checks.
548            return Ok(());
549        }
550
551        let conn = self.write().await?;
552        let removed = conn
553            .with_transaction::<_, Error, _>(move |txn| {
554                let mut removed = false;
555
556                // First, check media content that exceed the max filesize.
557                if let Some(max_file_size) = policy.computed_max_file_size() {
558                    let count = txn.execute(
559                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
560                        (max_file_size,),
561                    )?;
562
563                    if count > 0 {
564                        removed = true;
565                    }
566                }
567
568                // Then, clean up expired media content.
569                if let Some(last_access_expiry) = policy.last_access_expiry {
570                    let current_timestamp = time_to_timestamp(current_time);
571                    let expiry_secs = last_access_expiry.as_secs();
572                    let count = txn.execute(
573                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
574                        (current_timestamp, expiry_secs),
575                    )?;
576
577                    if count > 0 {
578                        removed = true;
579                    }
580                }
581
582                // Finally, if the cache size is too big, remove old items until it fits.
583                if let Some(max_cache_size) = policy.max_cache_size {
584                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
585                    // during the conversion of the result.
586                    let cache_size = txn
587                        .query_row(
588                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
589                            (),
590                            |row| {
591                                // `sum()` returns `NULL` if there are no rows.
592                                row.get::<_, Option<u64>>(0)
593                            },
594                        )?
595                        .unwrap_or_default();
596
597                    // If the cache size is overflowing or bigger than max cache size, clean up.
598                    if cache_size > max_cache_size {
599                        // Get the sizes of the media contents ordered by last access.
600                        let mut cached_stmt = txn.prepare_cached(
601                            "SELECT rowid, length(data) FROM media \
602                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
603                        )?;
604                        let content_sizes = cached_stmt
605                            .query(())?
606                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
607
608                        let mut accumulated_items_size = 0u64;
609                        let mut limit_reached = false;
610                        let mut rows_to_remove = Vec::new();
611
612                        for result in content_sizes {
613                            let (row_id, size) = match result {
614                                Ok(content_size) => content_size,
615                                Err(error) => {
616                                    return Err(error.into());
617                                }
618                            };
619
620                            if limit_reached {
621                                rows_to_remove.push(row_id);
622                                continue;
623                            }
624
625                            match accumulated_items_size.checked_add(size) {
626                                Some(acc) if acc > max_cache_size => {
627                                    // We can stop accumulating.
628                                    limit_reached = true;
629                                    rows_to_remove.push(row_id);
630                                }
631                                Some(acc) => accumulated_items_size = acc,
632                                None => {
633                                    // The accumulated size is overflowing but the setting cannot be
634                                    // bigger than usize::MAX, we can stop accumulating.
635                                    limit_reached = true;
636                                    rows_to_remove.push(row_id);
637                                }
638                            }
639                        }
640
641                        if !rows_to_remove.is_empty() {
642                            removed = true;
643                        }
644
645                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
646                            let sql_params = repeat_vars(row_ids.len());
647                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
648                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
649                            Ok(Vec::<()>::new())
650                        })?;
651                    }
652                }
653
654                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
655
656                Ok(removed)
657            })
658            .await?;
659
660        // If we removed media, defragment the database and free space on the
661        // filesystem.
662        if removed {
663            conn.vacuum().await?;
664        }
665
666        Ok(())
667    }
668
669    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
670        let conn = self.read().await?;
671        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
672    }
673}
674
675#[cfg(test)]
676mod tests {
677    use std::{
678        path::PathBuf,
679        sync::atomic::{AtomicU32, Ordering::SeqCst},
680        time::Duration,
681    };
682
683    use matrix_sdk_base::{
684        media::{
685            MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
686            store::{IgnoreMediaRetentionPolicy, MediaStore, MediaStoreError},
687        },
688        media_store_inner_integration_tests, media_store_integration_tests,
689        media_store_integration_tests_time,
690    };
691    use matrix_sdk_test::async_test;
692    use once_cell::sync::Lazy;
693    use ruma::{events::room::MediaSource, media::Method, mxc_uri, uint};
694    use tempfile::{TempDir, tempdir};
695
696    use super::SqliteMediaStore;
697    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};
698
699    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
700    static NUM: AtomicU32 = AtomicU32::new(0);
701
702    fn new_media_store_workspace() -> PathBuf {
703        let name = NUM.fetch_add(1, SeqCst).to_string();
704        TMP_DIR.path().join(name)
705    }
706
707    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
708        let tmpdir_path = new_media_store_workspace();
709
710        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
711
712        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
713    }
714
715    media_store_integration_tests!();
716    media_store_integration_tests_time!();
717    media_store_inner_integration_tests!();
718
719    async fn get_media_store_content_sorted_by_last_access(
720        media_store: &SqliteMediaStore,
721    ) -> Vec<Vec<u8>> {
722        let sqlite_db = media_store.read().await.expect("accessing sqlite db failed");
723        sqlite_db
724            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
725                stmt.query(())?.mapped(|row| row.get(0)).collect()
726            })
727            .await
728            .expect("querying media cache content by last access failed")
729    }
730
731    #[async_test]
732    async fn test_pool_size() {
733        let tmpdir_path = new_media_store_workspace();
734        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
735
736        let store = SqliteMediaStore::open_with_config(store_open_config).await.unwrap();
737
738        assert_eq!(store.pool.status().max_size, 42);
739    }
740
741    #[async_test]
742    async fn test_last_access() {
743        let media_store = get_media_store().await.expect("creating media cache failed");
744        let uri = mxc_uri!("mxc://localhost/media");
745        let file_request = MediaRequestParameters {
746            source: MediaSource::Plain(uri.to_owned()),
747            format: MediaFormat::File,
748        };
749        let thumbnail_request = MediaRequestParameters {
750            source: MediaSource::Plain(uri.to_owned()),
751            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
752                Method::Crop,
753                uint!(100),
754                uint!(100),
755            )),
756        };
757
758        let content: Vec<u8> = "hello world".into();
759        let thumbnail_content: Vec<u8> = "hello…".into();
760
761        // Add the media.
762        media_store
763            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
764            .await
765            .expect("adding file failed");
766
767        // Since the precision of the timestamp is in seconds, wait so the timestamps
768        // differ.
769        tokio::time::sleep(Duration::from_secs(3)).await;
770
771        media_store
772            .add_media_content(
773                &thumbnail_request,
774                thumbnail_content.clone(),
775                IgnoreMediaRetentionPolicy::No,
776            )
777            .await
778            .expect("adding thumbnail failed");
779
780        // File's last access is older than thumbnail.
781        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
782
783        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
784        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
785        assert_eq!(contents[1], content, "file is not second-to-last access");
786
787        // Since the precision of the timestamp is in seconds, wait so the timestamps
788        // differ.
789        tokio::time::sleep(Duration::from_secs(3)).await;
790
791        // Access the file so its last access is more recent.
792        let _ = media_store
793            .get_media_content(&file_request)
794            .await
795            .expect("getting file failed")
796            .expect("file is missing");
797
798        // File's last access is more recent than thumbnail.
799        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
800
801        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
802        assert_eq!(contents[0], content, "file is not last access");
803        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
804    }
805}
806
807#[cfg(test)]
808mod encrypted_tests {
809    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
810
811    use matrix_sdk_base::{
812        media::store::MediaStoreError, media_store_inner_integration_tests,
813        media_store_integration_tests, media_store_integration_tests_time,
814    };
815    use once_cell::sync::Lazy;
816    use tempfile::{TempDir, tempdir};
817
818    use super::SqliteMediaStore;
819
820    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
821    static NUM: AtomicU32 = AtomicU32::new(0);
822
823    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
824        let name = NUM.fetch_add(1, SeqCst).to_string();
825        let tmpdir_path = TMP_DIR.path().join(name);
826
827        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
828
829        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
830            .await
831            .unwrap())
832    }
833
834    media_store_integration_tests!();
835    media_store_integration_tests_time!();
836    media_store_inner_integration_tests!();
837}