Skip to main content

matrix_sdk_sqlite/
media_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`MediaStore`].
16
17use std::{fmt, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21    cross_process_lock::CrossProcessLockGeneration,
22    media::{
23        MediaRequestParameters, UniqueKey,
24        store::{
25            IgnoreMediaRetentionPolicy, MediaRetentionPolicy, MediaService, MediaStore,
26            MediaStoreInner,
27        },
28    },
29    timer,
30};
31use matrix_sdk_store_encryption::StoreCipher;
32use ruma::{MilliSecondsSinceUnixEpoch, MxcUri, time::SystemTime};
33use rusqlite::{OptionalExtension, params_from_iter};
34use tokio::{
35    fs,
36    sync::{Mutex, OwnedMutexGuard},
37};
38use tracing::{debug, instrument};
39
40use crate::{
41    OpenStoreError, Secret, SqliteStoreConfig,
42    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
43    error::{Error, Result},
44    utils::{
45        EncryptableStore, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
46        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars, time_to_timestamp,
47    },
48};
49
50mod keys {
51    // Entries in Key-value store
52    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
53    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";
54
55    // Tables
56    pub const MEDIA: &str = "media";
57}
58
59/// The database name.
60const DATABASE_NAME: &str = "matrix-sdk-media.sqlite3";
61
62/// An SQLite-based media store.
63#[derive(Clone)]
64pub struct SqliteMediaStore {
65    store_cipher: Option<Arc<StoreCipher>>,
66
67    /// The pool of connections.
68    pool: SqlitePool,
69
70    /// We make the difference between connections for read operations, and for
71    /// write operations. We keep a single connection apart from write
72    /// operations. All other connections are used for read operations. The
73    /// lock is used to ensure there is one owner at a time.
74    write_connection: Arc<Mutex<SqliteAsyncConn>>,
75
76    media_service: MediaService,
77}
78
79#[cfg(not(tarpaulin_include))]
80impl fmt::Debug for SqliteMediaStore {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        f.debug_struct("SqliteMediaStore").finish_non_exhaustive()
83    }
84}
85
86impl EncryptableStore for SqliteMediaStore {
87    fn get_cypher(&self) -> Option<&StoreCipher> {
88        self.store_cipher.as_deref()
89    }
90}
91
92impl SqliteMediaStore {
93    /// Open the SQLite-based media store at the given path using the
94    /// given passphrase to encrypt private data.
95    pub async fn open(
96        path: impl AsRef<Path>,
97        passphrase: Option<&str>,
98    ) -> Result<Self, OpenStoreError> {
99        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
100    }
101
102    /// Open the SQLite-based media store at the given path using the given
103    /// key to encrypt private data.
104    pub async fn open_with_key(
105        path: impl AsRef<Path>,
106        key: Option<&[u8; 32]>,
107    ) -> Result<Self, OpenStoreError> {
108        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
109    }
110
111    /// Open the SQLite-based media store with the config open config.
112    #[instrument(skip(config), fields(path = ?config.path))]
113    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
114        debug!(?config);
115
116        let _timer = timer!("open_with_config");
117
118        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
119
120        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
121
122        let this = Self::open_with_pool(pool, config.secret).await?;
123        this.write().await?.apply_runtime_config(config.runtime_config).await?;
124
125        Ok(this)
126    }
127
128    /// Open an SQLite-based media store using the given SQLite database
129    /// pool. The given passphrase will be used to encrypt private data.
130    async fn open_with_pool(
131        pool: SqlitePool,
132        secret: Option<Secret>,
133    ) -> Result<Self, OpenStoreError> {
134        let conn = pool.get().await?;
135
136        let version = conn.db_version().await?;
137        run_migrations(&conn, version).await?;
138
139        conn.wal_checkpoint().await;
140
141        let store_cipher = match secret {
142            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
143            None => None,
144        };
145
146        let media_service = MediaService::new();
147        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
148        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
149        media_service.restore(media_retention_policy, last_media_cleanup_time);
150
151        Ok(Self {
152            store_cipher,
153            pool,
154            // Use `conn` as our selected write connections.
155            write_connection: Arc::new(Mutex::new(conn)),
156            media_service,
157        })
158    }
159
160    // Acquire a connection for executing read operations.
161    #[instrument(skip_all)]
162    async fn read(&self) -> Result<SqliteAsyncConn> {
163        let connection = self.pool.get().await?;
164
165        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
166        // support must be enabled on a per-connection basis. Execute it every
167        // time we try to get a connection, since we can't guarantee a previous
168        // connection did enable it before.
169        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
170
171        Ok(connection)
172    }
173
174    // Acquire a connection for executing write operations.
175    #[instrument(skip_all)]
176    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
177        let connection = self.write_connection.clone().lock_owned().await;
178
179        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
180        // support must be enabled on a per-connection basis. Execute it every
181        // time we try to get a connection, since we can't guarantee a previous
182        // connection did enable it before.
183        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
184
185        Ok(connection)
186    }
187
188    pub async fn vacuum(&self) -> Result<()> {
189        self.write_connection.lock().await.vacuum().await
190    }
191
192    async fn get_db_size(&self) -> Result<Option<usize>> {
193        Ok(Some(self.pool.get().await?.get_db_size().await?))
194    }
195}
196
197/// Run migrations for the given version of the database.
198async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
199    // Always enable foreign keys for the current connection.
200    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
201
202    if version < 1 {
203        debug!("Creating database");
204        // First turn on WAL mode, this can't be done in the transaction, it fails with
205        // the error message: "cannot change into wal mode from within a transaction".
206        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
207        conn.with_transaction(|txn| {
208            txn.execute_batch(include_str!("../migrations/media_store/001_init.sql"))?;
209            txn.set_db_version(1)
210        })
211        .await?;
212    }
213
214    if version < 2 {
215        debug!("Upgrading database to version 2");
216        conn.with_transaction(|txn| {
217            txn.execute_batch(include_str!(
218                "../migrations/media_store/002_lease_locks_with_generation.sql"
219            ))?;
220            txn.set_db_version(2)
221        })
222        .await?;
223    }
224
225    Ok(())
226}
227
228#[async_trait]
229impl MediaStore for SqliteMediaStore {
230    type Error = Error;
231
232    #[instrument(skip(self))]
233    async fn try_take_leased_lock(
234        &self,
235        lease_duration_ms: u32,
236        key: &str,
237        holder: &str,
238    ) -> Result<Option<CrossProcessLockGeneration>> {
239        let key = key.to_owned();
240        let holder = holder.to_owned();
241
242        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
243        let expiration = now + lease_duration_ms as u64;
244
245        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
246        let generation = self
247            .write()
248            .await?
249            .with_transaction(move |txn| {
250                txn.query_row(
251                    "INSERT INTO lease_locks (key, holder, expiration)
252                    VALUES (?1, ?2, ?3)
253                    ON CONFLICT (key)
254                    DO
255                        UPDATE SET
256                            holder = excluded.holder,
257                            expiration = excluded.expiration,
258                            generation =
259                                CASE holder
260                                    WHEN excluded.holder THEN generation
261                                    ELSE generation + 1
262                                END
263                        WHERE
264                            holder = excluded.holder
265                            OR expiration < ?4
266                    RETURNING generation
267                    ",
268                    (key, holder, expiration, now),
269                    |row| row.get(0),
270                )
271                .optional()
272            })
273            .await?;
274
275        Ok(generation)
276    }
277
278    async fn add_media_content(
279        &self,
280        request: &MediaRequestParameters,
281        content: Vec<u8>,
282        ignore_policy: IgnoreMediaRetentionPolicy,
283    ) -> Result<()> {
284        let _timer = timer!("method");
285
286        self.media_service.add_media_content(self, request, content, ignore_policy).await
287    }
288
289    #[instrument(skip_all)]
290    async fn replace_media_key(
291        &self,
292        from: &MediaRequestParameters,
293        to: &MediaRequestParameters,
294    ) -> Result<(), Self::Error> {
295        let _timer = timer!("method");
296
297        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
298        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
299
300        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
301        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
302
303        let conn = self.write().await?;
304        conn.execute(
305            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
306            (new_uri, new_format, prev_uri, prev_format),
307        )
308        .await?;
309
310        Ok(())
311    }
312
313    #[instrument(skip_all)]
314    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
315        let _timer = timer!("method");
316
317        self.media_service.get_media_content(self, request).await
318    }
319
320    #[instrument(skip_all)]
321    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
322        let _timer = timer!("method");
323
324        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
325        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
326
327        let conn = self.write().await?;
328        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
329
330        Ok(())
331    }
332
333    #[instrument(skip(self))]
334    async fn get_media_content_for_uri(
335        &self,
336        uri: &MxcUri,
337    ) -> Result<Option<Vec<u8>>, Self::Error> {
338        let _timer = timer!("method");
339
340        self.media_service.get_media_content_for_uri(self, uri).await
341    }
342
343    #[instrument(skip(self))]
344    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
345        let _timer = timer!("method");
346
347        let uri = self.encode_key(keys::MEDIA, uri);
348
349        let conn = self.write().await?;
350        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
351
352        Ok(())
353    }
354
355    #[instrument(skip_all)]
356    async fn set_media_retention_policy(
357        &self,
358        policy: MediaRetentionPolicy,
359    ) -> Result<(), Self::Error> {
360        let _timer = timer!("method");
361
362        self.media_service.set_media_retention_policy(self, policy).await
363    }
364
365    #[instrument(skip_all)]
366    fn media_retention_policy(&self) -> MediaRetentionPolicy {
367        let _timer = timer!("method");
368
369        self.media_service.media_retention_policy()
370    }
371
372    #[instrument(skip_all)]
373    async fn set_ignore_media_retention_policy(
374        &self,
375        request: &MediaRequestParameters,
376        ignore_policy: IgnoreMediaRetentionPolicy,
377    ) -> Result<(), Self::Error> {
378        let _timer = timer!("method");
379
380        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
381    }
382
383    #[instrument(skip_all)]
384    async fn clean(&self) -> Result<(), Self::Error> {
385        let _timer = timer!("method");
386
387        self.media_service.clean(self).await
388    }
389
390    async fn optimize(&self) -> Result<(), Self::Error> {
391        Ok(self.vacuum().await?)
392    }
393
394    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
395        self.get_db_size().await
396    }
397}
398
399#[cfg_attr(target_family = "wasm", async_trait(?Send))]
400#[cfg_attr(not(target_family = "wasm"), async_trait)]
401impl MediaStoreInner for SqliteMediaStore {
402    type Error = Error;
403
404    async fn media_retention_policy_inner(
405        &self,
406    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
407        let conn = self.read().await?;
408        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
409    }
410
411    async fn set_media_retention_policy_inner(
412        &self,
413        policy: MediaRetentionPolicy,
414    ) -> Result<(), Self::Error> {
415        let conn = self.write().await?;
416        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
417        Ok(())
418    }
419
420    async fn add_media_content_inner(
421        &self,
422        request: &MediaRequestParameters,
423        data: Vec<u8>,
424        last_access: SystemTime,
425        policy: MediaRetentionPolicy,
426        ignore_policy: IgnoreMediaRetentionPolicy,
427    ) -> Result<(), Self::Error> {
428        let ignore_policy = ignore_policy.is_yes();
429        let data = self.encode_value(data)?;
430
431        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
432            return Ok(());
433        }
434
435        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
436        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
437        let timestamp = time_to_timestamp(last_access);
438
439        let conn = self.write().await?;
440        conn.execute(
441            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
442            (uri, format, data, timestamp, ignore_policy),
443        )
444        .await?;
445
446        Ok(())
447    }
448
449    async fn set_ignore_media_retention_policy_inner(
450        &self,
451        request: &MediaRequestParameters,
452        ignore_policy: IgnoreMediaRetentionPolicy,
453    ) -> Result<(), Self::Error> {
454        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
455        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
456        let ignore_policy = ignore_policy.is_yes();
457
458        let conn = self.write().await?;
459        conn.execute(
460            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
461            (ignore_policy, uri, format),
462        )
463        .await?;
464
465        Ok(())
466    }
467
468    async fn get_media_content_inner(
469        &self,
470        request: &MediaRequestParameters,
471        current_time: SystemTime,
472    ) -> Result<Option<Vec<u8>>, Self::Error> {
473        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
474        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
475        let timestamp = time_to_timestamp(current_time);
476
477        let conn = self.write().await?;
478        let data = conn
479            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
480                // Update the last access.
481                // We need to do this first so the transaction is in write mode right away.
482                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
483                txn.execute(
484                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
485                    (timestamp, &uri, &format),
486                )?;
487
488                txn.query_row::<Vec<u8>, _, _>(
489                    "SELECT data FROM media WHERE uri = ? AND format = ?",
490                    (&uri, &format),
491                    |row| row.get(0),
492                )
493                .optional()
494            })
495            .await?;
496
497        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
498    }
499
500    async fn get_media_content_for_uri_inner(
501        &self,
502        uri: &MxcUri,
503        current_time: SystemTime,
504    ) -> Result<Option<Vec<u8>>, Self::Error> {
505        let uri = self.encode_key(keys::MEDIA, uri);
506        let timestamp = time_to_timestamp(current_time);
507
508        let conn = self.write().await?;
509        let data = conn
510            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
511                // Update the last access.
512                // We need to do this first so the transaction is in write mode right away.
513                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
514                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
515
516                txn.query_row::<Vec<u8>, _, _>(
517                    "SELECT data FROM media WHERE uri = ?",
518                    (&uri,),
519                    |row| row.get(0),
520                )
521                .optional()
522            })
523            .await?;
524
525        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
526    }
527
528    async fn clean_inner(
529        &self,
530        policy: MediaRetentionPolicy,
531        current_time: SystemTime,
532    ) -> Result<(), Self::Error> {
533        if !policy.has_limitations() {
534            // We can safely skip all the checks.
535            return Ok(());
536        }
537
538        let conn = self.write().await?;
539        let removed = conn
540            .with_transaction::<_, Error, _>(move |txn| {
541                let mut removed = false;
542
543                // First, check media content that exceed the max filesize.
544                if let Some(max_file_size) = policy.computed_max_file_size() {
545                    let count = txn.execute(
546                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
547                        (max_file_size,),
548                    )?;
549
550                    if count > 0 {
551                        removed = true;
552                    }
553                }
554
555                // Then, clean up expired media content.
556                if let Some(last_access_expiry) = policy.last_access_expiry {
557                    let current_timestamp = time_to_timestamp(current_time);
558                    let expiry_secs = last_access_expiry.as_secs();
559                    let count = txn.execute(
560                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
561                        (current_timestamp, expiry_secs),
562                    )?;
563
564                    if count > 0 {
565                        removed = true;
566                    }
567                }
568
569                // Finally, if the cache size is too big, remove old items until it fits.
570                if let Some(max_cache_size) = policy.max_cache_size {
571                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
572                    // during the conversion of the result.
573                    let cache_size = txn
574                        .query_row(
575                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
576                            (),
577                            |row| {
578                                // `sum()` returns `NULL` if there are no rows.
579                                row.get::<_, Option<u64>>(0)
580                            },
581                        )?
582                        .unwrap_or_default();
583
584                    // If the cache size is overflowing or bigger than max cache size, clean up.
585                    if cache_size > max_cache_size {
586                        // Get the sizes of the media contents ordered by last access.
587                        let mut cached_stmt = txn.prepare_cached(
588                            "SELECT rowid, length(data) FROM media \
589                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
590                        )?;
591                        let content_sizes = cached_stmt
592                            .query(())?
593                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
594
595                        let mut accumulated_items_size = 0u64;
596                        let mut limit_reached = false;
597                        let mut rows_to_remove = Vec::new();
598
599                        for result in content_sizes {
600                            let (row_id, size) = match result {
601                                Ok(content_size) => content_size,
602                                Err(error) => {
603                                    return Err(error.into());
604                                }
605                            };
606
607                            if limit_reached {
608                                rows_to_remove.push(row_id);
609                                continue;
610                            }
611
612                            match accumulated_items_size.checked_add(size) {
613                                Some(acc) if acc > max_cache_size => {
614                                    // We can stop accumulating.
615                                    limit_reached = true;
616                                    rows_to_remove.push(row_id);
617                                }
618                                Some(acc) => accumulated_items_size = acc,
619                                None => {
620                                    // The accumulated size is overflowing but the setting cannot be
621                                    // bigger than usize::MAX, we can stop accumulating.
622                                    limit_reached = true;
623                                    rows_to_remove.push(row_id);
624                                }
625                            }
626                        }
627
628                        if !rows_to_remove.is_empty() {
629                            removed = true;
630                        }
631
632                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
633                            let sql_params = repeat_vars(row_ids.len());
634                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
635                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
636                            Ok(Vec::<()>::new())
637                        })?;
638                    }
639                }
640
641                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
642
643                Ok(removed)
644            })
645            .await?;
646
647        // If we removed media, defragment the database and free space on the
648        // filesystem.
649        if removed {
650            conn.vacuum().await?;
651        }
652
653        Ok(())
654    }
655
656    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
657        let conn = self.read().await?;
658        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use std::{
665        path::PathBuf,
666        sync::{
667            LazyLock,
668            atomic::{AtomicU32, Ordering::SeqCst},
669        },
670        time::Duration,
671    };
672
673    use matrix_sdk_base::{
674        media::{
675            MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
676            store::{IgnoreMediaRetentionPolicy, MediaStore, MediaStoreError},
677        },
678        media_store_inner_integration_tests, media_store_integration_tests,
679        media_store_integration_tests_time,
680    };
681    use matrix_sdk_test::async_test;
682    use ruma::{events::room::MediaSource, media::Method, mxc_uri, uint};
683    use tempfile::{TempDir, tempdir};
684
685    use super::SqliteMediaStore;
686    use crate::{SqliteStoreConfig, utils::SqliteAsyncConnExt};
687
688    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
689    static NUM: AtomicU32 = AtomicU32::new(0);
690
691    fn new_media_store_workspace() -> PathBuf {
692        let name = NUM.fetch_add(1, SeqCst).to_string();
693        TMP_DIR.path().join(name)
694    }
695
696    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
697        let tmpdir_path = new_media_store_workspace();
698
699        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
700
701        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
702    }
703
704    media_store_integration_tests!();
705    media_store_integration_tests_time!();
706    media_store_inner_integration_tests!();
707
708    async fn get_media_store_content_sorted_by_last_access(
709        media_store: &SqliteMediaStore,
710    ) -> Vec<Vec<u8>> {
711        let sqlite_db = media_store.read().await.expect("accessing sqlite db failed");
712        sqlite_db
713            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
714                stmt.query(())?.mapped(|row| row.get(0)).collect()
715            })
716            .await
717            .expect("querying media cache content by last access failed")
718    }
719
720    #[async_test]
721    async fn test_pool_size() {
722        let tmpdir_path = new_media_store_workspace();
723        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
724
725        let store = SqliteMediaStore::open_with_config(store_open_config).await.unwrap();
726
727        assert_eq!(store.pool.status().max_size, 42);
728    }
729
730    #[async_test]
731    async fn test_last_access() {
732        let media_store = get_media_store().await.expect("creating media cache failed");
733        let uri = mxc_uri!("mxc://localhost/media");
734        let file_request = MediaRequestParameters {
735            source: MediaSource::Plain(uri.to_owned()),
736            format: MediaFormat::File,
737        };
738        let thumbnail_request = MediaRequestParameters {
739            source: MediaSource::Plain(uri.to_owned()),
740            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
741                Method::Crop,
742                uint!(100),
743                uint!(100),
744            )),
745        };
746
747        let content: Vec<u8> = "hello world".into();
748        let thumbnail_content: Vec<u8> = "hello…".into();
749
750        // Add the media.
751        media_store
752            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
753            .await
754            .expect("adding file failed");
755
756        // Since the precision of the timestamp is in seconds, wait so the timestamps
757        // differ.
758        tokio::time::sleep(Duration::from_secs(3)).await;
759
760        media_store
761            .add_media_content(
762                &thumbnail_request,
763                thumbnail_content.clone(),
764                IgnoreMediaRetentionPolicy::No,
765            )
766            .await
767            .expect("adding thumbnail failed");
768
769        // File's last access is older than thumbnail.
770        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
771
772        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
773        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
774        assert_eq!(contents[1], content, "file is not second-to-last access");
775
776        // Since the precision of the timestamp is in seconds, wait so the timestamps
777        // differ.
778        tokio::time::sleep(Duration::from_secs(3)).await;
779
780        // Access the file so its last access is more recent.
781        let _ = media_store
782            .get_media_content(&file_request)
783            .await
784            .expect("getting file failed")
785            .expect("file is missing");
786
787        // File's last access is more recent than thumbnail.
788        let contents = get_media_store_content_sorted_by_last_access(&media_store).await;
789
790        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
791        assert_eq!(contents[0], content, "file is not last access");
792        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
793    }
794}
795
796#[cfg(test)]
797mod encrypted_tests {
798    use std::sync::{
799        LazyLock,
800        atomic::{AtomicU32, Ordering::SeqCst},
801    };
802
803    use matrix_sdk_base::{
804        media::store::MediaStoreError, media_store_inner_integration_tests,
805        media_store_integration_tests, media_store_integration_tests_time,
806    };
807    use tempfile::{TempDir, tempdir};
808
809    use super::SqliteMediaStore;
810
811    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
812    static NUM: AtomicU32 = AtomicU32::new(0);
813
814    async fn get_media_store() -> Result<SqliteMediaStore, MediaStoreError> {
815        let name = NUM.fetch_add(1, SeqCst).to_string();
816        let tmpdir_path = TMP_DIR.path().join(name);
817
818        tracing::info!("using media store @ {}", tmpdir_path.to_str().unwrap());
819
820        Ok(SqliteMediaStore::open(tmpdir_path.to_str().unwrap(), Some("default_test_password"))
821            .await
822            .unwrap())
823    }
824
825    media_store_integration_tests!();
826    media_store_integration_tests_time!();
827    media_store_inner_integration_tests!();
828}