
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A SQLite-based backend for the [`EventCacheStore`].

use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Position, RawChunk, Update,
    },
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, error, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, SqliteStoreConfig,
};

mod keys {
    // Entries in Key-value store
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a migration.
/// Every new SQL migration should imply a bump of this number, and changes in
/// the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 6;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// A SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,
    pool: SqlitePool,
    media_service: MediaService,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
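    ///
    /// # Example
    ///
    /// A minimal usage sketch (assuming a Tokio runtime and that
    /// `SqliteEventCacheStore` and `OpenStoreError` are re-exported at the
    /// crate root):
    ///
    /// ```ignore
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// // Data is written under `./event-cache` and encrypted with the passphrase.
    /// let store = SqliteEventCacheStore::open("./event-cache", Some("secret")).await?;
    /// # Ok(())
    /// # }
    /// ```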
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store with the given configuration.
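    ///
    /// # Example
    ///
    /// A sketch of opening a store with a custom pool size (again assuming a
    /// Tokio runtime; `pool_max_size` is the builder method also used in the
    /// tests below):
    ///
    /// ```ignore
    /// use matrix_sdk_sqlite::{SqliteEventCacheStore, SqliteStoreConfig};
    ///
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// let config = SqliteStoreConfig::new("./event-cache")
    ///     .passphrase(Some("secret"))
    ///     .pool_max_size(8);
    /// let store = SqliteEventCacheStore::open_with_config(config).await?;
    /// # Ok(())
    /// # }
    /// ```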
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        this.pool.get().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given passphrase will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self { store_cipher, pool, media_service })
    }

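    /// If a store cipher is configured, encrypt the given bytes and serialize
    /// the ciphertext with MessagePack; otherwise return the bytes unchanged.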
    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = key.encrypt_value_data(value)?;
            Ok(rmp_serde::to_vec_named(&encrypted)?)
        } else {
            Ok(value)
        }
    }

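    /// Reverse of [`Self::encode_value`]: if a store cipher is configured,
    /// deserialize the MessagePack payload and decrypt it; otherwise borrow
    /// the raw bytes unchanged.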
    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = rmp_serde::from_slice(value)?;
            let decrypted = key.decrypt_value_data(encrypted)?;
            Ok(Cow::Owned(decrypted))
        } else {
            Ok(Cow::Borrowed(value))
        }
    }

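    /// Hash a lookup key (e.g. a room ID) with the store cipher, if one is
    /// configured, so it can be stored as an opaque key; otherwise keep the
    /// plain bytes.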
    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
        let bytes = key.as_ref();
        if let Some(store_cipher) = &self.store_cipher {
            Key::Hashed(store_cipher.hash_key(table_name, bytes))
        } else {
            Key::Plain(bytes.to_owned())
        }
    }

    async fn acquire(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key support must be
        // enabled on a per-connection basis. Execute it every time we try to get a
        // connection, since we can't guarantee a previous connection did enable
        // it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

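    /// Map a `linked_chunks` row to its `(id, previous, next, type)` columns,
    /// in that order.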
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, room_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, room_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
            (chunk_id.index(), &room_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT content FROM events
                    WHERE chunk_id = ? AND room_id = ?
                    ORDER BY position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it fails
        // with the error message: "cannot change into wal mode from within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

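        // The upsert below takes the lock if the key is free, if we already
        // hold it, or if the previous holder's lease has expired. Exactly one
        // row is inserted or updated in those cases, so `num_touched == 1`
        // means the lock was acquired (or renewed).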
        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none of them is.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gaps(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Set the previous chunk's `next` to this chunk's `next`.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Set the next chunk's `previous` to this chunk's `previous`.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut statement = txn.prepare(
                            "INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%room_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            let serialized = serde_json::to_vec(&event)?;
                            let content = this.encode_value(serialized)?;

                            let index = at.index() + i;

                            statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        let serialized = serde_json::to_vec(&event)?;
                        let content = this.encode_value(serialized)?;

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%room_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        txn.execute(
                            r#"
                            UPDATE events
                            SET content = ?, event_id = ?
                            WHERE room_id = ? AND chunk_id = ? AND position = ?
                        "#,
                            (content, event_id, &hashed_room_id, chunk_id, index,)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we're going to remove.
                        txn.execute(
                            r#"
                                UPDATE events
                                SET position = position - 1
                                WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index)
                        )?;

                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    async fn load_all_chunks(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_room_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    async fn load_last_chunk(
        &self,
        room_id: &RoomId,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no line, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
                    // good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
                    // linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    async fn load_previous_chunk(
        &self,
        room_id: &RoomId,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_room_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
        self.acquire()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())
            })
            .await?;
        Ok(())
    }

    async fn filter_duplicated_events(
        &self,
        room_id: &RoomId,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        // If there are no events for which we want to check duplicates, we can return
        // early. It's not only an optimization to do so: it's required, otherwise the
        // `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
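                    // The query is run in chunks so we never exceed SQLite's
                    // bound-parameter limit; `repeat_vars(n)` expands to `n`
                    // comma-separated `?` placeholders (e.g. `?,?,?` for three
                    // event IDs) for the `IN (…)` clause below.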
                    let query = format!(
                        "SELECT event_id, chunk_id, position FROM events WHERE room_id = ? AND event_id IN ({}) ORDER BY chunk_id ASC, position ASC",
                        repeat_vars(events.len()),
                    );
                    let parameters = params_from_iter(
                        // parameter for `room_id = ?`
                        once(
                            hashed_room_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn
                        .prepare(&query)?
                        .query_map(parameters, |row| {
                            Ok((
                                row.get::<_, String>(0)?,
                                row.get::<_, u64>(1)?,
                                row.get::<_, usize>(2)?
                            ))
                        })?
                    {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID has been stored even if it is
                            // malformed, let's skip it.
                            error!(%duplicated_event, %room_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((duplicated_event, Position::new(ChunkIdentifier::new(chunk_identifier), index)));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<(Position, Event)>, Self::Error> {
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_owned();
        let this = self.clone();

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some((chunk_identifier, index, event)) = txn
                    .prepare(
                        "SELECT chunk_id, position, content FROM events WHERE room_id = ? AND event_id = ?",
                    )?
                    .query_row((hashed_room_id, event_id.as_str(),), |row| {
                        Ok((
                            row.get::<_, u64>(0)?,
                            row.get::<_, usize>(1)?,
                            row.get::<_, Vec<u8>>(2)?,
                        ))
                    })
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some((Position::new(ChunkIdentifier::new(chunk_identifier), index), event)))
            })
            .await
    }

    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }

    async fn replace_media_key(
        &self,
        from: &MediaRequestParameters,
        to: &MediaRequestParameters,
    ) -> Result<(), Self::Error> {
        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());

        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
            (new_uri, new_format, prev_uri, prev_format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        self.media_service.get_media_content(self, request).await
    }

    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;

        Ok(())
    }

    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        self.media_service.get_media_content_for_uri(self, uri).await
    }

    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
        let uri = self.encode_key(keys::MEDIA, uri);

        let conn = self.acquire().await?;
        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;

        Ok(())
    }

    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_media_retention_policy(self, policy).await
    }

    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        self.media_service.media_retention_policy()
    }

    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }

    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        self.media_service.clean_up_media_cache(self).await
    }
}

#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EventCacheStoreMedia for SqliteEventCacheStore {
    type Error = Error;

    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
    }

    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let conn = self.acquire().await?;
        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
        Ok(())
    }

    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        data: Vec<u8>,
        last_access: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let ignore_policy = ignore_policy.is_yes();
        let data = self.encode_value(data)?;

        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
            return Ok(());
        }

        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(last_access);

        let conn = self.acquire().await?;
        conn.execute(
            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
            (uri, format, data, timestamp, ignore_policy),
        )
        .await?;

        Ok(())
    }

    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let ignore_policy = ignore_policy.is_yes();

        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (ignore_policy, uri, format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
                txn.execute(
                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
                    (timestamp, &uri, &format),
                )?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ? AND format = ?",
                    (&uri, &format),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, uri);
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ?",
                    (&uri,),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // We can safely skip all the checks.
            return Ok(());
        }

        let conn = self.acquire().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, remove media content that exceeds the max file size.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, clean up expired media content.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache size is too big, remove old items until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
                    // during the conversion of the result.
                    let cache_size = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` if there are no rows.
                                row.get::<_, Option<u64>>(0)
                            },
                        )?
                        .unwrap_or_default();

                    // If the cache size is overflowing or bigger than max cache size, clean up.
                    if cache_size > max_cache_size {
                        // Get the sizes of the media contents ordered by last access.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));

                        let mut accumulated_items_size = 0u64;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

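                        // Walk the contents from most to least recently
                        // accessed, keeping items while the running total stays
                        // within `max_cache_size`; everything from the point
                        // where the limit is crossed onwards is scheduled for
                        // removal. For instance, with a limit of 1000 bytes and
                        // three 400-byte items, the third (oldest) one is
                        // removed.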
                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // We can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size is overflowing but the setting cannot be
                                    // bigger than usize::MAX, we can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            };
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If we removed media, defragment the database and free space on the
        // filesystem.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }

    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
    }
}

/// Like `deadpool::managed::Object::with_transaction`, but starts the
/// transaction in immediate (write) mode from the beginning, precluding errors
/// of the kind SQLITE_BUSY from happening, for transactions that may involve
/// both reads and writes, and start with a write.
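///
/// A usage sketch (`do_writes` is a hypothetical helper, not part of this
/// module):
///
/// ```ignore
/// let conn = store.acquire().await?;
/// let rows = with_immediate_transaction(conn, |txn| {
///     // All statements here run inside a single IMMEDIATE transaction.
///     do_writes(txn)
/// })
/// .await?;
/// ```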
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    conn: SqliteAsyncConn,
    f: F,
) -> Result<T, Error> {
    conn.interact(move |conn| -> Result<T, Error> {
        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
        // to avoid read transactions upgrading to write mode and causing
        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
        conn.set_transaction_behavior(TransactionBehavior::Immediate);

        let code = || -> Result<T, Error> {
            let txn = conn.transaction()?;
            let res = f(&txn)?;
            txn.commit()?;
            Ok(res)
        };

        let res = code();

        // Reset the transaction behavior to use Deferred, after this transaction has
        // been run, whether it was successful or not.
        conn.set_transaction_behavior(TransactionBehavior::Deferred);

        res
    })
    .await
    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
    .unwrap()
}

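/// Insert a new chunk into `linked_chunks` and patch the `previous`/`next`
/// pointers of its neighbours so the doubly-linked list stays consistent:
/// before, `previous <-> next`; after, `previous <-> new <-> next`.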
1314fn insert_chunk(
1315    txn: &Transaction<'_>,
1316    room_id: &Key,
1317    previous: Option<u64>,
1318    new: u64,
1319    next: Option<u64>,
1320    type_str: &str,
1321) -> rusqlite::Result<()> {
1322    // First, insert the new chunk.
1323    txn.execute(
1324        r#"
1325            INSERT INTO linked_chunks(id, room_id, previous, next, type)
1326            VALUES (?, ?, ?, ?, ?)
1327        "#,
1328        (new, room_id, previous, next, type_str),
1329    )?;
1330
1331    // If this chunk has a previous one, update its `next` field.
1332    if let Some(previous) = previous {
1333        txn.execute(
1334            r#"
1335                UPDATE linked_chunks
1336                SET next = ?
1337                WHERE id = ? AND room_id = ?
1338            "#,
1339            (new, previous, room_id),
1340        )?;
1341    }
1342
1343    // If this chunk has a next one, update its `previous` field.
1344    if let Some(next) = next {
1345        txn.execute(
1346            r#"
1347                UPDATE linked_chunks
1348                SET previous = ?
1349                WHERE id = ? AND room_id = ?
1350            "#,
1351            (new, next, room_id),
1352        )?;
1353    }
1354
1355    Ok(())
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360    use std::{
1361        path::PathBuf,
1362        sync::atomic::{AtomicU32, Ordering::SeqCst},
1363        time::Duration,
1364    };
1365
1366    use assert_matches::assert_matches;
1367    use matrix_sdk_base::{
1368        event_cache::{
1369            store::{
1370                integration_tests::{check_test_event, make_test_event},
1371                media::IgnoreMediaRetentionPolicy,
1372                EventCacheStore, EventCacheStoreError,
1373            },
1374            Gap,
1375        },
1376        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1377        event_cache_store_media_integration_tests,
1378        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1379        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1380    };
1381    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1382    use once_cell::sync::Lazy;
1383    use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1384    use tempfile::{tempdir, TempDir};
1385
1386    use super::SqliteEventCacheStore;
1387    use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
1388
1389    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1390    static NUM: AtomicU32 = AtomicU32::new(0);
1391
1392    fn new_event_cache_store_workspace() -> PathBuf {
1393        let name = NUM.fetch_add(1, SeqCst).to_string();
1394        TMP_DIR.path().join(name)
1395    }
1396
1397    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1398        let tmpdir_path = new_event_cache_store_workspace();
1399
1400        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1401
1402        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1403    }
1404
1405    event_cache_store_integration_tests!();
1406    event_cache_store_integration_tests_time!();
1407    event_cache_store_media_integration_tests!(with_media_size_tests);
1408
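    /// Read the raw `data` column of the `media` table, most recently accessed
    /// entries first, so tests can assert on last-access ordering.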
1409    async fn get_event_cache_store_content_sorted_by_last_access(
1410        event_cache_store: &SqliteEventCacheStore,
1411    ) -> Vec<Vec<u8>> {
1412        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1413        sqlite_db
1414            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1415                stmt.query(())?.mapped(|row| row.get(0)).collect()
1416            })
1417            .await
1418            .expect("querying media cache content by last access failed")
1419    }
1420
1421    #[async_test]
1422    async fn test_pool_size() {
1423        let tmpdir_path = new_event_cache_store_workspace();
1424        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1425
1426        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1427
1428        assert_eq!(store.pool.status().max_size, 42);
1429    }
1430
1431    #[async_test]
1432    async fn test_last_access() {
1433        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1434        let uri = mxc_uri!("mxc://localhost/media");
1435        let file_request = MediaRequestParameters {
1436            source: MediaSource::Plain(uri.to_owned()),
1437            format: MediaFormat::File,
1438        };
1439        let thumbnail_request = MediaRequestParameters {
1440            source: MediaSource::Plain(uri.to_owned()),
1441            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1442                Method::Crop,
1443                uint!(100),
1444                uint!(100),
1445            )),
1446        };
1447
1448        let content: Vec<u8> = "hello world".into();
1449        let thumbnail_content: Vec<u8> = "hello…".into();
1450
1451        // Add the media.
1452        event_cache_store
1453            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1454            .await
1455            .expect("adding file failed");
1456
1457        // Since the timestamp only has second precision, wait so that the
1458        // timestamps differ.
1459        tokio::time::sleep(Duration::from_secs(3)).await;
1460
1461        event_cache_store
1462            .add_media_content(
1463                &thumbnail_request,
1464                thumbnail_content.clone(),
1465                IgnoreMediaRetentionPolicy::No,
1466            )
1467            .await
1468            .expect("adding thumbnail failed");
1469
1470        // File's last access is older than thumbnail.
1471        let contents =
1472            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1473
1474        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1475        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1476        assert_eq!(contents[1], content, "file is not second-to-last access");
1477
1478        // Since the timestamp only has second precision, wait so that the
1479        // timestamps differ.
1480        tokio::time::sleep(Duration::from_secs(3)).await;
1481
1482        // Access the file so its last access is more recent.
1483        let _ = event_cache_store
1484            .get_media_content(&file_request)
1485            .await
1486            .expect("getting file failed")
1487            .expect("file is missing");
1488
1489        // File's last access is more recent than thumbnail.
1490        let contents =
1491            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1492
1493        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1494        assert_eq!(contents[0], content, "file is not last access");
1495        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1496    }
1497
1498    #[async_test]
1499    async fn test_linked_chunk_new_items_chunk() {
1500        let store = get_event_cache_store().await.expect("creating cache store failed");
1501
1502        let room_id = &DEFAULT_TEST_ROOM_ID;
1503
1504        store
1505            .handle_linked_chunk_updates(
1506                room_id,
1507                vec![
1508                    Update::NewItemsChunk {
1509                        previous: None,
1510                        new: ChunkIdentifier::new(42),
1511                        next: None, // Note: the store must link the next entry itself.
1512                    },
1513                    Update::NewItemsChunk {
1514                        previous: Some(ChunkIdentifier::new(42)),
1515                        new: ChunkIdentifier::new(13),
1516                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1517                                                               * the next link ahead of time. */
1518                    },
1519                    Update::NewItemsChunk {
1520                        previous: Some(ChunkIdentifier::new(13)),
1521                        new: ChunkIdentifier::new(37),
1522                        next: None,
1523                    },
1524                ],
1525            )
1526            .await
1527            .unwrap();
1528
1529        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1530
1531        assert_eq!(chunks.len(), 3);
1532
1533        {
1534            // Chunks are ordered from smaller to bigger IDs.
1535            let c = chunks.remove(0);
1536            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1537            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1538            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1539            assert_matches!(c.content, ChunkContent::Items(events) => {
1540                assert!(events.is_empty());
1541            });
1542
1543            let c = chunks.remove(0);
1544            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1545            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1546            assert_eq!(c.next, None);
1547            assert_matches!(c.content, ChunkContent::Items(events) => {
1548                assert!(events.is_empty());
1549            });
1550
1551            let c = chunks.remove(0);
1552            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1553            assert_eq!(c.previous, None);
1554            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1555            assert_matches!(c.content, ChunkContent::Items(events) => {
1556                assert!(events.is_empty());
1557            });
1558        }
1559    }
1560
1561    #[async_test]
1562    async fn test_linked_chunk_new_gap_chunk() {
1563        let store = get_event_cache_store().await.expect("creating cache store failed");
1564
1565        let room_id = &DEFAULT_TEST_ROOM_ID;
1566
1567        store
1568            .handle_linked_chunk_updates(
1569                room_id,
1570                vec![Update::NewGapChunk {
1571                    previous: None,
1572                    new: ChunkIdentifier::new(42),
1573                    next: None,
1574                    gap: Gap { prev_token: "raclette".to_owned() },
1575                }],
1576            )
1577            .await
1578            .unwrap();
1579
1580        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1581
1582        assert_eq!(chunks.len(), 1);
1583
1584        // Chunks are ordered from smaller to bigger IDs.
1585        let c = chunks.remove(0);
1586        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1587        assert_eq!(c.previous, None);
1588        assert_eq!(c.next, None);
1589        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1590            assert_eq!(gap.prev_token, "raclette");
1591        });
1592    }
1593
1594    #[async_test]
1595    async fn test_linked_chunk_replace_item() {
1596        let store = get_event_cache_store().await.expect("creating cache store failed");
1597
1598        let room_id = &DEFAULT_TEST_ROOM_ID;
1599
1600        store
1601            .handle_linked_chunk_updates(
1602                room_id,
1603                vec![
1604                    Update::NewItemsChunk {
1605                        previous: None,
1606                        new: ChunkIdentifier::new(42),
1607                        next: None,
1608                    },
1609                    Update::PushItems {
1610                        at: Position::new(ChunkIdentifier::new(42), 0),
1611                        items: vec![
1612                            make_test_event(room_id, "hello"),
1613                            make_test_event(room_id, "world"),
1614                        ],
1615                    },
1616                    Update::ReplaceItem {
1617                        at: Position::new(ChunkIdentifier::new(42), 1),
1618                        item: make_test_event(room_id, "yolo"),
1619                    },
1620                ],
1621            )
1622            .await
1623            .unwrap();
1624
1625        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1626
1627        assert_eq!(chunks.len(), 1);
1628
1629        let c = chunks.remove(0);
1630        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1631        assert_eq!(c.previous, None);
1632        assert_eq!(c.next, None);
1633        assert_matches!(c.content, ChunkContent::Items(events) => {
1634            assert_eq!(events.len(), 2);
1635            check_test_event(&events[0], "hello");
1636            check_test_event(&events[1], "yolo");
1637        });
1638    }
1639
1640    #[async_test]
1641    async fn test_linked_chunk_remove_chunk() {
1642        let store = get_event_cache_store().await.expect("creating cache store failed");
1643
1644        let room_id = &DEFAULT_TEST_ROOM_ID;
1645
1646        store
1647            .handle_linked_chunk_updates(
1648                room_id,
1649                vec![
1650                    Update::NewGapChunk {
1651                        previous: None,
1652                        new: ChunkIdentifier::new(42),
1653                        next: None,
1654                        gap: Gap { prev_token: "raclette".to_owned() },
1655                    },
1656                    Update::NewGapChunk {
1657                        previous: Some(ChunkIdentifier::new(42)),
1658                        new: ChunkIdentifier::new(43),
1659                        next: None,
1660                        gap: Gap { prev_token: "fondue".to_owned() },
1661                    },
1662                    Update::NewGapChunk {
1663                        previous: Some(ChunkIdentifier::new(43)),
1664                        new: ChunkIdentifier::new(44),
1665                        next: None,
1666                        gap: Gap { prev_token: "tartiflette".to_owned() },
1667                    },
1668                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1669                ],
1670            )
1671            .await
1672            .unwrap();
1673
1674        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1675
1676        assert_eq!(chunks.len(), 2);
1677
1678        // Chunks are ordered from smaller to bigger IDs.
1679        let c = chunks.remove(0);
1680        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1681        assert_eq!(c.previous, None);
1682        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1683        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1684            assert_eq!(gap.prev_token, "raclette");
1685        });
1686
1687        let c = chunks.remove(0);
1688        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1689        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1690        assert_eq!(c.next, None);
1691        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1692            assert_eq!(gap.prev_token, "tartiflette");
1693        });
1694
1695        // Check that cascading worked. Yes, sqlite, I doubt you.
1696        let gaps = store
1697            .acquire()
1698            .await
1699            .unwrap()
1700            .with_transaction(|txn| -> rusqlite::Result<_> {
1701                let mut gaps = Vec::new();
1702                for data in txn
1703                    .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
1704                    .query_map((), |row| row.get::<_, u64>(0))?
1705                {
1706                    gaps.push(data?);
1707                }
1708                Ok(gaps)
1709            })
1710            .await
1711            .unwrap();
1712
1713        assert_eq!(gaps, vec![42, 44]);
1714    }
1715
1716    #[async_test]
1717    async fn test_linked_chunk_push_items() {
1718        let store = get_event_cache_store().await.expect("creating cache store failed");
1719
1720        let room_id = &DEFAULT_TEST_ROOM_ID;
1721
1722        store
1723            .handle_linked_chunk_updates(
1724                room_id,
1725                vec![
1726                    Update::NewItemsChunk {
1727                        previous: None,
1728                        new: ChunkIdentifier::new(42),
1729                        next: None,
1730                    },
1731                    Update::PushItems {
1732                        at: Position::new(ChunkIdentifier::new(42), 0),
1733                        items: vec![
1734                            make_test_event(room_id, "hello"),
1735                            make_test_event(room_id, "world"),
1736                        ],
1737                    },
1738                    Update::PushItems {
1739                        at: Position::new(ChunkIdentifier::new(42), 2),
1740                        items: vec![make_test_event(room_id, "who?")],
1741                    },
1742                ],
1743            )
1744            .await
1745            .unwrap();
1746
1747        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1748
1749        assert_eq!(chunks.len(), 1);
1750
1751        let c = chunks.remove(0);
1752        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1753        assert_eq!(c.previous, None);
1754        assert_eq!(c.next, None);
1755        assert_matches!(c.content, ChunkContent::Items(events) => {
1756            assert_eq!(events.len(), 3);
1757
1758            check_test_event(&events[0], "hello");
1759            check_test_event(&events[1], "world");
1760            check_test_event(&events[2], "who?");
1761        });
1762    }
1763
1764    #[async_test]
1765    async fn test_linked_chunk_remove_item() {
1766        let store = get_event_cache_store().await.expect("creating cache store failed");
1767
1768        let room_id = *DEFAULT_TEST_ROOM_ID;
1769
1770        store
1771            .handle_linked_chunk_updates(
1772                room_id,
1773                vec![
1774                    Update::NewItemsChunk {
1775                        previous: None,
1776                        new: ChunkIdentifier::new(42),
1777                        next: None,
1778                    },
1779                    Update::PushItems {
1780                        at: Position::new(ChunkIdentifier::new(42), 0),
1781                        items: vec![
1782                            make_test_event(room_id, "hello"),
1783                            make_test_event(room_id, "world"),
1784                        ],
1785                    },
1786                    Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1787                ],
1788            )
1789            .await
1790            .unwrap();
1791
1792        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1793
1794        assert_eq!(chunks.len(), 1);
1795
1796        let c = chunks.remove(0);
1797        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1798        assert_eq!(c.previous, None);
1799        assert_eq!(c.next, None);
1800        assert_matches!(c.content, ChunkContent::Items(events) => {
1801            assert_eq!(events.len(), 1);
1802            check_test_event(&events[0], "world");
1803        });
1804
1805        // Make sure the position has been updated for the remaining event.
1806        let num_rows: u64 = store
1807            .acquire()
1808            .await
1809            .unwrap()
1810            .with_transaction(move |txn| {
1811                txn.query_row(
1812                    "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1813                    (room_id.as_bytes(),),
1814                    |row| row.get(0),
1815                )
1816            })
1817            .await
1818            .unwrap();
1819        assert_eq!(num_rows, 1);
1820    }
1821
1822    #[async_test]
1823    async fn test_linked_chunk_detach_last_items() {
1824        let store = get_event_cache_store().await.expect("creating cache store failed");
1825
1826        let room_id = *DEFAULT_TEST_ROOM_ID;
1827
1828        store
1829            .handle_linked_chunk_updates(
1830                room_id,
1831                vec![
1832                    Update::NewItemsChunk {
1833                        previous: None,
1834                        new: ChunkIdentifier::new(42),
1835                        next: None,
1836                    },
1837                    Update::PushItems {
1838                        at: Position::new(ChunkIdentifier::new(42), 0),
1839                        items: vec![
1840                            make_test_event(room_id, "hello"),
1841                            make_test_event(room_id, "world"),
1842                            make_test_event(room_id, "howdy"),
1843                        ],
1844                    },
1845                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1846                ],
1847            )
1848            .await
1849            .unwrap();
1850
1851        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1852
1853        assert_eq!(chunks.len(), 1);
1854
1855        let c = chunks.remove(0);
1856        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1857        assert_eq!(c.previous, None);
1858        assert_eq!(c.next, None);
1859        assert_matches!(c.content, ChunkContent::Items(events) => {
1860            assert_eq!(events.len(), 1);
1861            check_test_event(&events[0], "hello");
1862        });
1863    }
1864
1865    #[async_test]
1866    async fn test_linked_chunk_start_end_reattach_items() {
1867        let store = get_event_cache_store().await.expect("creating cache store failed");
1868
1869        let room_id = *DEFAULT_TEST_ROOM_ID;
1870
1871        // Same updates and checks as test_linked_chunk_push_items, but with extra
1872        // `StartReattachItems` and `EndReattachItems` updates, which must have no
1873        // effect.
1874        store
1875            .handle_linked_chunk_updates(
1876                room_id,
1877                vec![
1878                    Update::NewItemsChunk {
1879                        previous: None,
1880                        new: ChunkIdentifier::new(42),
1881                        next: None,
1882                    },
1883                    Update::PushItems {
1884                        at: Position::new(ChunkIdentifier::new(42), 0),
1885                        items: vec![
1886                            make_test_event(room_id, "hello"),
1887                            make_test_event(room_id, "world"),
1888                            make_test_event(room_id, "howdy"),
1889                        ],
1890                    },
1891                    Update::StartReattachItems,
1892                    Update::EndReattachItems,
1893                ],
1894            )
1895            .await
1896            .unwrap();
1897
1898        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1899
1900        assert_eq!(chunks.len(), 1);
1901
1902        let c = chunks.remove(0);
1903        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1904        assert_eq!(c.previous, None);
1905        assert_eq!(c.next, None);
1906        assert_matches!(c.content, ChunkContent::Items(events) => {
1907            assert_eq!(events.len(), 3);
1908            check_test_event(&events[0], "hello");
1909            check_test_event(&events[1], "world");
1910            check_test_event(&events[2], "howdy");
1911        });
1912    }
1913
1914    #[async_test]
1915    async fn test_linked_chunk_clear() {
1916        let store = get_event_cache_store().await.expect("creating cache store failed");
1917
1918        let room_id = *DEFAULT_TEST_ROOM_ID;
1919        let event_0 = make_test_event(room_id, "hello");
1920        let event_1 = make_test_event(room_id, "world");
1921        let event_2 = make_test_event(room_id, "howdy");
1922
1923        store
1924            .handle_linked_chunk_updates(
1925                room_id,
1926                vec![
1927                    Update::NewItemsChunk {
1928                        previous: None,
1929                        new: ChunkIdentifier::new(42),
1930                        next: None,
1931                    },
1932                    Update::NewGapChunk {
1933                        previous: Some(ChunkIdentifier::new(42)),
1934                        new: ChunkIdentifier::new(54),
1935                        next: None,
1936                        gap: Gap { prev_token: "fondue".to_owned() },
1937                    },
1938                    Update::PushItems {
1939                        at: Position::new(ChunkIdentifier::new(42), 0),
1940                        items: vec![event_0.clone(), event_1, event_2],
1941                    },
1942                    Update::Clear,
1943                ],
1944            )
1945            .await
1946            .unwrap();
1947
1948        let chunks = store.load_all_chunks(room_id).await.unwrap();
1949        assert!(chunks.is_empty());
1950
1951        // Check that cascading worked. Yes, sqlite, I doubt you.
1952        store
1953            .acquire()
1954            .await
1955            .unwrap()
1956            .with_transaction(|txn| -> rusqlite::Result<_> {
1957                let num_gaps = txn
1958                    .prepare("SELECT COUNT(chunk_id) FROM gaps ORDER BY chunk_id")?
1959                    .query_row((), |row| row.get::<_, u64>(0))?;
1960                assert_eq!(num_gaps, 0);
1961
1962                let num_events = txn
1963                    .prepare("SELECT COUNT(event_id) FROM events ORDER BY chunk_id")?
1964                    .query_row((), |row| row.get::<_, u64>(0))?;
1965                assert_eq!(num_events, 0);
1966
1967                Ok(())
1968            })
1969            .await
1970            .unwrap();
1971
1972        // It's okay to re-insert a past event.
1973        store
1974            .handle_linked_chunk_updates(
1975                room_id,
1976                vec![
1977                    Update::NewItemsChunk {
1978                        previous: None,
1979                        new: ChunkIdentifier::new(42),
1980                        next: None,
1981                    },
1982                    Update::PushItems {
1983                        at: Position::new(ChunkIdentifier::new(42), 0),
1984                        items: vec![event_0],
1985                    },
1986                ],
1987            )
1988            .await
1989            .unwrap();
1990    }
1991
1992    #[async_test]
1993    async fn test_linked_chunk_multiple_rooms() {
1994        let store = get_event_cache_store().await.expect("creating cache store failed");
1995
1996        let room1 = room_id!("!realcheeselovers:raclette.fr");
1997        let room2 = room_id!("!realcheeselovers:fondue.ch");
1998
1999        // Check that applying updates to one room doesn't affect the other.
2000        // Use the same chunk identifier in both rooms to battle-test search.
2001
2002        store
2003            .handle_linked_chunk_updates(
2004                room1,
2005                vec![
2006                    Update::NewItemsChunk {
2007                        previous: None,
2008                        new: ChunkIdentifier::new(42),
2009                        next: None,
2010                    },
2011                    Update::PushItems {
2012                        at: Position::new(ChunkIdentifier::new(42), 0),
2013                        items: vec![
2014                            make_test_event(room1, "best cheese is raclette"),
2015                            make_test_event(room1, "obviously"),
2016                        ],
2017                    },
2018                ],
2019            )
2020            .await
2021            .unwrap();
2022
2023        store
2024            .handle_linked_chunk_updates(
2025                room2,
2026                vec![
2027                    Update::NewItemsChunk {
2028                        previous: None,
2029                        new: ChunkIdentifier::new(42),
2030                        next: None,
2031                    },
2032                    Update::PushItems {
2033                        at: Position::new(ChunkIdentifier::new(42), 0),
2034                        items: vec![make_test_event(room2, "beaufort is the best")],
2035                    },
2036                ],
2037            )
2038            .await
2039            .unwrap();
2040
2041        // Check chunks from room 1.
2042        let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
2043        assert_eq!(chunks_room1.len(), 1);
2044
2045        let c = chunks_room1.remove(0);
2046        assert_matches!(c.content, ChunkContent::Items(events) => {
2047            assert_eq!(events.len(), 2);
2048            check_test_event(&events[0], "best cheese is raclette");
2049            check_test_event(&events[1], "obviously");
2050        });
2051
2052        // Check chunks from room 2.
2053        let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
2054        assert_eq!(chunks_room2.len(), 1);
2055
2056        let c = chunks_room2.remove(0);
2057        assert_matches!(c.content, ChunkContent::Items(events) => {
2058            assert_eq!(events.len(), 1);
2059            check_test_event(&events[0], "beaufort is the best");
2060        });
2061    }
2062
2063    #[async_test]
2064    async fn test_linked_chunk_update_is_a_transaction() {
2065        let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067        let room_id = *DEFAULT_TEST_ROOM_ID;
2068
2069        // Trigger a violation of the unique constraint on the (room id, chunk id)
2070        // couple.
2071        let err = store
2072            .handle_linked_chunk_updates(
2073                room_id,
2074                vec![
2075                    Update::NewItemsChunk {
2076                        previous: None,
2077                        new: ChunkIdentifier::new(42),
2078                        next: None,
2079                    },
2080                    Update::NewItemsChunk {
2081                        previous: None,
2082                        new: ChunkIdentifier::new(42),
2083                        next: None,
2084                    },
2085                ],
2086            )
2087            .await
2088            .unwrap_err();
2089
2090        // The operation fails with a constraint violation error.
2091        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2092            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2093        });
2094
2095        // If the updates have been handled transactionally, then no new chunks should
2096        // have been added; failure of the second update leads to the first one being
2097        // rolled back.
2098        let chunks = store.load_all_chunks(room_id).await.unwrap();
2099        assert!(chunks.is_empty());
2100    }
2101
2102    #[async_test]
2103    async fn test_filter_duplicate_events_no_events() {
2104        let store = get_event_cache_store().await.expect("creating cache store failed");
2105
2106        let room_id = *DEFAULT_TEST_ROOM_ID;
2107        let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
2108        assert!(duplicates.is_empty());
2109    }
2110
2111    #[async_test]
2112    async fn test_load_last_chunk() {
2113        let room_id = room_id!("!r0:matrix.org");
2114        let event = |msg: &str| make_test_event(room_id, msg);
2115        let store = get_event_cache_store().await.expect("creating cache store failed");
2116
2117        // Case #1: no last chunk.
2118        {
2119            let (last_chunk, chunk_identifier_generator) =
2120                store.load_last_chunk(room_id).await.unwrap();
2121
2122            assert!(last_chunk.is_none());
2123            assert_eq!(chunk_identifier_generator.current(), 0);
2124        }
2125
2126        // Case #2: only one chunk is present.
2127        {
2128            store
2129                .handle_linked_chunk_updates(
2130                    room_id,
2131                    vec![
2132                        Update::NewItemsChunk {
2133                            previous: None,
2134                            new: ChunkIdentifier::new(42),
2135                            next: None,
2136                        },
2137                        Update::PushItems {
2138                            at: Position::new(ChunkIdentifier::new(42), 0),
2139                            items: vec![event("saucisse de morteau"), event("comté")],
2140                        },
2141                    ],
2142                )
2143                .await
2144                .unwrap();
2145
2146            let (last_chunk, chunk_identifier_generator) =
2147                store.load_last_chunk(room_id).await.unwrap();
2148
2149            assert_matches!(last_chunk, Some(last_chunk) => {
2150                assert_eq!(last_chunk.identifier, 42);
2151                assert!(last_chunk.previous.is_none());
2152                assert!(last_chunk.next.is_none());
2153                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2154                    assert_eq!(items.len(), 2);
2155                    check_test_event(&items[0], "saucisse de morteau");
2156                    check_test_event(&items[1], "comté");
2157                });
2158            });
2159            assert_eq!(chunk_identifier_generator.current(), 42);
2160        }
2161
2162        // Case #3: more chunks are present.
2163        {
2164            store
2165                .handle_linked_chunk_updates(
2166                    room_id,
2167                    vec![
2168                        Update::NewItemsChunk {
2169                            previous: Some(ChunkIdentifier::new(42)),
2170                            new: ChunkIdentifier::new(7),
2171                            next: None,
2172                        },
2173                        Update::PushItems {
2174                            at: Position::new(ChunkIdentifier::new(7), 0),
2175                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2176                        },
2177                    ],
2178                )
2179                .await
2180                .unwrap();
2181
2182            let (last_chunk, chunk_identifier_generator) =
2183                store.load_last_chunk(room_id).await.unwrap();
2184
2185            assert_matches!(last_chunk, Some(last_chunk) => {
2186                assert_eq!(last_chunk.identifier, 7);
2187                assert_matches!(last_chunk.previous, Some(previous) => {
2188                    assert_eq!(previous, 42);
2189                });
2190                assert!(last_chunk.next.is_none());
2191                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2192                    assert_eq!(items.len(), 3);
2193                    check_test_event(&items[0], "fondue");
2194                    check_test_event(&items[1], "gruyère");
2195                    check_test_event(&items[2], "mont d'or");
2196                });
2197            });
2198            assert_eq!(chunk_identifier_generator.current(), 42);
2199        }
2200    }
2201
2202    #[async_test]
2203    async fn test_load_last_chunk_with_a_cycle() {
2204        let room_id = room_id!("!r0:matrix.org");
2205        let store = get_event_cache_store().await.expect("creating cache store failed");
2206
2207        store
2208            .handle_linked_chunk_updates(
2209                room_id,
2210                vec![
2211                    Update::NewItemsChunk {
2212                        previous: None,
2213                        new: ChunkIdentifier::new(0),
2214                        next: None,
2215                    },
2216                    Update::NewItemsChunk {
2217                        // Because `previous` connects to chunk #0, it will create a cycle.
2218                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2219                        // **does not exist**. We have to detect this cycle.
2220                        previous: Some(ChunkIdentifier::new(0)),
2221                        new: ChunkIdentifier::new(1),
2222                        next: Some(ChunkIdentifier::new(0)),
2223                    },
2224                ],
2225            )
2226            .await
2227            .unwrap();
2228
2229        store.load_last_chunk(room_id).await.unwrap_err();
2230    }
2231
2232    #[async_test]
2233    async fn test_load_previous_chunk() {
2234        let room_id = room_id!("!r0:matrix.org");
2235        let event = |msg: &str| make_test_event(room_id, msg);
2236        let store = get_event_cache_store().await.expect("creating cache store failed");
2237
2238        // Case #1: no chunk at all, equivalent to having a nonexistent
2239        // `before_chunk_identifier`.
2240        {
2241            let previous_chunk =
2242                store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();
2243
2244            assert!(previous_chunk.is_none());
2245        }
2246
2247        // Case #2: there is only one chunk: we request the chunk before it,
2248        // which doesn't exist.
2249        {
2250            store
2251                .handle_linked_chunk_updates(
2252                    room_id,
2253                    vec![Update::NewItemsChunk {
2254                        previous: None,
2255                        new: ChunkIdentifier::new(42),
2256                        next: None,
2257                    }],
2258                )
2259                .await
2260                .unwrap();
2261
2262            let previous_chunk =
2263                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2264
2265            assert!(previous_chunk.is_none());
2266        }
2267
2268        // Case #3: there are two chunks.
2269        {
2270            store
2271                .handle_linked_chunk_updates(
2272                    room_id,
2273                    vec![
2274                        // new chunk before the one that exists.
2275                        Update::NewItemsChunk {
2276                            previous: None,
2277                            new: ChunkIdentifier::new(7),
2278                            next: Some(ChunkIdentifier::new(42)),
2279                        },
2280                        Update::PushItems {
2281                            at: Position::new(ChunkIdentifier::new(7), 0),
2282                            items: vec![event("brigand du jorat"), event("morbier")],
2283                        },
2284                    ],
2285                )
2286                .await
2287                .unwrap();
2288
2289            let previous_chunk =
2290                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2291
2292            assert_matches!(previous_chunk, Some(previous_chunk) => {
2293                assert_eq!(previous_chunk.identifier, 7);
2294                assert!(previous_chunk.previous.is_none());
2295                assert_matches!(previous_chunk.next, Some(next) => {
2296                    assert_eq!(next, 42);
2297                });
2298                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2299                    assert_eq!(items.len(), 2);
2300                    check_test_event(&items[0], "brigand du jorat");
2301                    check_test_event(&items[1], "morbier");
2302                });
2303            });
2304        }
2305    }
2306}
2307
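// Same integration test suites as above, but with the store opened with a
// passphrase, so that the `StoreCipher` code paths are exercised as well.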
2308#[cfg(test)]
2309mod encrypted_tests {
2310    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2311
2312    use matrix_sdk_base::{
2313        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
2314        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
2315    };
2316    use once_cell::sync::Lazy;
2317    use tempfile::{tempdir, TempDir};
2318
2319    use super::SqliteEventCacheStore;
2320
2321    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2322    static NUM: AtomicU32 = AtomicU32::new(0);
2323
2324    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2325        let name = NUM.fetch_add(1, SeqCst).to_string();
2326        let tmpdir_path = TMP_DIR.path().join(name);
2327
2328        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2329
2330        Ok(SqliteEventCacheStore::open(
2331            tmpdir_path.to_str().unwrap(),
2332            Some("default_test_password"),
2333        )
2334        .await
2335        .unwrap())
2336    }
2337
2338    event_cache_store_integration_tests!();
2339    event_cache_store_integration_tests_time!();
2340    event_cache_store_media_integration_tests!();
2341}