// matrix_sdk_sqlite/src/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A sqlite-based backend for the [`EventCacheStore`].
use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    event_cache::{
        store::{
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, RawChunk, Update},
    media::{MediaRequestParameters, UniqueKey},
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::fs;
use tracing::{debug, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError,
};
49
/// Names of the key-value-store entries and SQL tables used by this store.
mod keys {
    // Keys of entries in the key-value store.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Table names, used when deriving hashed keys.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}
59
/// Identifier of the latest database version.
///
/// Used to decide whether the SQLite database needs migrating. Any new SQL
/// migration must bump this number and extend the [`run_migrations`]
/// function accordingly.
const DATABASE_VERSION: u8 = 5;

/// Discriminator stored in the `type` column of the `linked_chunks` table for
/// a chunk containing events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Discriminator stored in the `type` column of the `linked_chunks` table for
/// a chunk containing a gap.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
73
/// A SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    // Optional cipher: when present, stored values are encrypted and keys are
    // hashed; when absent, data is stored in the clear.
    store_cipher: Option<Arc<StoreCipher>>,
    // Pool of async SQLite connections to the backing database.
    pool: SqlitePool,
    // Service handling media retention policy and cleanup bookkeeping.
    media_service: MediaService,
}
81
82#[cfg(not(tarpaulin_include))]
83impl fmt::Debug for SqliteEventCacheStore {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
86    }
87}
88
impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let pool = create_pool(path.as_ref()).await?;

        Self::open_with_pool(pool, passphrase).await
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given passphrase will be used to encrypt private data.
    pub async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;
        conn.set_journal_size_limit().await?;

        // Bring the schema up to date before any other query runs.
        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;
        conn.optimize().await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        // Restore the media service state (retention policy, last cleanup
        // time) previously persisted in the key-value store.
        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self { store_cipher, pool, media_service })
    }

    /// Encode a value for storage: when a store cipher is configured, encrypt
    /// the bytes and serialize the encrypted payload with MessagePack,
    /// otherwise return the bytes unchanged.
    fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = key.encrypt_value_data(value)?;
            Ok(rmp_serde::to_vec_named(&encrypted)?)
        } else {
            Ok(value)
        }
    }

    /// Decode a value read from storage; the inverse of [`Self::encode_value`].
    ///
    /// Returns a borrow of the input when no cipher is configured, so
    /// unencrypted reads avoid a copy.
    fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
        if let Some(key) = &self.store_cipher {
            let encrypted = rmp_serde::from_slice(value)?;
            let decrypted = key.decrypt_value_data(encrypted)?;
            Ok(Cow::Owned(decrypted))
        } else {
            Ok(Cow::Borrowed(value))
        }
    }

    /// Encode a key for use in the given table: hash it with the store cipher
    /// when encryption is enabled (so the plaintext key is not stored),
    /// otherwise keep it as-is.
    fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
        let bytes = key.as_ref();
        if let Some(store_cipher) = &self.store_cipher {
            Key::Hashed(store_cipher.hash_key(table_name, bytes))
        } else {
            Key::Plain(bytes.to_owned())
        }
    }

    /// Acquire a connection from the pool.
    async fn acquire(&self) -> Result<SqliteAsyncConn> {
        Ok(self.pool.get().await?)
    }

    /// Map a `linked_chunks` row onto its `(id, previous, next, type)`
    /// columns, as selected by the `load_*` queries.
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }
}
170
/// Helpers to rebuild linked-chunk data from database rows, shared by the
/// `load_*` methods of [`SqliteEventCacheStore`].
trait TransactionExtForLinkedChunks {
    /// Rebuild a full [`RawChunk`] from one `linked_chunks` row.
    ///
    /// `chunk_type` discriminates between gap and event chunks; the chunk's
    /// content is loaded from the corresponding table.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the gap content (its `prev_token`) stored for the given chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load all the events stored for the given chunk, ordered by position.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
196
impl TransactionExtForLinkedChunks for Transaction<'_> {
    /// Rebuild a [`RawChunk`] from its row data, loading its gap or event
    /// content from the `gaps` or `events` table depending on `chunk_type`.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, room_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, room_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    /// Load and decode the `prev_token` of the gap stored for `chunk_id`.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
            (chunk_id.index(), &room_id),
            |row| row.get(0),
        )?;
        // Decrypt (if needed) then deserialize the JSON-encoded token.
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    /// Load, decode and deserialize all the events of `chunk_id`, in
    /// ascending `position` order.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT content FROM events
                    WHERE chunk_id = ? AND room_id = ?
                    ORDER BY position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}
285
286async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
287    fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
288    let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-event-cache.sqlite3"));
289    Ok(cfg.create_pool(Runtime::Tokio1)?)
290}
291
/// Run migrations for the given version of the database.
///
/// `version` is the schema version currently recorded in the database.
/// Migrations are applied incrementally up to [`DATABASE_VERSION`]; each step
/// runs in its own transaction and records its version, so an interrupted
/// upgrade resumes from the last completed step.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        // Already at (or beyond) the latest version: nothing to do.
        return Ok(());
    }

    if version < 1 {
        // First turn on WAL mode, this can't be done in the transaction, it fails with
        // the error message: "cannot change into wal mode from within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        // Enable foreign keys for this database.
        conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    Ok(())
}
354
355#[async_trait]
356impl EventCacheStore for SqliteEventCacheStore {
357    type Error = Error;
358
    /// Try to acquire (or refresh) the lease lock identified by `key` for
    /// `holder`, for `lease_duration_ms` milliseconds.
    ///
    /// Returns `true` when the lock was taken, i.e. when it did not exist
    /// yet, was already held by this `holder`, or had expired.
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .acquire()
            .await?
            .with_transaction(move |txn| {
                // Upsert: insert the lock if missing; otherwise only update it
                // when it is already ours or when it has expired.
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        // Exactly one row is touched iff the insert or the conditional update
        // went through, i.e. iff the lock was acquired.
        Ok(num_touched == 1)
    }
391
    /// Apply a batch of linked-chunk [`Update`]s for the given room.
    ///
    /// All updates run inside one immediate transaction, so either the whole
    /// batch is persisted or none of it is.
    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // Use a single transaction throughout this function, so that either all updates
        // work, or none is taken into account.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        // Serialize and (maybe) encrypt the gap's token before storing it.
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gaps(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Replace its previous' next to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Replace its next' previous to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        for (i, event) in items.into_iter().enumerate() {
                            let serialized = serde_json::to_vec(&event)?;
                            let content = this.encode_value(serialized)?;

                            let event_id = event.event_id().map(|event_id| event_id.to_string());
                            // Positions are consecutive, starting at `at`.
                            let index = at.index() + i;

                            txn.execute(
                                r#"
                                INSERT INTO events(chunk_id, room_id, event_id, content, position)
                                VALUES (?, ?, ?, ?, ?)
                            "#,
                                (chunk_id, &hashed_room_id, event_id, content, index),
                            )?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        let serialized = serde_json::to_vec(&event)?;
                        let content = this.encode_value(serialized)?;

                        // The event id should be the same, but just in case it changed…
                        let event_id = event.event_id().map(|event_id| event_id.to_string());

                        txn.execute(
                            r#"
                            UPDATE events
                            SET content = ?, event_id = ?
                            WHERE room_id = ? AND chunk_id = ? AND position = ?
                        "#,
                            (content, event_id, &hashed_room_id, chunk_id, index,)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we're going to remove.
                        txn.execute(
                            r#"
                                UPDATE events
                                SET position = position - 1
                                WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index)
                        )?;

                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }
583
    /// Load every chunk of the linked chunk stored for the given room,
    /// rebuilding each chunk's gap or event content along the way.
    async fn load_all_chunks(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        let result = self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_room_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }
624
    /// Load the last chunk of the room's linked chunk (the one with no
    /// `next`), along with a [`ChunkIdentifierGenerator`] primed with the
    /// highest chunk identifier in use.
    ///
    /// Returns `Ok((None, _))` when the room has no chunks at all, and an
    /// [`Error::InvalidData`] when chunks exist but none is last (a cycle).
    async fn load_last_chunk(
        &self,
        room_id: &RoomId,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no line, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_room_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
                    // good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
                    // linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }
716
    /// Load the chunk immediately preceding the chunk identified by
    /// `before_chunk_identifier` (i.e. the one whose `next` points at it),
    /// or `None` when there is no such chunk.
    async fn load_previous_chunk(
        &self,
        room_id: &RoomId,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        let this = self.clone();

        self
            .acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_room_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_room_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }
767
768    async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
769        self.acquire()
770            .await?
771            .with_transaction(move |txn| {
772                // Remove all the chunks, and let cascading do its job.
773                txn.execute("DELETE FROM linked_chunks", ())
774            })
775            .await?;
776        Ok(())
777    }
778
    /// Out of the given event IDs, return the ones that already exist in the
    /// store for this room (the duplicates), in storage order.
    async fn filter_duplicated_events(
        &self,
        room_id: &RoomId,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<OwnedEventId>, Self::Error> {
        // If there's no events for which we want to check duplicates, we can return
        // early. It's not only an optimization to do so: it's required, otherwise the
        // `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let room_id = room_id.to_owned();
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);

        self.acquire()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Split the query into batches, to stay under SQLite's bound-variable limit.
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        "SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({}) ORDER BY chunk_id ASC, position ASC",
                        repeat_vars(events.len()),
                    );
                    let parameters = params_from_iter(
                        // parameter for `room_id = ?`
                        once(
                            hashed_room_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn
                        .prepare(&query)?
                        .query_map(parameters, |row| row.get::<_, Option<String>>(0))?
                    {
                        let duplicated_event = duplicated_event?;

                        let Some(duplicated_event) = duplicated_event else {
                            // Event ID is malformed, let's skip it.
                            continue;
                        };

                        let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
                            // Normally unreachable, but the event ID has been stored even if it is
                            // malformed, let's skip it.
                            continue;
                        };

                        duplicated_events.push(duplicated_event);
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }
848
849    async fn add_media_content(
850        &self,
851        request: &MediaRequestParameters,
852        content: Vec<u8>,
853        ignore_policy: IgnoreMediaRetentionPolicy,
854    ) -> Result<()> {
855        self.media_service.add_media_content(self, request, content, ignore_policy).await
856    }
857
858    async fn replace_media_key(
859        &self,
860        from: &MediaRequestParameters,
861        to: &MediaRequestParameters,
862    ) -> Result<(), Self::Error> {
863        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
864        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
865
866        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
867        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
868
869        let conn = self.acquire().await?;
870        conn.execute(
871            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
872            (new_uri, new_format, prev_uri, prev_format),
873        )
874        .await?;
875
876        Ok(())
877    }
878
879    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
880        self.media_service.get_media_content(self, request).await
881    }
882
883    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
884        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
885        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
886
887        let conn = self.acquire().await?;
888        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
889
890        Ok(())
891    }
892
893    async fn get_media_content_for_uri(
894        &self,
895        uri: &MxcUri,
896    ) -> Result<Option<Vec<u8>>, Self::Error> {
897        self.media_service.get_media_content_for_uri(self, uri).await
898    }
899
900    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
901        let uri = self.encode_key(keys::MEDIA, uri);
902
903        let conn = self.acquire().await?;
904        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
905
906        Ok(())
907    }
908
909    async fn set_media_retention_policy(
910        &self,
911        policy: MediaRetentionPolicy,
912    ) -> Result<(), Self::Error> {
913        self.media_service.set_media_retention_policy(self, policy).await
914    }
915
916    fn media_retention_policy(&self) -> MediaRetentionPolicy {
917        self.media_service.media_retention_policy()
918    }
919
920    async fn set_ignore_media_retention_policy(
921        &self,
922        request: &MediaRequestParameters,
923        ignore_policy: IgnoreMediaRetentionPolicy,
924    ) -> Result<(), Self::Error> {
925        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
926    }
927
928    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
929        self.media_service.clean_up_media_cache(self).await
930    }
931}
932
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl EventCacheStoreMedia for SqliteEventCacheStore {
    type Error = Error;

    /// Loads the persisted media retention policy, if any was ever stored.
    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        // The policy lives in the key-value table, serialized.
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
    }

    /// Persists the media retention policy in the key-value table.
    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let conn = self.acquire().await?;
        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
        Ok(())
    }

    /// Inserts (or overwrites) a media entry, unless it exceeds the policy's
    /// maximum file size and the policy is not ignored for this entry.
    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        data: Vec<u8>,
        last_access: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let ignore_policy = ignore_policy.is_yes();
        let data = self.encode_value(data)?;

        // Silently refuse to store over-sized content. Note that the check
        // runs on the encoded (possibly encrypted) bytes, i.e. the size that
        // would actually be stored.
        if !ignore_policy && policy.exceeds_max_file_size(data.len()) {
            return Ok(());
        }

        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(last_access);

        let conn = self.acquire().await?;
        conn.execute(
            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
            (uri, format, data, timestamp, ignore_policy),
        )
        .await?;

        Ok(())
    }

    /// Toggles the per-entry "ignore retention policy" flag.
    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let ignore_policy = ignore_policy.is_yes();

        // No-op if the entry doesn't exist.
        let conn = self.acquire().await?;
        conn.execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (ignore_policy, uri, format),
        )
        .await?;

        Ok(())
    }

    /// Fetches a media entry by (source, format), refreshing its last-access
    /// timestamp in the same transaction.
    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
                txn.execute(
                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
                    (timestamp, &uri, &format),
                )?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ? AND format = ?",
                    (&uri, &format),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        // Stored blobs may be encrypted; decode before returning.
        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    /// Fetches any media entry matching the URI (regardless of format),
    /// refreshing the last-access timestamp of all entries for that URI.
    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, uri);
        let timestamp = time_to_timestamp(current_time);

        let conn = self.acquire().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Update the last access.
                // We need to do this first so the transaction is in write mode right away.
                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ?",
                    (&uri,),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    /// Applies the retention policy: drops over-sized entries, expired
    /// entries, and — if the total size still exceeds the cap — the least
    /// recently accessed entries, then vacuums if anything was removed.
    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            // We can safely skip all the checks.
            return Ok(());
        }

        let conn = self.acquire().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, check media content that exceed the max filesize.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, clean up expired media content.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache size is too big, remove old items until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
                    // during the conversion of the result.
                    let cache_size_int = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` if there are no rows.
                                row.get::<_, Option<i64>>(0)
                            },
                        )?
                        .unwrap_or_default();
                    let cache_size_usize = usize::try_from(cache_size_int);

                    // If the cache size is overflowing or bigger than max cache size, clean up.
                    if cache_size_usize.is_err()
                        || cache_size_usize.is_ok_and(|cache_size| cache_size > max_cache_size)
                    {
                        // Get the sizes of the media contents ordered by last access.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, usize>(1)?)));

                        let mut accumulated_items_size = 0usize;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

                        // Walk from most to least recently accessed; once the
                        // running total exceeds the cap, every further (older)
                        // row is marked for removal.
                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // We can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size is overflowing but the setting cannot be
                                    // bigger than usize::MAX, we can stop accumulating.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            };
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        // Delete in batches so a single statement never needs
                        // too many bound variables.
                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                // Record when this cleanup ran, inside the same transaction.
                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If we removed media, defragment the database and free space on the
        // filesystem.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }

    /// Returns when `clean_up_media_cache_inner` last completed, if ever.
    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
        let conn = self.acquire().await?;
        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
    }
}
1198
1199/// Like `deadpool::managed::Object::with_transaction`, but starts the
1200/// transaction in immediate (write) mode from the beginning, precluding errors
1201/// of the kind SQLITE_BUSY from happening, for transactions that may involve
1202/// both reads and writes, and start with a write.
1203async fn with_immediate_transaction<
1204    T: Send + 'static,
1205    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1206>(
1207    conn: SqliteAsyncConn,
1208    f: F,
1209) -> Result<T, Error> {
1210    conn.interact(move |conn| -> Result<T, Error> {
1211        // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1212        // to avoid read transactions upgrading to write mode and causing
1213        // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1214        conn.set_transaction_behavior(TransactionBehavior::Immediate);
1215
1216        let code = || -> Result<T, Error> {
1217            let txn = conn.transaction()?;
1218            let res = f(&txn)?;
1219            txn.commit()?;
1220            Ok(res)
1221        };
1222
1223        let res = code();
1224
1225        // Reset the transaction behavior to use Deferred, after this transaction has
1226        // been run, whether it was successful or not.
1227        conn.set_transaction_behavior(TransactionBehavior::Deferred);
1228
1229        res
1230    })
1231    .await
1232    // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1233    .unwrap()
1234}
1235
1236fn insert_chunk(
1237    txn: &Transaction<'_>,
1238    room_id: &Key,
1239    previous: Option<u64>,
1240    new: u64,
1241    next: Option<u64>,
1242    type_str: &str,
1243) -> rusqlite::Result<()> {
1244    // First, insert the new chunk.
1245    txn.execute(
1246        r#"
1247            INSERT INTO linked_chunks(id, room_id, previous, next, type)
1248            VALUES (?, ?, ?, ?, ?)
1249        "#,
1250        (new, room_id, previous, next, type_str),
1251    )?;
1252
1253    // If this chunk has a previous one, update its `next` field.
1254    if let Some(previous) = previous {
1255        txn.execute(
1256            r#"
1257                UPDATE linked_chunks
1258                SET next = ?
1259                WHERE id = ? AND room_id = ?
1260            "#,
1261            (new, previous, room_id),
1262        )?;
1263    }
1264
1265    // If this chunk has a next one, update its `previous` field.
1266    if let Some(next) = next {
1267        txn.execute(
1268            r#"
1269                UPDATE linked_chunks
1270                SET previous = ?
1271                WHERE id = ? AND room_id = ?
1272            "#,
1273            (new, next, room_id),
1274        )?;
1275    }
1276
1277    Ok(())
1278}
1279
1280#[cfg(test)]
1281mod tests {
1282    use std::{
1283        sync::atomic::{AtomicU32, Ordering::SeqCst},
1284        time::Duration,
1285    };
1286
1287    use assert_matches::assert_matches;
1288    use matrix_sdk_base::{
1289        event_cache::{
1290            store::{
1291                integration_tests::{check_test_event, make_test_event},
1292                media::IgnoreMediaRetentionPolicy,
1293                EventCacheStore, EventCacheStoreError,
1294            },
1295            Gap,
1296        },
1297        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1298        event_cache_store_media_integration_tests,
1299        linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1300        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1301    };
1302    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1303    use once_cell::sync::Lazy;
1304    use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1305    use tempfile::{tempdir, TempDir};
1306
1307    use super::SqliteEventCacheStore;
1308    use crate::utils::SqliteAsyncConnExt;
1309
    // Shared temporary directory holding every test's store; lives for the
    // whole test-process lifetime.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Monotonic counter used to give each test its own store subdirectory.
    static NUM: AtomicU32 = AtomicU32::new(0);
1312
1313    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1314        let name = NUM.fetch_add(1, SeqCst).to_string();
1315        let tmpdir_path = TMP_DIR.path().join(name);
1316
1317        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1318
1319        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1320    }
1321
    // Run the shared store test suites from `matrix-sdk-base` against this
    // SQLite implementation.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
1325
1326    async fn get_event_cache_store_content_sorted_by_last_access(
1327        event_cache_store: &SqliteEventCacheStore,
1328    ) -> Vec<Vec<u8>> {
1329        let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1330        sqlite_db
1331            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1332                stmt.query(())?.mapped(|row| row.get(0)).collect()
1333            })
1334            .await
1335            .expect("querying media cache content by last access failed")
1336    }
1337
    // Checks that reading a media entry refreshes its `last_access` column:
    // after a `get_media_content`, the file must sort before the thumbnail.
    #[async_test]
    async fn test_last_access() {
        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        // Add the media.
        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        // File's last access is older than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Since the precision of the timestamp is in seconds, wait so the timestamps
        // differ.
        tokio::time::sleep(Duration::from_secs(3)).await;

        // Access the file so its last access is more recent.
        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        // File's last access is more recent than thumbnail.
        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }
1404
    // Checks that `NewItemsChunk` updates link chunks correctly, whether the
    // `next` pointer is given explicitly or derived by the store, and that
    // `load_all_chunks` returns chunks ordered by identifier.
    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None, // Note: the store must link the next entry itself.
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
                                                               * the next link ahead of time. */
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(room_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            // Chunks are ordered from smaller to bigger IDs.
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            // The store linked 42 -> 13 itself, from 13's `previous` field.
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }
1467
1468    #[async_test]
1469    async fn test_linked_chunk_new_gap_chunk() {
1470        let store = get_event_cache_store().await.expect("creating cache store failed");
1471
1472        let room_id = &DEFAULT_TEST_ROOM_ID;
1473
1474        store
1475            .handle_linked_chunk_updates(
1476                room_id,
1477                vec![Update::NewGapChunk {
1478                    previous: None,
1479                    new: ChunkIdentifier::new(42),
1480                    next: None,
1481                    gap: Gap { prev_token: "raclette".to_owned() },
1482                }],
1483            )
1484            .await
1485            .unwrap();
1486
1487        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1488
1489        assert_eq!(chunks.len(), 1);
1490
1491        // Chunks are ordered from smaller to bigger IDs.
1492        let c = chunks.remove(0);
1493        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1494        assert_eq!(c.previous, None);
1495        assert_eq!(c.next, None);
1496        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1497            assert_eq!(gap.prev_token, "raclette");
1498        });
1499    }
1500
    // Checks that `ReplaceItem` overwrites the event at the given position
    // while leaving its sibling untouched.
    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    // Replace "world" (position 1) with "yolo".
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event(room_id, "yolo"),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }
1546
    // Checks that `RemoveChunk` unlinks the middle chunk of a 3-chunk list
    // (relinking its neighbours) and that the `gaps` row is removed via the
    // foreign-key cascade.
    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;

        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(room_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // Chunks are ordered from smaller to bigger IDs.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        // 42 now points directly to 44, skipping the removed 43.
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // Check that cascading worked. Yes, sqlite, I doubt you.
        let gaps = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }
1622
1623    #[async_test]
1624    async fn test_linked_chunk_push_items() {
1625        let store = get_event_cache_store().await.expect("creating cache store failed");
1626
1627        let room_id = &DEFAULT_TEST_ROOM_ID;
1628
1629        store
1630            .handle_linked_chunk_updates(
1631                room_id,
1632                vec![
1633                    Update::NewItemsChunk {
1634                        previous: None,
1635                        new: ChunkIdentifier::new(42),
1636                        next: None,
1637                    },
1638                    Update::PushItems {
1639                        at: Position::new(ChunkIdentifier::new(42), 0),
1640                        items: vec![
1641                            make_test_event(room_id, "hello"),
1642                            make_test_event(room_id, "world"),
1643                        ],
1644                    },
1645                    Update::PushItems {
1646                        at: Position::new(ChunkIdentifier::new(42), 2),
1647                        items: vec![make_test_event(room_id, "who?")],
1648                    },
1649                ],
1650            )
1651            .await
1652            .unwrap();
1653
1654        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1655
1656        assert_eq!(chunks.len(), 1);
1657
1658        let c = chunks.remove(0);
1659        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1660        assert_eq!(c.previous, None);
1661        assert_eq!(c.next, None);
1662        assert_matches!(c.content, ChunkContent::Items(events) => {
1663            assert_eq!(events.len(), 3);
1664
1665            check_test_event(&events[0], "hello");
1666            check_test_event(&events[1], "world");
1667            check_test_event(&events[2], "who?");
1668        });
1669    }
1670
    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;

        // Push two events into one chunk, then remove the first one ("hello").
        store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(room_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        // Only "world" must remain in the chunk.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "world");
        });

        // Make sure the position has been updated for the remaining event.
        // "world" was inserted at position 1, so it must have been shifted down
        // to position 0 in the `events` table after the removal.
        let num_rows: u64 = store
            .acquire()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
                    (room_id.as_bytes(),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 1);
    }
1728
1729    #[async_test]
1730    async fn test_linked_chunk_detach_last_items() {
1731        let store = get_event_cache_store().await.expect("creating cache store failed");
1732
1733        let room_id = *DEFAULT_TEST_ROOM_ID;
1734
1735        store
1736            .handle_linked_chunk_updates(
1737                room_id,
1738                vec![
1739                    Update::NewItemsChunk {
1740                        previous: None,
1741                        new: ChunkIdentifier::new(42),
1742                        next: None,
1743                    },
1744                    Update::PushItems {
1745                        at: Position::new(ChunkIdentifier::new(42), 0),
1746                        items: vec![
1747                            make_test_event(room_id, "hello"),
1748                            make_test_event(room_id, "world"),
1749                            make_test_event(room_id, "howdy"),
1750                        ],
1751                    },
1752                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1753                ],
1754            )
1755            .await
1756            .unwrap();
1757
1758        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1759
1760        assert_eq!(chunks.len(), 1);
1761
1762        let c = chunks.remove(0);
1763        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1764        assert_eq!(c.previous, None);
1765        assert_eq!(c.next, None);
1766        assert_matches!(c.content, ChunkContent::Items(events) => {
1767            assert_eq!(events.len(), 1);
1768            check_test_event(&events[0], "hello");
1769        });
1770    }
1771
1772    #[async_test]
1773    async fn test_linked_chunk_start_end_reattach_items() {
1774        let store = get_event_cache_store().await.expect("creating cache store failed");
1775
1776        let room_id = *DEFAULT_TEST_ROOM_ID;
1777
1778        // Same updates and checks as test_linked_chunk_push_items, but with extra
1779        // `StartReattachItems` and `EndReattachItems` updates, which must have no
1780        // effects.
1781        store
1782            .handle_linked_chunk_updates(
1783                room_id,
1784                vec![
1785                    Update::NewItemsChunk {
1786                        previous: None,
1787                        new: ChunkIdentifier::new(42),
1788                        next: None,
1789                    },
1790                    Update::PushItems {
1791                        at: Position::new(ChunkIdentifier::new(42), 0),
1792                        items: vec![
1793                            make_test_event(room_id, "hello"),
1794                            make_test_event(room_id, "world"),
1795                            make_test_event(room_id, "howdy"),
1796                        ],
1797                    },
1798                    Update::StartReattachItems,
1799                    Update::EndReattachItems,
1800                ],
1801            )
1802            .await
1803            .unwrap();
1804
1805        let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1806
1807        assert_eq!(chunks.len(), 1);
1808
1809        let c = chunks.remove(0);
1810        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1811        assert_eq!(c.previous, None);
1812        assert_eq!(c.next, None);
1813        assert_matches!(c.content, ChunkContent::Items(events) => {
1814            assert_eq!(events.len(), 3);
1815            check_test_event(&events[0], "hello");
1816            check_test_event(&events[1], "world");
1817            check_test_event(&events[2], "howdy");
1818        });
1819    }
1820
1821    #[async_test]
1822    async fn test_linked_chunk_clear() {
1823        let store = get_event_cache_store().await.expect("creating cache store failed");
1824
1825        let room_id = *DEFAULT_TEST_ROOM_ID;
1826
1827        store
1828            .handle_linked_chunk_updates(
1829                room_id,
1830                vec![
1831                    Update::NewItemsChunk {
1832                        previous: None,
1833                        new: ChunkIdentifier::new(42),
1834                        next: None,
1835                    },
1836                    Update::NewGapChunk {
1837                        previous: Some(ChunkIdentifier::new(42)),
1838                        new: ChunkIdentifier::new(54),
1839                        next: None,
1840                        gap: Gap { prev_token: "fondue".to_owned() },
1841                    },
1842                    Update::PushItems {
1843                        at: Position::new(ChunkIdentifier::new(42), 0),
1844                        items: vec![
1845                            make_test_event(room_id, "hello"),
1846                            make_test_event(room_id, "world"),
1847                            make_test_event(room_id, "howdy"),
1848                        ],
1849                    },
1850                    Update::Clear,
1851                ],
1852            )
1853            .await
1854            .unwrap();
1855
1856        let chunks = store.load_all_chunks(room_id).await.unwrap();
1857        assert!(chunks.is_empty());
1858    }
1859
1860    #[async_test]
1861    async fn test_linked_chunk_multiple_rooms() {
1862        let store = get_event_cache_store().await.expect("creating cache store failed");
1863
1864        let room1 = room_id!("!realcheeselovers:raclette.fr");
1865        let room2 = room_id!("!realcheeselovers:fondue.ch");
1866
1867        // Check that applying updates to one room doesn't affect the others.
1868        // Use the same chunk identifier in both rooms to battle-test search.
1869
1870        store
1871            .handle_linked_chunk_updates(
1872                room1,
1873                vec![
1874                    Update::NewItemsChunk {
1875                        previous: None,
1876                        new: ChunkIdentifier::new(42),
1877                        next: None,
1878                    },
1879                    Update::PushItems {
1880                        at: Position::new(ChunkIdentifier::new(42), 0),
1881                        items: vec![
1882                            make_test_event(room1, "best cheese is raclette"),
1883                            make_test_event(room1, "obviously"),
1884                        ],
1885                    },
1886                ],
1887            )
1888            .await
1889            .unwrap();
1890
1891        store
1892            .handle_linked_chunk_updates(
1893                room2,
1894                vec![
1895                    Update::NewItemsChunk {
1896                        previous: None,
1897                        new: ChunkIdentifier::new(42),
1898                        next: None,
1899                    },
1900                    Update::PushItems {
1901                        at: Position::new(ChunkIdentifier::new(42), 0),
1902                        items: vec![make_test_event(room1, "beaufort is the best")],
1903                    },
1904                ],
1905            )
1906            .await
1907            .unwrap();
1908
1909        // Check chunks from room 1.
1910        let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
1911        assert_eq!(chunks_room1.len(), 1);
1912
1913        let c = chunks_room1.remove(0);
1914        assert_matches!(c.content, ChunkContent::Items(events) => {
1915            assert_eq!(events.len(), 2);
1916            check_test_event(&events[0], "best cheese is raclette");
1917            check_test_event(&events[1], "obviously");
1918        });
1919
1920        // Check chunks from room 2.
1921        let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
1922        assert_eq!(chunks_room2.len(), 1);
1923
1924        let c = chunks_room2.remove(0);
1925        assert_matches!(c.content, ChunkContent::Items(events) => {
1926            assert_eq!(events.len(), 1);
1927            check_test_event(&events[0], "beaufort is the best");
1928        });
1929    }
1930
1931    #[async_test]
1932    async fn test_linked_chunk_update_is_a_transaction() {
1933        let store = get_event_cache_store().await.expect("creating cache store failed");
1934
1935        let room_id = *DEFAULT_TEST_ROOM_ID;
1936
1937        // Trigger a violation of the unique constraint on the (room id, chunk id)
1938        // couple.
1939        let err = store
1940            .handle_linked_chunk_updates(
1941                room_id,
1942                vec![
1943                    Update::NewItemsChunk {
1944                        previous: None,
1945                        new: ChunkIdentifier::new(42),
1946                        next: None,
1947                    },
1948                    Update::NewItemsChunk {
1949                        previous: None,
1950                        new: ChunkIdentifier::new(42),
1951                        next: None,
1952                    },
1953                ],
1954            )
1955            .await
1956            .unwrap_err();
1957
1958        // The operation fails with a constraint violation error.
1959        assert_matches!(err, crate::error::Error::Sqlite(err) => {
1960            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1961        });
1962
1963        // If the updates have been handled transactionally, then no new chunks should
1964        // have been added; failure of the second update leads to the first one being
1965        // rolled back.
1966        let chunks = store.load_all_chunks(room_id).await.unwrap();
1967        assert!(chunks.is_empty());
1968    }
1969
1970    #[async_test]
1971    async fn test_filter_duplicate_events_no_events() {
1972        let store = get_event_cache_store().await.expect("creating cache store failed");
1973
1974        let room_id = *DEFAULT_TEST_ROOM_ID;
1975        let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
1976        assert!(duplicates.is_empty());
1977    }
1978
    #[async_test]
    async fn test_load_last_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no last chunk.
        {
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(room_id).await.unwrap();

            assert!(last_chunk.is_none());
            // With no chunk stored, the generator starts at 0.
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        // Case #2: only one chunk is present.
        {
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(room_id).await.unwrap();

            // The only chunk is also the last one, with both events intact.
            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        // Case #3: more chunks are present.
        {
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![
                        // New chunk #7 is linked *after* #42 despite its smaller
                        // numeric identifier: the last chunk is determined by the
                        // links, not by identifier order.
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(room_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            // The generator still reports 42 — presumably it tracks the maximum
            // identifier ever used, and 7 < 42.
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }
2069
2070    #[async_test]
2071    async fn test_load_last_chunk_with_a_cycle() {
2072        let room_id = room_id!("!r0:matrix.org");
2073        let store = get_event_cache_store().await.expect("creating cache store failed");
2074
2075        store
2076            .handle_linked_chunk_updates(
2077                room_id,
2078                vec![
2079                    Update::NewItemsChunk {
2080                        previous: None,
2081                        new: ChunkIdentifier::new(0),
2082                        next: None,
2083                    },
2084                    Update::NewItemsChunk {
2085                        // Because `previous` connects to chunk #0, it will create a cycle.
2086                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2087                        // **does not exist**. We have to detect this cycle.
2088                        previous: Some(ChunkIdentifier::new(0)),
2089                        new: ChunkIdentifier::new(1),
2090                        next: Some(ChunkIdentifier::new(0)),
2091                    },
2092                ],
2093            )
2094            .await
2095            .unwrap();
2096
2097        store.load_last_chunk(room_id).await.unwrap_err();
2098    }
2099
    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Case #1: no chunk at all, equivalent to having an nonexistent
        // `before_chunk_identifier`.
        {
            let previous_chunk =
                store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        // Case #2: there is one chunk only: we request the previous on this
        // one, it doesn't exist.
        {
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();

            // Chunk #42 is the head of the list, so it has no previous chunk.
            assert!(previous_chunk.is_none());
        }

        // Case #3: there are two chunks.
        {
            store
                .handle_linked_chunk_updates(
                    room_id,
                    vec![
                        // new chunk before the one that exists.
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();

            // Chunk #7 precedes #42, and must come back fully loaded: links and
            // items alike.
            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
2174}
2175
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    /// Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Monotonic counter, so each store gets its own subdirectory.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh, passphrase-protected store in a dedicated subdirectory
    /// of [`TMP_DIR`].
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let store_name = NUM.fetch_add(1, SeqCst).to_string();
        let store_path = TMP_DIR.path().join(store_name);

        tracing::info!("using event cache store @ {}", store_path.to_str().unwrap());

        let store =
            SqliteEventCacheStore::open(store_path.to_str().unwrap(), Some("default_test_password"))
                .await
                .unwrap();
        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}