matrix_sdk_sqlite/
event_cache_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`EventCacheStore`].
16
17use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22    deserialized_responses::TimelineEvent,
23    event_cache::{
24        store::{
25            compute_filters_string, extract_event_relation,
26            media::{
27                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
28                MediaService,
29            },
30            EventCacheStore,
31        },
32        Event, Gap,
33    },
34    linked_chunk::{
35        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
36        Position, RawChunk, Update,
37    },
38    media::{MediaRequestParameters, UniqueKey},
39    timer,
40};
41use matrix_sdk_store_encryption::StoreCipher;
42use ruma::{
43    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
44    OwnedEventId, RoomId,
45};
46use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
47use tokio::{
48    fs,
49    sync::{Mutex, OwnedMutexGuard},
50};
51use tracing::{debug, error, instrument, trace};
52
53use crate::{
54    error::{Error, Result},
55    utils::{
56        repeat_vars, time_to_timestamp, EncryptableStore, Key, SqliteAsyncConnExt,
57        SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
58    },
59    OpenStoreError, SqliteStoreConfig,
60};
61
/// String identifiers used by the store: names of key-value entries and names
/// of tables.
mod keys {
    // Entries in Key-value store
    /// Key-value entry read at opening time to restore the media retention
    /// policy of the media service.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    /// Key-value entry read at opening time to restore the time of the last
    /// media cleanup.
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Tables
    /// Name passed to `encode_key` when hashing linked chunk identifiers and
    /// room identifiers.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Presumably the name used for the media table; it is not referenced in
    /// this part of the file (TODO confirm against its users).
    pub const MEDIA: &str = "media";
}
71
/// The database name, i.e. the name of the file joined to the store's
/// directory path when the connection pool is configured.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 9;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
88
/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// The cipher exposed through `get_cypher`. It is only set when the store
    /// has been opened with a passphrase.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We make the difference between connections for read operations, and for
    /// write operations. We keep a single connection aside for write
    /// operations. All other connections are used for read operations. The
    /// lock is used to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,

    /// The media service. When the store is opened, it is restored from the
    /// media retention policy and the last media cleanup time found in the
    /// key-value store.
    media_service: MediaService,
}
105
106#[cfg(not(tarpaulin_include))]
107impl fmt::Debug for SqliteEventCacheStore {
108    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
109        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
110    }
111}
112
113impl EncryptableStore for SqliteEventCacheStore {
114    fn get_cypher(&self) -> Option<&StoreCipher> {
115        self.store_cipher.as_deref()
116    }
117}
118
119impl SqliteEventCacheStore {
120    /// Open the SQLite-based event cache store at the given path using the
121    /// given passphrase to encrypt private data.
122    pub async fn open(
123        path: impl AsRef<Path>,
124        passphrase: Option<&str>,
125    ) -> Result<Self, OpenStoreError> {
126        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
127    }
128
129    /// Open the SQLite-based event cache store with the config open config.
130    #[instrument(skip(config), fields(path = ?config.path))]
131    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
132        debug!(?config);
133
134        let _timer = timer!("open_with_config");
135
136        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
137
138        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
139
140        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
141        config.pool = Some(pool_config);
142
143        let pool = config.create_pool(Runtime::Tokio1)?;
144
145        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
146        this.write().await?.apply_runtime_config(runtime_config).await?;
147
148        Ok(this)
149    }
150
151    /// Open an SQLite-based event cache store using the given SQLite database
152    /// pool. The given passphrase will be used to encrypt private data.
153    async fn open_with_pool(
154        pool: SqlitePool,
155        passphrase: Option<&str>,
156    ) -> Result<Self, OpenStoreError> {
157        let conn = pool.get().await?;
158
159        let version = conn.db_version().await?;
160        run_migrations(&conn, version).await?;
161
162        let store_cipher = match passphrase {
163            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
164            None => None,
165        };
166
167        let media_service = MediaService::new();
168        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
169        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
170        media_service.restore(media_retention_policy, last_media_cleanup_time);
171
172        Ok(Self {
173            store_cipher,
174            pool,
175            // Use `conn` as our selected write connections.
176            write_connection: Arc::new(Mutex::new(conn)),
177            media_service,
178        })
179    }
180
181    // Acquire a connection for executing read operations.
182    #[instrument(skip_all)]
183    async fn read(&self) -> Result<SqliteAsyncConn> {
184        trace!("Taking a `read` connection");
185        let _timer = timer!("connection");
186
187        let connection = self.pool.get().await?;
188
189        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
190        // support must be enabled on a per-connection basis. Execute it every
191        // time we try to get a connection, since we can't guarantee a previous
192        // connection did enable it before.
193        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
194
195        Ok(connection)
196    }
197
198    // Acquire a connection for executing write operations.
199    #[instrument(skip_all)]
200    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
201        trace!("Taking a `write` connection");
202        let _timer = timer!("connection");
203
204        let connection = self.write_connection.clone().lock_owned().await;
205
206        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
207        // support must be enabled on a per-connection basis. Execute it every
208        // time we try to get a connection, since we can't guarantee a previous
209        // connection did enable it before.
210        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
211
212        Ok(connection)
213    }
214
215    fn map_row_to_chunk(
216        row: &rusqlite::Row<'_>,
217    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
218        Ok((
219            row.get::<_, u64>(0)?,
220            row.get::<_, Option<u64>>(1)?,
221            row.get::<_, Option<u64>>(2)?,
222            row.get::<_, String>(3)?,
223        ))
224    }
225
226    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
227        let serialized = serde_json::to_vec(event)?;
228
229        // Extract the relationship info here.
230        let raw_event = event.raw();
231        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
232
233        // The content may be encrypted.
234        let content = self.encode_value(serialized)?;
235
236        Ok(EncodedEvent {
237            content,
238            rel_type,
239            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
240        })
241    }
242}
243
/// An event in the form produced by `SqliteEventCacheStore::encode_event`,
/// ready to be written in the `events` table.
struct EncodedEvent {
    /// The JSON-serialized event, after it went through `encode_value` (it
    /// may be encrypted).
    content: Vec<u8>,
    /// The type of the relation, if the event relates to another event.
    rel_type: Option<String>,
    /// The stringified ID of the related event, if the event relates to
    /// another event.
    relates_to: Option<String>,
}
249
/// Helpers to read linked chunks back from the database, within a
/// transaction.
trait TransactionExtForLinkedChunks {
    /// Rebuild a [`RawChunk`] from the values describing a chunk: its
    /// identifier (`index`), the identifiers of its `previous` and `next`
    /// neighbours, and its `chunk_type`.
    ///
    /// The content of the chunk is loaded from the database according to
    /// `chunk_type`; `store` is used to decode the stored values.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the [`Gap`] stored for the chunk `chunk_id` of the linked chunk
    /// `linked_chunk_id`; `store` is used to decode the stored token.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load the events stored for the chunk `chunk_id` of the linked chunk
    /// `linked_chunk_id`; `store` is used to decode the stored events.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
275
276impl TransactionExtForLinkedChunks for Transaction<'_> {
277    fn rebuild_chunk(
278        &self,
279        store: &SqliteEventCacheStore,
280        linked_chunk_id: &Key,
281        previous: Option<u64>,
282        id: u64,
283        next: Option<u64>,
284        chunk_type: &str,
285    ) -> Result<RawChunk<Event, Gap>> {
286        let previous = previous.map(ChunkIdentifier::new);
287        let next = next.map(ChunkIdentifier::new);
288        let id = ChunkIdentifier::new(id);
289
290        match chunk_type {
291            CHUNK_TYPE_GAP_TYPE_STRING => {
292                // It's a gap!
293                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
294                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
295            }
296
297            CHUNK_TYPE_EVENT_TYPE_STRING => {
298                // It's events!
299                let events = self.load_events_content(store, linked_chunk_id, id)?;
300                Ok(RawChunk {
301                    content: ChunkContent::Items(events),
302                    previous,
303                    identifier: id,
304                    next,
305                })
306            }
307
308            other => {
309                // It's an error!
310                Err(Error::InvalidData {
311                    details: format!("a linked chunk has an unknown type {other}"),
312                })
313            }
314        }
315    }
316
317    fn load_gap_content(
318        &self,
319        store: &SqliteEventCacheStore,
320        linked_chunk_id: &Key,
321        chunk_id: ChunkIdentifier,
322    ) -> Result<Gap> {
323        // There's at most one row for it in the database, so a call to `query_row` is
324        // sufficient.
325        let encoded_prev_token: Vec<u8> = self.query_row(
326            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
327            (chunk_id.index(), &linked_chunk_id),
328            |row| row.get(0),
329        )?;
330        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
331        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
332        Ok(Gap { prev_token })
333    }
334
335    fn load_events_content(
336        &self,
337        store: &SqliteEventCacheStore,
338        linked_chunk_id: &Key,
339        chunk_id: ChunkIdentifier,
340    ) -> Result<Vec<Event>> {
341        // Retrieve all the events from the database.
342        let mut events = Vec::new();
343
344        for event_data in self
345            .prepare(
346                r#"
347                    SELECT events.content
348                    FROM event_chunks ec, events
349                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
350                    ORDER BY ec.position ASC
351                "#,
352            )?
353            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
354        {
355            let encoded_content = event_data?;
356            let serialized_content = store.decode_value(&encoded_content)?;
357            let event = serde_json::from_slice(&serialized_content)?;
358
359            events.push(event);
360        }
361
362        Ok(events)
363    }
364}
365
366/// Run migrations for the given version of the database.
367async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
368    if version == 0 {
369        debug!("Creating database");
370    } else if version < DATABASE_VERSION {
371        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
372    } else {
373        return Ok(());
374    }
375
376    // Always enable foreign keys for the current connection.
377    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
378
379    if version < 1 {
380        // First turn on WAL mode, this can't be done in the transaction, it fails with
381        // the error message: "cannot change into wal mode from within a transaction".
382        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
383        conn.with_transaction(|txn| {
384            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
385            txn.set_db_version(1)
386        })
387        .await?;
388    }
389
390    if version < 2 {
391        conn.with_transaction(|txn| {
392            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
393            txn.set_db_version(2)
394        })
395        .await?;
396    }
397
398    if version < 3 {
399        conn.with_transaction(|txn| {
400            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
401            txn.set_db_version(3)
402        })
403        .await?;
404    }
405
406    if version < 4 {
407        conn.with_transaction(|txn| {
408            txn.execute_batch(include_str!(
409                "../migrations/event_cache_store/004_ignore_policy.sql"
410            ))?;
411            txn.set_db_version(4)
412        })
413        .await?;
414    }
415
416    if version < 5 {
417        conn.with_transaction(|txn| {
418            txn.execute_batch(include_str!(
419                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
420            ))?;
421            txn.set_db_version(5)
422        })
423        .await?;
424    }
425
426    if version < 6 {
427        conn.with_transaction(|txn| {
428            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
429            txn.set_db_version(6)
430        })
431        .await?;
432    }
433
434    if version < 7 {
435        conn.with_transaction(|txn| {
436            txn.execute_batch(include_str!(
437                "../migrations/event_cache_store/007_event_chunks.sql"
438            ))?;
439            txn.set_db_version(7)
440        })
441        .await?;
442    }
443
444    if version < 8 {
445        conn.with_transaction(|txn| {
446            txn.execute_batch(include_str!(
447                "../migrations/event_cache_store/008_linked_chunk_id.sql"
448            ))?;
449            txn.set_db_version(8)
450        })
451        .await?;
452    }
453
454    if version < 9 {
455        conn.with_transaction(|txn| {
456            txn.execute_batch(include_str!(
457                "../migrations/event_cache_store/009_related_event_index.sql"
458            ))?;
459            txn.set_db_version(9)
460        })
461        .await?;
462    }
463
464    Ok(())
465}
466
467#[async_trait]
468impl EventCacheStore for SqliteEventCacheStore {
469    type Error = Error;
470
471    #[instrument(skip(self))]
472    async fn try_take_leased_lock(
473        &self,
474        lease_duration_ms: u32,
475        key: &str,
476        holder: &str,
477    ) -> Result<bool> {
478        let _timer = timer!("method");
479
480        let key = key.to_owned();
481        let holder = holder.to_owned();
482
483        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
484        let expiration = now + lease_duration_ms as u64;
485
486        let num_touched = self
487            .write()
488            .await?
489            .with_transaction(move |txn| {
490                txn.execute(
491                    "INSERT INTO lease_locks (key, holder, expiration)
492                    VALUES (?1, ?2, ?3)
493                    ON CONFLICT (key)
494                    DO
495                        UPDATE SET holder = ?2, expiration = ?3
496                        WHERE holder = ?2
497                        OR expiration < ?4
498                ",
499                    (key, holder, expiration, now),
500                )
501            })
502            .await?;
503
504        Ok(num_touched == 1)
505    }
506
507    #[instrument(skip(self, updates))]
508    async fn handle_linked_chunk_updates(
509        &self,
510        linked_chunk_id: LinkedChunkId<'_>,
511        updates: Vec<Update<Event, Gap>>,
512    ) -> Result<(), Self::Error> {
513        let _timer = timer!("method");
514
515        // Use a single transaction throughout this function, so that either all updates
516        // work, or none is taken into account.
517        let hashed_linked_chunk_id =
518            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
519        let linked_chunk_id = linked_chunk_id.to_owned();
520        let this = self.clone();
521
522        with_immediate_transaction(self, move |txn| {
523            for up in updates {
524                match up {
525                    Update::NewItemsChunk { previous, new, next } => {
526                        let previous = previous.as_ref().map(ChunkIdentifier::index);
527                        let new = new.index();
528                        let next = next.as_ref().map(ChunkIdentifier::index);
529
530                        trace!(
531                            %linked_chunk_id,
532                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
533                        );
534
535                        insert_chunk(
536                            txn,
537                            &hashed_linked_chunk_id,
538                            previous,
539                            new,
540                            next,
541                            CHUNK_TYPE_EVENT_TYPE_STRING,
542                        )?;
543                    }
544
545                    Update::NewGapChunk { previous, new, next, gap } => {
546                        let serialized = serde_json::to_vec(&gap.prev_token)?;
547                        let prev_token = this.encode_value(serialized)?;
548
549                        let previous = previous.as_ref().map(ChunkIdentifier::index);
550                        let new = new.index();
551                        let next = next.as_ref().map(ChunkIdentifier::index);
552
553                        trace!(
554                            %linked_chunk_id,
555                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
556                        );
557
558                        // Insert the chunk as a gap.
559                        insert_chunk(
560                            txn,
561                            &hashed_linked_chunk_id,
562                            previous,
563                            new,
564                            next,
565                            CHUNK_TYPE_GAP_TYPE_STRING,
566                        )?;
567
568                        // Insert the gap's value.
569                        txn.execute(
570                            r#"
571                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
572                            VALUES (?, ?, ?)
573                        "#,
574                            (new, &hashed_linked_chunk_id, prev_token),
575                        )?;
576                    }
577
578                    Update::RemoveChunk(chunk_identifier) => {
579                        let chunk_id = chunk_identifier.index();
580
581                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
582
583                        // Find chunk to delete.
584                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
585                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
586                            (chunk_id, &hashed_linked_chunk_id),
587                            |row| Ok((row.get(0)?, row.get(1)?))
588                        )?;
589
590                        // Replace its previous' next to its own next.
591                        if let Some(previous) = previous {
592                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
593                        }
594
595                        // Replace its next' previous to its own previous.
596                        if let Some(next) = next {
597                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
598                        }
599
600                        // Now delete it, and let cascading delete corresponding entries in the
601                        // other data tables.
602                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
603                    }
604
605                    Update::PushItems { at, items } => {
606                        if items.is_empty() {
607                            // Should never happens, but better be safe.
608                            continue;
609                        }
610
611                        let chunk_id = at.chunk_identifier().index();
612
613                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
614
615                        let mut chunk_statement = txn.prepare(
616                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
617                        )?;
618
619                        // Note: we use `OR REPLACE` here, because the event might have been
620                        // already inserted in the database. This is the case when an event is
621                        // deduplicated and moved to another position; or because it was inserted
622                        // outside the context of a linked chunk (e.g. pinned event).
623                        let mut content_statement = txn.prepare(
624                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
625                        )?;
626
627                        let invalid_event = |event: TimelineEvent| {
628                            let Some(event_id) = event.event_id() else {
629                                error!(%linked_chunk_id, "Trying to push an event with no ID");
630                                return None;
631                            };
632
633                            Some((event_id.to_string(), event))
634                        };
635
636                        let room_id = linked_chunk_id.room_id();
637                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
638
639                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
640                            // Insert the location information into the database.
641                            let index = at.index() + i;
642                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
643
644                            // Now, insert the event content into the database.
645                            let encoded_event = this.encode_event(&event)?;
646                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
647                        }
648                    }
649
650                    Update::ReplaceItem { at, item: event } => {
651                        let chunk_id = at.chunk_identifier().index();
652
653                        let index = at.index();
654
655                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
656
657                        // The event id should be the same, but just in case it changed…
658                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
659                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
660                            continue;
661                        };
662
663                        // Replace the event's content. Really we'd like to update, but in case the
664                        // event id changed, we are a bit lenient here and will allow an insertion
665                        // of the new event.
666                        let encoded_event = this.encode_event(&event)?;
667                        let room_id = linked_chunk_id.room_id();
668                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
669                        txn.execute(
670                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
671                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
672
673                        // Replace the event id in the linked chunk, in case it changed.
674                        txn.execute(
675                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
676                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
677                        )?;
678                    }
679
680                    Update::RemoveItem { at } => {
681                        let chunk_id = at.chunk_identifier().index();
682                        let index = at.index();
683
684                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
685
686                        // Remove the entry in the chunk table.
687                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
688
689                        // Decrement the index of each item after the one we are
690                        // going to remove.
691                        //
692                        // Imagine we have the following events:
693                        //
694                        // | event_id | linked_chunk_id | chunk_id | position |
695                        // |----------|-----------------|----------|----------|
696                        // | $ev0     | !r0             | 42       | 0        |
697                        // | $ev1     | !r0             | 42       | 1        |
698                        // | $ev2     | !r0             | 42       | 2        |
699                        // | $ev3     | !r0             | 42       | 3        |
700                        // | $ev4     | !r0             | 42       | 4        |
701                        // 
702                        // `$ev2` has been removed, then we end up in this
703                        // state:
704                        //
705                        // | event_id | linked_chunk_id    | chunk_id | position |
706                        // |----------|--------------------|----------|----------|
707                        // | $ev0     | !r0                | 42       | 0        |
708                        // | $ev1     | !r0                | 42       | 1        |
709                        // |          |                    |          |          | <- no more `$ev2`
710                        // | $ev3     | !r0                | 42       | 3        |
711                        // | $ev4     | !r0                | 42       | 4        |
712                        //
713                        // We need to shift the `position` of `$ev3` and `$ev4`
714                        // to `position - 1`, like so:
715                        // 
716                        // | event_id | linked_chunk_id | chunk_id | position |
717                        // |----------|-----------------|----------|----------|
718                        // | $ev0     | !r0             | 42       | 0        |
719                        // | $ev1     | !r0             | 42       | 1        |
720                        // | $ev3     | !r0             | 42       | 2        |
721                        // | $ev4     | !r0             | 42       | 3        |
722                        //
723                        // Usually, it boils down to run the following query:
724                        //
725                        // ```sql
726                        // UPDATE event_chunks
727                        // SET position = position - 1
728                        // WHERE position > 2 AND …
729                        // ```
730                        //
731                        // Okay. But `UPDATE` runs on rows in no particular
732                        // order. It means that it can update `$ev4` before
733                        // `$ev3` for example. What happens in this particular
734                        // case? The `position` of `$ev4` becomes `3`, however
735                        // `$ev3` already has `position = 3`. Because there
736                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
737                        // position)`, it will result in a constraint violation.
738                        //
739                        // There is **no way** to control the execution order of
740                        // `UPDATE` in SQLite. To persuade yourself, try:
741                        //
742                        // ```sql
743                        // UPDATE event_chunks
744                        // SET position = position - 1
745                        // FROM (
746                        //     SELECT event_id
747                        //     FROM event_chunks
748                        //     WHERE position > 2 AND …
749                        //     ORDER BY position ASC
750                        // ) as ordered
751                        // WHERE event_chunks.event_id = ordered.event_id
752                        // ```
753                        //
754                        // It will fail the same way.
755                        //
756                        // Thus, we have 2 solutions:
757                        //
758                        // 1. Remove the `UNIQUE` constraint,
759                        // 2. Be creative.
760                        //
761                        // The `UNIQUE` constraint is a safe belt. Normally, we
762                        // have `event_cache::Deduplicator` that is responsible
763                        // to ensure there is no duplicated event. However,
764                        // relying on this is “fragile” in the sense it can
765                        // contain bugs. Relying on the `UNIQUE` constraint from
766                        // SQLite is more robust. It's “braces and belt” as we
767                        // say here.
768                        //
769                        // So. We need to be creative.
770                        //
771                        // Many solutions exist. Amongst the most popular, we
772                        // see _dropping and re-creating the index_, which is
773                        // no-go for us, it's too expensive. I (@hywan) have
774                        // adopted the following one:
775                        // 
776                        // - Do `position = position - 1` but in the negative
777                        //   space, so `position = -(position - 1)`. A position
778                        //   cannot be negative; we are sure it is unique!
779                        // - Once all candidate rows are updated, do `position =
780                        //   -position` to move back to the positive space.
781                        //
782                        // 'told you it's gonna be creative.
783                        //
784                        // This solution is a hack, **but** it is a small
785                        // number of operations, and we can keep the `UNIQUE`
786                        // constraint in place.
787                        txn.execute(
788                            r#"
789                                UPDATE event_chunks
790                                SET position = -(position - 1)
791                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
792                            "#,
793                            (&hashed_linked_chunk_id, chunk_id, index)
794                        )?;
795                        txn.execute(
796                            r#"
797                                UPDATE event_chunks
798                                SET position = -position
799                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
800                            "#,
801                            (&hashed_linked_chunk_id, chunk_id)
802                        )?;
803                    }
804
805                    Update::DetachLastItems { at } => {
806                        let chunk_id = at.chunk_identifier().index();
807                        let index = at.index();
808
809                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
810
811                        // Remove these entries.
812                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
813                    }
814
815                    Update::Clear => {
816                        trace!(%linked_chunk_id, "clearing items");
817
818                        // Remove chunks, and let cascading do its job.
819                        txn.execute(
820                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
821                            (&hashed_linked_chunk_id,),
822                        )?;
823                    }
824
825                    Update::StartReattachItems | Update::EndReattachItems => {
826                        // Nothing.
827                    }
828                }
829            }
830
831            Ok(())
832        })
833        .await?;
834
835        Ok(())
836    }
837
838    #[instrument(skip(self))]
839    async fn load_all_chunks(
840        &self,
841        linked_chunk_id: LinkedChunkId<'_>,
842    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
843        let _timer = timer!("method");
844
845        let hashed_linked_chunk_id =
846            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
847
848        let this = self.clone();
849
850        let result = self
851            .read()
852            .await?
853            .with_transaction(move |txn| -> Result<_> {
854                let mut items = Vec::new();
855
856                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
857                for data in txn
858                    .prepare(
859                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
860                    )?
861                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
862                {
863                    let (id, previous, next, chunk_type) = data?;
864                    let new = txn.rebuild_chunk(
865                        &this,
866                        &hashed_linked_chunk_id,
867                        previous,
868                        id,
869                        next,
870                        chunk_type.as_str(),
871                    )?;
872                    items.push(new);
873                }
874
875                Ok(items)
876            })
877            .await?;
878
879        Ok(result)
880    }
881
882    #[instrument(skip(self))]
883    async fn load_all_chunks_metadata(
884        &self,
885        linked_chunk_id: LinkedChunkId<'_>,
886    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
887        let _timer = timer!("method");
888
889        let hashed_linked_chunk_id =
890            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
891
892        self.read()
893            .await?
894            .with_transaction(move |txn| -> Result<_> {
895                // We want to collect the metadata about each chunk (id, next, previous), and
896                // for event chunks, the number of events in it. For gaps, the
897                // number of events is 0, by convention.
898                //
899                // We've tried different strategies over time:
900                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
901                //   caused a full table traversal for each chunk, including for gaps which
902                //   don't have any events. This happened in
903                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
904                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
905                //   additional `SELECT` query. It was an immense improvement, but still caused
906                //   one select query per event chunk. This happened in
907                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
908                //
909                // The current solution is to run two queries:
910                // - one to get each chunk and its number of events, by doing a single `SELECT`
911                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
912                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
913                //   hashmap.
914                // - one to get each chunk's metadata (id, previous, next, type) from the
915                //   database with a `SELECT`, and then use the hashmap to get the number of
916                //   events.
917                //
918                // This strategy minimizes the number of queries to the database, and keeps them
919                // super simple, while doing a bit more processing here, which is much faster.
920
921                let num_events_by_chunk_ids = txn
922                    .prepare(
923                        r#"
924                            SELECT ec.chunk_id, COUNT(ec.event_id)
925                            FROM event_chunks as ec
926                            WHERE ec.linked_chunk_id = ?
927                            GROUP BY ec.chunk_id
928                        "#,
929                    )?
930                    .query_map((&hashed_linked_chunk_id,), |row| {
931                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
932                    })?
933                    .collect::<Result<HashMap<_, _>, _>>()?;
934
935                txn.prepare(
936                    r#"
937                        SELECT
938                            lc.id,
939                            lc.previous,
940                            lc.next,
941                            lc.type
942                        FROM linked_chunks as lc
943                        WHERE lc.linked_chunk_id = ?
944                        ORDER BY lc.id"#,
945                )?
946                .query_map((&hashed_linked_chunk_id,), |row| {
947                    Ok((
948                        row.get::<_, u64>(0)?,
949                        row.get::<_, Option<u64>>(1)?,
950                        row.get::<_, Option<u64>>(2)?,
951                        row.get::<_, String>(3)?,
952                    ))
953                })?
954                .map(|data| -> Result<_> {
955                    let (id, previous, next, chunk_type) = data?;
956
957                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
958                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
959                    // benchmarking shows that this is slightly slower than matching the chunk
960                    // type (around 1%, so in the realm of noise), so we keep the explicit
961                    // check instead.
962                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
963                        0
964                    } else {
965                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
966                    };
967
968                    Ok(ChunkMetadata {
969                        identifier: ChunkIdentifier::new(id),
970                        previous: previous.map(ChunkIdentifier::new),
971                        next: next.map(ChunkIdentifier::new),
972                        num_items,
973                    })
974                })
975                .collect::<Result<Vec<_>, _>>()
976            })
977            .await
978    }
979
980    #[instrument(skip(self))]
981    async fn load_last_chunk(
982        &self,
983        linked_chunk_id: LinkedChunkId<'_>,
984    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
985        let _timer = timer!("method");
986
987        let hashed_linked_chunk_id =
988            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
989
990        let this = self.clone();
991
992        self
993            .read()
994            .await?
995            .with_transaction(move |txn| -> Result<_> {
996                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
997                let (chunk_identifier_generator, number_of_chunks) = txn
998                    .prepare(
999                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1000                    )?
1001                    .query_row(
1002                        (&hashed_linked_chunk_id,),
1003                        |row| {
1004                            Ok((
1005                                // Read the `MAX(id)` as an `Option<u64>` instead
1006                                // of `u64` in case the `SELECT` returns nothing.
1007                                // Indeed, if it returns no line, the `MAX(id)` is
1008                                // set to `Null`.
1009                                row.get::<_, Option<u64>>(0)?,
1010                                row.get::<_, u64>(1)?,
1011                            ))
1012                        }
1013                    )?;
1014
1015                let chunk_identifier_generator = match chunk_identifier_generator {
1016                    Some(last_chunk_identifier) => {
1017                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1018                            ChunkIdentifier::new(last_chunk_identifier)
1019                        )
1020                    },
1021                    None => ChunkIdentifierGenerator::new_from_scratch(),
1022                };
1023
1024                // Find the last chunk.
1025                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1026                    .prepare(
1027                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1028                    )?
1029                    .query_row(
1030                        (&hashed_linked_chunk_id,),
1031                        |row| {
1032                            Ok((
1033                                row.get::<_, u64>(0)?,
1034                                row.get::<_, Option<u64>>(1)?,
1035                                row.get::<_, String>(2)?,
1036                            ))
1037                        }
1038                    )
1039                    .optional()?
1040                else {
1041                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
1042                    // good.
1043                    if number_of_chunks == 0 {
1044                        return Ok((None, chunk_identifier_generator));
1045                    }
1046                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
1047                    // linked chunk is malformed.
1048                    //
1049                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
1050                    else {
1051                        return Err(Error::InvalidData {
1052                            details:
1053                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1054                                    .to_owned()
1055                            }
1056                        )
1057                    }
1058                };
1059
1060                // Build the chunk.
1061                let last_chunk = txn.rebuild_chunk(
1062                    &this,
1063                    &hashed_linked_chunk_id,
1064                    previous_chunk,
1065                    chunk_identifier,
1066                    None,
1067                    &chunk_type
1068                )?;
1069
1070                Ok((Some(last_chunk), chunk_identifier_generator))
1071            })
1072            .await
1073    }
1074
1075    #[instrument(skip(self))]
1076    async fn load_previous_chunk(
1077        &self,
1078        linked_chunk_id: LinkedChunkId<'_>,
1079        before_chunk_identifier: ChunkIdentifier,
1080    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1081        let _timer = timer!("method");
1082
1083        let hashed_linked_chunk_id =
1084            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1085
1086        let this = self.clone();
1087
1088        self
1089            .read()
1090            .await?
1091            .with_transaction(move |txn| -> Result<_> {
1092                // Find the chunk before the chunk identified by `before_chunk_identifier`.
1093                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1094                    .prepare(
1095                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1096                    )?
1097                    .query_row(
1098                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1099                        |row| {
1100                            Ok((
1101                                row.get::<_, u64>(0)?,
1102                                row.get::<_, Option<u64>>(1)?,
1103                                row.get::<_, Option<u64>>(2)?,
1104                                row.get::<_, String>(3)?,
1105                            ))
1106                        }
1107                    )
1108                    .optional()?
1109                else {
1110                    // Chunk is not found.
1111                    return Ok(None);
1112                };
1113
1114                // Build the chunk.
1115                let last_chunk = txn.rebuild_chunk(
1116                    &this,
1117                    &hashed_linked_chunk_id,
1118                    previous_chunk,
1119                    chunk_identifier,
1120                    next_chunk,
1121                    &chunk_type
1122                )?;
1123
1124                Ok(Some(last_chunk))
1125            })
1126            .await
1127    }
1128
1129    #[instrument(skip(self))]
1130    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1131        let _timer = timer!("method");
1132
1133        self.write()
1134            .await?
1135            .with_transaction(move |txn| {
1136                // Remove all the chunks, and let cascading do its job.
1137                txn.execute("DELETE FROM linked_chunks", ())?;
1138                // Also clear all the events' contents.
1139                txn.execute("DELETE FROM events", ())
1140            })
1141            .await?;
1142
1143        Ok(())
1144    }
1145
1146    #[instrument(skip(self, events))]
1147    async fn filter_duplicated_events(
1148        &self,
1149        linked_chunk_id: LinkedChunkId<'_>,
1150        events: Vec<OwnedEventId>,
1151    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1152        let _timer = timer!("method");
1153
1154        // If there's no events for which we want to check duplicates, we can return
1155        // early. It's not only an optimization to do so: it's required, otherwise the
1156        // `repeat_vars` call below will panic.
1157        if events.is_empty() {
1158            return Ok(Vec::new());
1159        }
1160
1161        // Select all events that exist in the store, i.e. the duplicates.
1162        let hashed_linked_chunk_id =
1163            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1164        let linked_chunk_id = linked_chunk_id.to_owned();
1165
1166        self.read()
1167            .await?
1168            .with_transaction(move |txn| -> Result<_> {
1169                txn.chunk_large_query_over(events, None, move |txn, events| {
1170                    let query = format!(
1171                        r#"
1172                            SELECT event_id, chunk_id, position
1173                            FROM event_chunks
1174                            WHERE linked_chunk_id = ? AND event_id IN ({})
1175                            ORDER BY chunk_id ASC, position ASC
1176                        "#,
1177                        repeat_vars(events.len()),
1178                    );
1179
1180                    let parameters = params_from_iter(
1181                        // parameter for `linked_chunk_id = ?`
1182                        once(
1183                            hashed_linked_chunk_id
1184                                .to_sql()
1185                                // SAFETY: it cannot fail since `Key::to_sql` never fails
1186                                .unwrap(),
1187                        )
1188                        // parameters for `event_id IN (…)`
1189                        .chain(events.iter().map(|event| {
1190                            event
1191                                .as_str()
1192                                .to_sql()
1193                                // SAFETY: it cannot fail since `str::to_sql` never fails
1194                                .unwrap()
1195                        })),
1196                    );
1197
1198                    let mut duplicated_events = Vec::new();
1199
1200                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1201                        Ok((
1202                            row.get::<_, String>(0)?,
1203                            row.get::<_, u64>(1)?,
1204                            row.get::<_, usize>(2)?,
1205                        ))
1206                    })? {
1207                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1208
1209                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1210                            // Normally unreachable, but the event ID has been stored even if it is
1211                            // malformed, let's skip it.
1212                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1213                            continue;
1214                        };
1215
1216                        duplicated_events.push((
1217                            duplicated_event,
1218                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
1219                        ));
1220                    }
1221
1222                    Ok(duplicated_events)
1223                })
1224            })
1225            .await
1226    }
1227
1228    #[instrument(skip(self, event_id))]
1229    async fn find_event(
1230        &self,
1231        room_id: &RoomId,
1232        event_id: &EventId,
1233    ) -> Result<Option<Event>, Self::Error> {
1234        let _timer = timer!("method");
1235
1236        let event_id = event_id.to_owned();
1237        let this = self.clone();
1238
1239        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1240
1241        self.read()
1242            .await?
1243            .with_transaction(move |txn| -> Result<_> {
1244                let Some(event) = txn
1245                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1246                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1247                    .optional()?
1248                else {
1249                    // Event is not found.
1250                    return Ok(None);
1251                };
1252
1253                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1254
1255                Ok(Some(event))
1256            })
1257            .await
1258    }
1259
1260    #[instrument(skip(self, event_id, filters))]
1261    async fn find_event_relations(
1262        &self,
1263        room_id: &RoomId,
1264        event_id: &EventId,
1265        filters: Option<&[RelationType]>,
1266    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1267        let _timer = timer!("method");
1268
1269        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1270
1271        let hashed_linked_chunk_id =
1272            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1273
1274        let event_id = event_id.to_owned();
1275        let filters = filters.map(ToOwned::to_owned);
1276        let store = self.clone();
1277
1278        self.read()
1279            .await?
1280            .with_transaction(move |txn| -> Result<_> {
1281                find_event_relations_transaction(
1282                    store,
1283                    hashed_room_id,
1284                    hashed_linked_chunk_id,
1285                    event_id,
1286                    filters,
1287                    txn,
1288                )
1289            })
1290            .await
1291    }
1292
1293    #[instrument(skip(self, event))]
1294    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1295        let _timer = timer!("method");
1296
1297        let Some(event_id) = event.event_id() else {
1298            error!(%room_id, "Trying to save an event with no ID");
1299            return Ok(());
1300        };
1301
1302        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1303        let event_id = event_id.to_string();
1304        let encoded_event = self.encode_event(&event)?;
1305
1306        self.write()
1307            .await?
1308            .with_transaction(move |txn| -> Result<_> {
1309                txn.execute(
1310                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1311                    , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1312
1313                Ok(())
1314            })
1315            .await
1316    }
1317
1318    #[instrument(skip_all)]
1319    async fn add_media_content(
1320        &self,
1321        request: &MediaRequestParameters,
1322        content: Vec<u8>,
1323        ignore_policy: IgnoreMediaRetentionPolicy,
1324    ) -> Result<()> {
1325        let _timer = timer!("method");
1326
1327        self.media_service.add_media_content(self, request, content, ignore_policy).await
1328    }
1329
1330    #[instrument(skip_all)]
1331    async fn replace_media_key(
1332        &self,
1333        from: &MediaRequestParameters,
1334        to: &MediaRequestParameters,
1335    ) -> Result<(), Self::Error> {
1336        let _timer = timer!("method");
1337
1338        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
1339        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
1340
1341        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
1342        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
1343
1344        let conn = self.write().await?;
1345        conn.execute(
1346            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
1347            (new_uri, new_format, prev_uri, prev_format),
1348        )
1349        .await?;
1350
1351        Ok(())
1352    }
1353
1354    #[instrument(skip_all)]
1355    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
1356        let _timer = timer!("method");
1357
1358        self.media_service.get_media_content(self, request).await
1359    }
1360
1361    #[instrument(skip_all)]
1362    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
1363        let _timer = timer!("method");
1364
1365        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1366        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1367
1368        let conn = self.write().await?;
1369        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
1370
1371        Ok(())
1372    }
1373
1374    #[instrument(skip(self))]
1375    async fn get_media_content_for_uri(
1376        &self,
1377        uri: &MxcUri,
1378    ) -> Result<Option<Vec<u8>>, Self::Error> {
1379        let _timer = timer!("method");
1380
1381        self.media_service.get_media_content_for_uri(self, uri).await
1382    }
1383
1384    #[instrument(skip(self))]
1385    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
1386        let _timer = timer!("method");
1387
1388        let uri = self.encode_key(keys::MEDIA, uri);
1389
1390        let conn = self.write().await?;
1391        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
1392
1393        Ok(())
1394    }
1395
1396    #[instrument(skip_all)]
1397    async fn set_media_retention_policy(
1398        &self,
1399        policy: MediaRetentionPolicy,
1400    ) -> Result<(), Self::Error> {
1401        let _timer = timer!("method");
1402
1403        self.media_service.set_media_retention_policy(self, policy).await
1404    }
1405
1406    #[instrument(skip_all)]
1407    fn media_retention_policy(&self) -> MediaRetentionPolicy {
1408        let _timer = timer!("method");
1409
1410        self.media_service.media_retention_policy()
1411    }
1412
1413    #[instrument(skip_all)]
1414    async fn set_ignore_media_retention_policy(
1415        &self,
1416        request: &MediaRequestParameters,
1417        ignore_policy: IgnoreMediaRetentionPolicy,
1418    ) -> Result<(), Self::Error> {
1419        let _timer = timer!("method");
1420
1421        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
1422    }
1423
1424    #[instrument(skip_all)]
1425    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
1426        let _timer = timer!("method");
1427
1428        self.media_service.clean_up_media_cache(self).await
1429    }
1430}
1431
1432#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1433#[cfg_attr(not(target_family = "wasm"), async_trait)]
1434impl EventCacheStoreMedia for SqliteEventCacheStore {
1435    type Error = Error;
1436
1437    async fn media_retention_policy_inner(
1438        &self,
1439    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1440        let conn = self.read().await?;
1441        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1442    }
1443
1444    async fn set_media_retention_policy_inner(
1445        &self,
1446        policy: MediaRetentionPolicy,
1447    ) -> Result<(), Self::Error> {
1448        let conn = self.write().await?;
1449        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1450        Ok(())
1451    }
1452
1453    async fn add_media_content_inner(
1454        &self,
1455        request: &MediaRequestParameters,
1456        data: Vec<u8>,
1457        last_access: SystemTime,
1458        policy: MediaRetentionPolicy,
1459        ignore_policy: IgnoreMediaRetentionPolicy,
1460    ) -> Result<(), Self::Error> {
1461        let ignore_policy = ignore_policy.is_yes();
1462        let data = self.encode_value(data)?;
1463
1464        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1465            return Ok(());
1466        }
1467
1468        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1469        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1470        let timestamp = time_to_timestamp(last_access);
1471
1472        let conn = self.write().await?;
1473        conn.execute(
1474            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1475            (uri, format, data, timestamp, ignore_policy),
1476        )
1477        .await?;
1478
1479        Ok(())
1480    }
1481
1482    async fn set_ignore_media_retention_policy_inner(
1483        &self,
1484        request: &MediaRequestParameters,
1485        ignore_policy: IgnoreMediaRetentionPolicy,
1486    ) -> Result<(), Self::Error> {
1487        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1488        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1489        let ignore_policy = ignore_policy.is_yes();
1490
1491        let conn = self.write().await?;
1492        conn.execute(
1493            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1494            (ignore_policy, uri, format),
1495        )
1496        .await?;
1497
1498        Ok(())
1499    }
1500
1501    async fn get_media_content_inner(
1502        &self,
1503        request: &MediaRequestParameters,
1504        current_time: SystemTime,
1505    ) -> Result<Option<Vec<u8>>, Self::Error> {
1506        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1507        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1508        let timestamp = time_to_timestamp(current_time);
1509
1510        let conn = self.write().await?;
1511        let data = conn
1512            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1513                // Update the last access.
1514                // We need to do this first so the transaction is in write mode right away.
1515                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1516                txn.execute(
1517                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1518                    (timestamp, &uri, &format),
1519                )?;
1520
1521                txn.query_row::<Vec<u8>, _, _>(
1522                    "SELECT data FROM media WHERE uri = ? AND format = ?",
1523                    (&uri, &format),
1524                    |row| row.get(0),
1525                )
1526                .optional()
1527            })
1528            .await?;
1529
1530        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1531    }
1532
1533    async fn get_media_content_for_uri_inner(
1534        &self,
1535        uri: &MxcUri,
1536        current_time: SystemTime,
1537    ) -> Result<Option<Vec<u8>>, Self::Error> {
1538        let uri = self.encode_key(keys::MEDIA, uri);
1539        let timestamp = time_to_timestamp(current_time);
1540
1541        let conn = self.write().await?;
1542        let data = conn
1543            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1544                // Update the last access.
1545                // We need to do this first so the transaction is in write mode right away.
1546                // See: https://sqlite.org/lang_transaction.html#read_transactions_versus_write_transactions
1547                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1548
1549                txn.query_row::<Vec<u8>, _, _>(
1550                    "SELECT data FROM media WHERE uri = ?",
1551                    (&uri,),
1552                    |row| row.get(0),
1553                )
1554                .optional()
1555            })
1556            .await?;
1557
1558        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1559    }
1560
1561    async fn clean_up_media_cache_inner(
1562        &self,
1563        policy: MediaRetentionPolicy,
1564        current_time: SystemTime,
1565    ) -> Result<(), Self::Error> {
1566        if !policy.has_limitations() {
1567            // We can safely skip all the checks.
1568            return Ok(());
1569        }
1570
1571        let conn = self.write().await?;
1572        let removed = conn
1573            .with_transaction::<_, Error, _>(move |txn| {
1574                let mut removed = false;
1575
1576                // First, check media content that exceed the max filesize.
1577                if let Some(max_file_size) = policy.computed_max_file_size() {
1578                    let count = txn.execute(
1579                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1580                        (max_file_size,),
1581                    )?;
1582
1583                    if count > 0 {
1584                        removed = true;
1585                    }
1586                }
1587
1588                // Then, clean up expired media content.
1589                if let Some(last_access_expiry) = policy.last_access_expiry {
1590                    let current_timestamp = time_to_timestamp(current_time);
1591                    let expiry_secs = last_access_expiry.as_secs();
1592                    let count = txn.execute(
1593                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1594                        (current_timestamp, expiry_secs),
1595                    )?;
1596
1597                    if count > 0 {
1598                        removed = true;
1599                    }
1600                }
1601
1602                // Finally, if the cache size is too big, remove old items until it fits.
1603                if let Some(max_cache_size) = policy.max_cache_size {
1604                    // i64 is the integer type used by SQLite, use it here to avoid usize overflow
1605                    // during the conversion of the result.
1606                    let cache_size = txn
1607                        .query_row(
1608                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1609                            (),
1610                            |row| {
1611                                // `sum()` returns `NULL` if there are no rows.
1612                                row.get::<_, Option<u64>>(0)
1613                            },
1614                        )?
1615                        .unwrap_or_default();
1616
1617                    // If the cache size is overflowing or bigger than max cache size, clean up.
1618                    if cache_size > max_cache_size {
1619                        // Get the sizes of the media contents ordered by last access.
1620                        let mut cached_stmt = txn.prepare_cached(
1621                            "SELECT rowid, length(data) FROM media \
1622                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1623                        )?;
1624                        let content_sizes = cached_stmt
1625                            .query(())?
1626                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
1627
1628                        let mut accumulated_items_size = 0u64;
1629                        let mut limit_reached = false;
1630                        let mut rows_to_remove = Vec::new();
1631
1632                        for result in content_sizes {
1633                            let (row_id, size) = match result {
1634                                Ok(content_size) => content_size,
1635                                Err(error) => {
1636                                    return Err(error.into());
1637                                }
1638                            };
1639
1640                            if limit_reached {
1641                                rows_to_remove.push(row_id);
1642                                continue;
1643                            }
1644
1645                            match accumulated_items_size.checked_add(size) {
1646                                Some(acc) if acc > max_cache_size => {
1647                                    // We can stop accumulating.
1648                                    limit_reached = true;
1649                                    rows_to_remove.push(row_id);
1650                                }
1651                                Some(acc) => accumulated_items_size = acc,
1652                                None => {
1653                                    // The accumulated size is overflowing but the setting cannot be
1654                                    // bigger than usize::MAX, we can stop accumulating.
1655                                    limit_reached = true;
1656                                    rows_to_remove.push(row_id);
1657                                }
1658                            }
1659                        }
1660
1661                        if !rows_to_remove.is_empty() {
1662                            removed = true;
1663                        }
1664
1665                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1666                            let sql_params = repeat_vars(row_ids.len());
1667                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1668                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1669                            Ok(Vec::<()>::new())
1670                        })?;
1671                    }
1672                }
1673
1674                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1675
1676                Ok(removed)
1677            })
1678            .await?;
1679
1680        // If we removed media, defragment the database and free space on the
1681        // filesystem.
1682        if removed {
1683            conn.vacuum().await?;
1684        }
1685
1686        Ok(())
1687    }
1688
1689    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1690        let conn = self.read().await?;
1691        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1692    }
1693}
1694
1695fn find_event_relations_transaction(
1696    store: SqliteEventCacheStore,
1697    hashed_room_id: Key,
1698    hashed_linked_chunk_id: Key,
1699    event_id: OwnedEventId,
1700    filters: Option<Vec<RelationType>>,
1701    txn: &Transaction<'_>,
1702) -> Result<Vec<(Event, Option<Position>)>> {
1703    let get_rows = |row: &rusqlite::Row<'_>| {
1704        Ok((
1705            row.get::<_, Vec<u8>>(0)?,
1706            row.get::<_, Option<u64>>(1)?,
1707            row.get::<_, Option<usize>>(2)?,
1708        ))
1709    };
1710
1711    // Collect related events.
1712    let collect_results = |transaction| {
1713        let mut related = Vec::new();
1714
1715        for result in transaction {
1716            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1717
1718            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1719
1720            // Only build the position if both the chunk_id and position were present; in
1721            // theory, they should either be present at the same time, or not at all.
1722            let pos = chunk_id
1723                .zip(index)
1724                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1725
1726            related.push((event, pos));
1727        }
1728
1729        Ok(related)
1730    };
1731
1732    let related = if let Some(filters) = compute_filters_string(filters.as_deref()) {
1733        let question_marks = repeat_vars(filters.len());
1734        let query = format!(
1735            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1736            FROM events
1737            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1738            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1739        );
1740
1741        let filters: Vec<_> = filters.iter().map(|f| f.to_sql().unwrap()).collect();
1742        let parameters = params_from_iter(
1743            [
1744                hashed_linked_chunk_id.to_sql().expect(
1745                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1746                ),
1747                event_id
1748                    .as_str()
1749                    .to_sql()
1750                    .expect("We should be able to convert an event ID to a SQLite value"),
1751                hashed_room_id
1752                    .to_sql()
1753                    .expect("We should be able to convert a room ID to a SQLite value"),
1754            ]
1755            .into_iter()
1756            .chain(filters),
1757        );
1758
1759        let mut transaction = txn.prepare(&query)?;
1760        let transaction = transaction.query_map(parameters, get_rows)?;
1761
1762        collect_results(transaction)
1763    } else {
1764        let query =
1765            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1766            FROM events
1767            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1768            WHERE relates_to = ? AND room_id = ?";
1769        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1770
1771        let mut transaction = txn.prepare(query)?;
1772        let transaction = transaction.query_map(parameters, get_rows)?;
1773
1774        collect_results(transaction)
1775    };
1776
1777    related
1778}
1779
1780/// Like `deadpool::managed::Object::with_transaction`, but starts the
1781/// transaction in immediate (write) mode from the beginning, precluding errors
1782/// of the kind SQLITE_BUSY from happening, for transactions that may involve
1783/// both reads and writes, and start with a write.
1784async fn with_immediate_transaction<
1785    T: Send + 'static,
1786    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1787>(
1788    this: &SqliteEventCacheStore,
1789    f: F,
1790) -> Result<T, Error> {
1791    this.write()
1792        .await?
1793        .interact(move |conn| -> Result<T, Error> {
1794            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1795            // to avoid read transactions upgrading to write mode and causing
1796            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1797            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1798
1799            let code = || -> Result<T, Error> {
1800                let txn = conn.transaction()?;
1801                let res = f(&txn)?;
1802                txn.commit()?;
1803                Ok(res)
1804            };
1805
1806            let res = code();
1807
1808            // Reset the transaction behavior to use Deferred, after this transaction has
1809            // been run, whether it was successful or not.
1810            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1811
1812            res
1813        })
1814        .await
1815        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1816        .unwrap()
1817}
1818
1819fn insert_chunk(
1820    txn: &Transaction<'_>,
1821    linked_chunk_id: &Key,
1822    previous: Option<u64>,
1823    new: u64,
1824    next: Option<u64>,
1825    type_str: &str,
1826) -> rusqlite::Result<()> {
1827    // First, insert the new chunk.
1828    txn.execute(
1829        r#"
1830            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1831            VALUES (?, ?, ?, ?, ?)
1832        "#,
1833        (new, linked_chunk_id, previous, next, type_str),
1834    )?;
1835
1836    // If this chunk has a previous one, update its `next` field.
1837    if let Some(previous) = previous {
1838        txn.execute(
1839            r#"
1840                UPDATE linked_chunks
1841                SET next = ?
1842                WHERE id = ? AND linked_chunk_id = ?
1843            "#,
1844            (new, previous, linked_chunk_id),
1845        )?;
1846    }
1847
1848    // If this chunk has a next one, update its `previous` field.
1849    if let Some(next) = next {
1850        txn.execute(
1851            r#"
1852                UPDATE linked_chunks
1853                SET previous = ?
1854                WHERE id = ? AND linked_chunk_id = ?
1855            "#,
1856            (new, next, linked_chunk_id),
1857        )?;
1858    }
1859
1860    Ok(())
1861}
1862
1863#[cfg(test)]
1864mod tests {
1865    use std::{
1866        path::PathBuf,
1867        sync::atomic::{AtomicU32, Ordering::SeqCst},
1868        time::Duration,
1869    };
1870
1871    use assert_matches::assert_matches;
1872    use matrix_sdk_base::{
1873        event_cache::{
1874            store::{
1875                integration_tests::{
1876                    check_test_event, make_test_event, make_test_event_with_event_id,
1877                },
1878                media::IgnoreMediaRetentionPolicy,
1879                EventCacheStore, EventCacheStoreError,
1880            },
1881            Gap,
1882        },
1883        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1884        event_cache_store_media_integration_tests,
1885        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1886        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1887    };
1888    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1889    use once_cell::sync::Lazy;
1890    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1891    use tempfile::{tempdir, TempDir};
1892
1893    use super::SqliteEventCacheStore;
1894    use crate::{
1895        event_cache_store::keys,
1896        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1897        SqliteStoreConfig,
1898    };
1899
1900    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1901    static NUM: AtomicU32 = AtomicU32::new(0);
1902
1903    fn new_event_cache_store_workspace() -> PathBuf {
1904        let name = NUM.fetch_add(1, SeqCst).to_string();
1905        TMP_DIR.path().join(name)
1906    }
1907
1908    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1909        let tmpdir_path = new_event_cache_store_workspace();
1910
1911        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1912
1913        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1914    }
1915
1916    event_cache_store_integration_tests!();
1917    event_cache_store_integration_tests_time!();
1918    event_cache_store_media_integration_tests!(with_media_size_tests);
1919
1920    async fn get_event_cache_store_content_sorted_by_last_access(
1921        event_cache_store: &SqliteEventCacheStore,
1922    ) -> Vec<Vec<u8>> {
1923        let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
1924        sqlite_db
1925            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1926                stmt.query(())?.mapped(|row| row.get(0)).collect()
1927            })
1928            .await
1929            .expect("querying media cache content by last access failed")
1930    }
1931
1932    #[async_test]
1933    async fn test_pool_size() {
1934        let tmpdir_path = new_event_cache_store_workspace();
1935        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1936
1937        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1938
1939        assert_eq!(store.pool.status().max_size, 42);
1940    }
1941
1942    #[async_test]
1943    async fn test_last_access() {
1944        let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1945        let uri = mxc_uri!("mxc://localhost/media");
1946        let file_request = MediaRequestParameters {
1947            source: MediaSource::Plain(uri.to_owned()),
1948            format: MediaFormat::File,
1949        };
1950        let thumbnail_request = MediaRequestParameters {
1951            source: MediaSource::Plain(uri.to_owned()),
1952            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1953                Method::Crop,
1954                uint!(100),
1955                uint!(100),
1956            )),
1957        };
1958
1959        let content: Vec<u8> = "hello world".into();
1960        let thumbnail_content: Vec<u8> = "hello…".into();
1961
1962        // Add the media.
1963        event_cache_store
1964            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1965            .await
1966            .expect("adding file failed");
1967
1968        // Since the precision of the timestamp is in seconds, wait so the timestamps
1969        // differ.
1970        tokio::time::sleep(Duration::from_secs(3)).await;
1971
1972        event_cache_store
1973            .add_media_content(
1974                &thumbnail_request,
1975                thumbnail_content.clone(),
1976                IgnoreMediaRetentionPolicy::No,
1977            )
1978            .await
1979            .expect("adding thumbnail failed");
1980
1981        // File's last access is older than thumbnail.
1982        let contents =
1983            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1984
1985        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1986        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1987        assert_eq!(contents[1], content, "file is not second-to-last access");
1988
1989        // Since the precision of the timestamp is in seconds, wait so the timestamps
1990        // differ.
1991        tokio::time::sleep(Duration::from_secs(3)).await;
1992
1993        // Access the file so its last access is more recent.
1994        let _ = event_cache_store
1995            .get_media_content(&file_request)
1996            .await
1997            .expect("getting file failed")
1998            .expect("file is missing");
1999
2000        // File's last access is more recent than thumbnail.
2001        let contents =
2002            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
2003
2004        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
2005        assert_eq!(contents[0], content, "file is not last access");
2006        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
2007    }
2008
2009    #[async_test]
2010    async fn test_linked_chunk_new_items_chunk() {
2011        let store = get_event_cache_store().await.expect("creating cache store failed");
2012
2013        let room_id = &DEFAULT_TEST_ROOM_ID;
2014        let linked_chunk_id = LinkedChunkId::Room(room_id);
2015
2016        store
2017            .handle_linked_chunk_updates(
2018                linked_chunk_id,
2019                vec![
2020                    Update::NewItemsChunk {
2021                        previous: None,
2022                        new: ChunkIdentifier::new(42),
2023                        next: None, // Note: the store must link the next entry itself.
2024                    },
2025                    Update::NewItemsChunk {
2026                        previous: Some(ChunkIdentifier::new(42)),
2027                        new: ChunkIdentifier::new(13),
2028                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
2029                                                               * the next link ahead of time. */
2030                    },
2031                    Update::NewItemsChunk {
2032                        previous: Some(ChunkIdentifier::new(13)),
2033                        new: ChunkIdentifier::new(37),
2034                        next: None,
2035                    },
2036                ],
2037            )
2038            .await
2039            .unwrap();
2040
2041        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2042
2043        assert_eq!(chunks.len(), 3);
2044
2045        {
2046            // Chunks are ordered from smaller to bigger IDs.
2047            let c = chunks.remove(0);
2048            assert_eq!(c.identifier, ChunkIdentifier::new(13));
2049            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
2050            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
2051            assert_matches!(c.content, ChunkContent::Items(events) => {
2052                assert!(events.is_empty());
2053            });
2054
2055            let c = chunks.remove(0);
2056            assert_eq!(c.identifier, ChunkIdentifier::new(37));
2057            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
2058            assert_eq!(c.next, None);
2059            assert_matches!(c.content, ChunkContent::Items(events) => {
2060                assert!(events.is_empty());
2061            });
2062
2063            let c = chunks.remove(0);
2064            assert_eq!(c.identifier, ChunkIdentifier::new(42));
2065            assert_eq!(c.previous, None);
2066            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
2067            assert_matches!(c.content, ChunkContent::Items(events) => {
2068                assert!(events.is_empty());
2069            });
2070        }
2071    }
2072
2073    #[async_test]
2074    async fn test_linked_chunk_new_gap_chunk() {
2075        let store = get_event_cache_store().await.expect("creating cache store failed");
2076
2077        let room_id = &DEFAULT_TEST_ROOM_ID;
2078        let linked_chunk_id = LinkedChunkId::Room(room_id);
2079
2080        store
2081            .handle_linked_chunk_updates(
2082                linked_chunk_id,
2083                vec![Update::NewGapChunk {
2084                    previous: None,
2085                    new: ChunkIdentifier::new(42),
2086                    next: None,
2087                    gap: Gap { prev_token: "raclette".to_owned() },
2088                }],
2089            )
2090            .await
2091            .unwrap();
2092
2093        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2094
2095        assert_eq!(chunks.len(), 1);
2096
2097        // Chunks are ordered from smaller to bigger IDs.
2098        let c = chunks.remove(0);
2099        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2100        assert_eq!(c.previous, None);
2101        assert_eq!(c.next, None);
2102        assert_matches!(c.content, ChunkContent::Gap(gap) => {
2103            assert_eq!(gap.prev_token, "raclette");
2104        });
2105    }
2106
2107    #[async_test]
2108    async fn test_linked_chunk_replace_item() {
2109        let store = get_event_cache_store().await.expect("creating cache store failed");
2110
2111        let room_id = &DEFAULT_TEST_ROOM_ID;
2112        let linked_chunk_id = LinkedChunkId::Room(room_id);
2113        let event_id = event_id!("$world");
2114
2115        store
2116            .handle_linked_chunk_updates(
2117                linked_chunk_id,
2118                vec![
2119                    Update::NewItemsChunk {
2120                        previous: None,
2121                        new: ChunkIdentifier::new(42),
2122                        next: None,
2123                    },
2124                    Update::PushItems {
2125                        at: Position::new(ChunkIdentifier::new(42), 0),
2126                        items: vec![
2127                            make_test_event(room_id, "hello"),
2128                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
2129                        ],
2130                    },
2131                    Update::ReplaceItem {
2132                        at: Position::new(ChunkIdentifier::new(42), 1),
2133                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
2134                    },
2135                ],
2136            )
2137            .await
2138            .unwrap();
2139
2140        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2141
2142        assert_eq!(chunks.len(), 1);
2143
2144        let c = chunks.remove(0);
2145        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2146        assert_eq!(c.previous, None);
2147        assert_eq!(c.next, None);
2148        assert_matches!(c.content, ChunkContent::Items(events) => {
2149            assert_eq!(events.len(), 2);
2150            check_test_event(&events[0], "hello");
2151            check_test_event(&events[1], "yolo");
2152        });
2153    }
2154
2155    #[async_test]
2156    async fn test_linked_chunk_remove_chunk() {
2157        let store = get_event_cache_store().await.expect("creating cache store failed");
2158
2159        let room_id = &DEFAULT_TEST_ROOM_ID;
2160        let linked_chunk_id = LinkedChunkId::Room(room_id);
2161
2162        store
2163            .handle_linked_chunk_updates(
2164                linked_chunk_id,
2165                vec![
2166                    Update::NewGapChunk {
2167                        previous: None,
2168                        new: ChunkIdentifier::new(42),
2169                        next: None,
2170                        gap: Gap { prev_token: "raclette".to_owned() },
2171                    },
2172                    Update::NewGapChunk {
2173                        previous: Some(ChunkIdentifier::new(42)),
2174                        new: ChunkIdentifier::new(43),
2175                        next: None,
2176                        gap: Gap { prev_token: "fondue".to_owned() },
2177                    },
2178                    Update::NewGapChunk {
2179                        previous: Some(ChunkIdentifier::new(43)),
2180                        new: ChunkIdentifier::new(44),
2181                        next: None,
2182                        gap: Gap { prev_token: "tartiflette".to_owned() },
2183                    },
2184                    Update::RemoveChunk(ChunkIdentifier::new(43)),
2185                ],
2186            )
2187            .await
2188            .unwrap();
2189
2190        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2191
2192        assert_eq!(chunks.len(), 2);
2193
2194        // Chunks are ordered from smaller to bigger IDs.
2195        let c = chunks.remove(0);
2196        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2197        assert_eq!(c.previous, None);
2198        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
2199        assert_matches!(c.content, ChunkContent::Gap(gap) => {
2200            assert_eq!(gap.prev_token, "raclette");
2201        });
2202
2203        let c = chunks.remove(0);
2204        assert_eq!(c.identifier, ChunkIdentifier::new(44));
2205        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
2206        assert_eq!(c.next, None);
2207        assert_matches!(c.content, ChunkContent::Gap(gap) => {
2208            assert_eq!(gap.prev_token, "tartiflette");
2209        });
2210
2211        // Check that cascading worked. Yes, SQLite, I doubt you.
2212        let gaps = store
2213            .read()
2214            .await
2215            .unwrap()
2216            .with_transaction(|txn| -> rusqlite::Result<_> {
2217                let mut gaps = Vec::new();
2218                for data in txn
2219                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
2220                    .query_map((), |row| row.get::<_, u64>(0))?
2221                {
2222                    gaps.push(data?);
2223                }
2224                Ok(gaps)
2225            })
2226            .await
2227            .unwrap();
2228
2229        assert_eq!(gaps, vec![42, 44]);
2230    }
2231
2232    #[async_test]
2233    async fn test_linked_chunk_push_items() {
2234        let store = get_event_cache_store().await.expect("creating cache store failed");
2235
2236        let room_id = &DEFAULT_TEST_ROOM_ID;
2237        let linked_chunk_id = LinkedChunkId::Room(room_id);
2238
2239        store
2240            .handle_linked_chunk_updates(
2241                linked_chunk_id,
2242                vec![
2243                    Update::NewItemsChunk {
2244                        previous: None,
2245                        new: ChunkIdentifier::new(42),
2246                        next: None,
2247                    },
2248                    Update::PushItems {
2249                        at: Position::new(ChunkIdentifier::new(42), 0),
2250                        items: vec![
2251                            make_test_event(room_id, "hello"),
2252                            make_test_event(room_id, "world"),
2253                        ],
2254                    },
2255                    Update::PushItems {
2256                        at: Position::new(ChunkIdentifier::new(42), 2),
2257                        items: vec![make_test_event(room_id, "who?")],
2258                    },
2259                ],
2260            )
2261            .await
2262            .unwrap();
2263
2264        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2265
2266        assert_eq!(chunks.len(), 1);
2267
2268        let c = chunks.remove(0);
2269        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2270        assert_eq!(c.previous, None);
2271        assert_eq!(c.next, None);
2272        assert_matches!(c.content, ChunkContent::Items(events) => {
2273            assert_eq!(events.len(), 3);
2274
2275            check_test_event(&events[0], "hello");
2276            check_test_event(&events[1], "world");
2277            check_test_event(&events[2], "who?");
2278        });
2279    }
2280
2281    #[async_test]
2282    async fn test_linked_chunk_remove_item() {
2283        let store = get_event_cache_store().await.expect("creating cache store failed");
2284
2285        let room_id = *DEFAULT_TEST_ROOM_ID;
2286        let linked_chunk_id = LinkedChunkId::Room(room_id);
2287
2288        store
2289            .handle_linked_chunk_updates(
2290                linked_chunk_id,
2291                vec![
2292                    Update::NewItemsChunk {
2293                        previous: None,
2294                        new: ChunkIdentifier::new(42),
2295                        next: None,
2296                    },
2297                    Update::PushItems {
2298                        at: Position::new(ChunkIdentifier::new(42), 0),
2299                        items: vec![
2300                            make_test_event(room_id, "one"),
2301                            make_test_event(room_id, "two"),
2302                            make_test_event(room_id, "three"),
2303                            make_test_event(room_id, "four"),
2304                            make_test_event(room_id, "five"),
2305                            make_test_event(room_id, "six"),
2306                        ],
2307                    },
2308                    Update::RemoveItem {
2309                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
2310                    },
2311                ],
2312            )
2313            .await
2314            .unwrap();
2315
2316        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2317
2318        assert_eq!(chunks.len(), 1);
2319
2320        let c = chunks.remove(0);
2321        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2322        assert_eq!(c.previous, None);
2323        assert_eq!(c.next, None);
2324        assert_matches!(c.content, ChunkContent::Items(events) => {
2325            assert_eq!(events.len(), 5);
2326            check_test_event(&events[0], "one");
2327            check_test_event(&events[1], "two");
2328            check_test_event(&events[2], "four");
2329            check_test_event(&events[3], "five");
2330            check_test_event(&events[4], "six");
2331        });
2332
2333        // Make sure the position have been updated for the remaining events.
2334        let num_rows: u64 = store
2335            .read()
2336            .await
2337            .unwrap()
2338            .with_transaction(move |txn| {
2339                txn.query_row(
2340                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2341                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2342                    |row| row.get(0),
2343                )
2344            })
2345            .await
2346            .unwrap();
2347        assert_eq!(num_rows, 3);
2348    }
2349
2350    #[async_test]
2351    async fn test_linked_chunk_detach_last_items() {
2352        let store = get_event_cache_store().await.expect("creating cache store failed");
2353
2354        let room_id = *DEFAULT_TEST_ROOM_ID;
2355        let linked_chunk_id = LinkedChunkId::Room(room_id);
2356
2357        store
2358            .handle_linked_chunk_updates(
2359                linked_chunk_id,
2360                vec![
2361                    Update::NewItemsChunk {
2362                        previous: None,
2363                        new: ChunkIdentifier::new(42),
2364                        next: None,
2365                    },
2366                    Update::PushItems {
2367                        at: Position::new(ChunkIdentifier::new(42), 0),
2368                        items: vec![
2369                            make_test_event(room_id, "hello"),
2370                            make_test_event(room_id, "world"),
2371                            make_test_event(room_id, "howdy"),
2372                        ],
2373                    },
2374                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2375                ],
2376            )
2377            .await
2378            .unwrap();
2379
2380        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2381
2382        assert_eq!(chunks.len(), 1);
2383
2384        let c = chunks.remove(0);
2385        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2386        assert_eq!(c.previous, None);
2387        assert_eq!(c.next, None);
2388        assert_matches!(c.content, ChunkContent::Items(events) => {
2389            assert_eq!(events.len(), 1);
2390            check_test_event(&events[0], "hello");
2391        });
2392    }
2393
2394    #[async_test]
2395    async fn test_linked_chunk_start_end_reattach_items() {
2396        let store = get_event_cache_store().await.expect("creating cache store failed");
2397
2398        let room_id = *DEFAULT_TEST_ROOM_ID;
2399        let linked_chunk_id = LinkedChunkId::Room(room_id);
2400
2401        // Same updates and checks as test_linked_chunk_push_items, but with extra
2402        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2403        // effects.
2404        store
2405            .handle_linked_chunk_updates(
2406                linked_chunk_id,
2407                vec![
2408                    Update::NewItemsChunk {
2409                        previous: None,
2410                        new: ChunkIdentifier::new(42),
2411                        next: None,
2412                    },
2413                    Update::PushItems {
2414                        at: Position::new(ChunkIdentifier::new(42), 0),
2415                        items: vec![
2416                            make_test_event(room_id, "hello"),
2417                            make_test_event(room_id, "world"),
2418                            make_test_event(room_id, "howdy"),
2419                        ],
2420                    },
2421                    Update::StartReattachItems,
2422                    Update::EndReattachItems,
2423                ],
2424            )
2425            .await
2426            .unwrap();
2427
2428        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2429
2430        assert_eq!(chunks.len(), 1);
2431
2432        let c = chunks.remove(0);
2433        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2434        assert_eq!(c.previous, None);
2435        assert_eq!(c.next, None);
2436        assert_matches!(c.content, ChunkContent::Items(events) => {
2437            assert_eq!(events.len(), 3);
2438            check_test_event(&events[0], "hello");
2439            check_test_event(&events[1], "world");
2440            check_test_event(&events[2], "howdy");
2441        });
2442    }
2443
2444    #[async_test]
2445    async fn test_linked_chunk_clear() {
2446        let store = get_event_cache_store().await.expect("creating cache store failed");
2447
2448        let room_id = *DEFAULT_TEST_ROOM_ID;
2449        let linked_chunk_id = LinkedChunkId::Room(room_id);
2450        let event_0 = make_test_event(room_id, "hello");
2451        let event_1 = make_test_event(room_id, "world");
2452        let event_2 = make_test_event(room_id, "howdy");
2453
2454        store
2455            .handle_linked_chunk_updates(
2456                linked_chunk_id,
2457                vec![
2458                    Update::NewItemsChunk {
2459                        previous: None,
2460                        new: ChunkIdentifier::new(42),
2461                        next: None,
2462                    },
2463                    Update::NewGapChunk {
2464                        previous: Some(ChunkIdentifier::new(42)),
2465                        new: ChunkIdentifier::new(54),
2466                        next: None,
2467                        gap: Gap { prev_token: "fondue".to_owned() },
2468                    },
2469                    Update::PushItems {
2470                        at: Position::new(ChunkIdentifier::new(42), 0),
2471                        items: vec![event_0.clone(), event_1, event_2],
2472                    },
2473                    Update::Clear,
2474                ],
2475            )
2476            .await
2477            .unwrap();
2478
2479        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2480        assert!(chunks.is_empty());
2481
2482        // Check that cascading worked. Yes, SQLite, I doubt you.
2483        store
2484            .read()
2485            .await
2486            .unwrap()
2487            .with_transaction(|txn| -> rusqlite::Result<_> {
2488                let num_gaps = txn
2489                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2490                    .query_row((), |row| row.get::<_, u64>(0))?;
2491                assert_eq!(num_gaps, 0);
2492
2493                let num_events = txn
2494                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2495                    .query_row((), |row| row.get::<_, u64>(0))?;
2496                assert_eq!(num_events, 0);
2497
2498                Ok(())
2499            })
2500            .await
2501            .unwrap();
2502
2503        // It's okay to re-insert a past event.
2504        store
2505            .handle_linked_chunk_updates(
2506                linked_chunk_id,
2507                vec![
2508                    Update::NewItemsChunk {
2509                        previous: None,
2510                        new: ChunkIdentifier::new(42),
2511                        next: None,
2512                    },
2513                    Update::PushItems {
2514                        at: Position::new(ChunkIdentifier::new(42), 0),
2515                        items: vec![event_0],
2516                    },
2517                ],
2518            )
2519            .await
2520            .unwrap();
2521    }
2522
2523    #[async_test]
2524    async fn test_linked_chunk_multiple_rooms() {
2525        let store = get_event_cache_store().await.expect("creating cache store failed");
2526
2527        let room1 = room_id!("!realcheeselovers:raclette.fr");
2528        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2529        let room2 = room_id!("!realcheeselovers:fondue.ch");
2530        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2531
2532        // Check that applying updates to one room doesn't affect the others.
2533        // Use the same chunk identifier in both rooms to battle-test search.
2534
2535        store
2536            .handle_linked_chunk_updates(
2537                linked_chunk_id1,
2538                vec![
2539                    Update::NewItemsChunk {
2540                        previous: None,
2541                        new: ChunkIdentifier::new(42),
2542                        next: None,
2543                    },
2544                    Update::PushItems {
2545                        at: Position::new(ChunkIdentifier::new(42), 0),
2546                        items: vec![
2547                            make_test_event(room1, "best cheese is raclette"),
2548                            make_test_event(room1, "obviously"),
2549                        ],
2550                    },
2551                ],
2552            )
2553            .await
2554            .unwrap();
2555
2556        store
2557            .handle_linked_chunk_updates(
2558                linked_chunk_id2,
2559                vec![
2560                    Update::NewItemsChunk {
2561                        previous: None,
2562                        new: ChunkIdentifier::new(42),
2563                        next: None,
2564                    },
2565                    Update::PushItems {
2566                        at: Position::new(ChunkIdentifier::new(42), 0),
2567                        items: vec![make_test_event(room1, "beaufort is the best")],
2568                    },
2569                ],
2570            )
2571            .await
2572            .unwrap();
2573
2574        // Check chunks from room 1.
2575        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2576        assert_eq!(chunks_room1.len(), 1);
2577
2578        let c = chunks_room1.remove(0);
2579        assert_matches!(c.content, ChunkContent::Items(events) => {
2580            assert_eq!(events.len(), 2);
2581            check_test_event(&events[0], "best cheese is raclette");
2582            check_test_event(&events[1], "obviously");
2583        });
2584
2585        // Check chunks from room 2.
2586        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2587        assert_eq!(chunks_room2.len(), 1);
2588
2589        let c = chunks_room2.remove(0);
2590        assert_matches!(c.content, ChunkContent::Items(events) => {
2591            assert_eq!(events.len(), 1);
2592            check_test_event(&events[0], "beaufort is the best");
2593        });
2594    }
2595
2596    #[async_test]
2597    async fn test_linked_chunk_update_is_a_transaction() {
2598        let store = get_event_cache_store().await.expect("creating cache store failed");
2599
2600        let room_id = *DEFAULT_TEST_ROOM_ID;
2601        let linked_chunk_id = LinkedChunkId::Room(room_id);
2602
2603        // Trigger a violation of the unique constraint on the (room id, chunk id)
2604        // couple.
2605        let err = store
2606            .handle_linked_chunk_updates(
2607                linked_chunk_id,
2608                vec![
2609                    Update::NewItemsChunk {
2610                        previous: None,
2611                        new: ChunkIdentifier::new(42),
2612                        next: None,
2613                    },
2614                    Update::NewItemsChunk {
2615                        previous: None,
2616                        new: ChunkIdentifier::new(42),
2617                        next: None,
2618                    },
2619                ],
2620            )
2621            .await
2622            .unwrap_err();
2623
2624        // The operation fails with a constraint violation error.
2625        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2626            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2627        });
2628
2629        // If the updates have been handled transactionally, then no new chunks should
2630        // have been added; failure of the second update leads to the first one being
2631        // rolled back.
2632        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2633        assert!(chunks.is_empty());
2634    }
2635
2636    #[async_test]
2637    async fn test_filter_duplicate_events_no_events() {
2638        let store = get_event_cache_store().await.expect("creating cache store failed");
2639
2640        let room_id = *DEFAULT_TEST_ROOM_ID;
2641        let linked_chunk_id = LinkedChunkId::Room(room_id);
2642        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2643        assert!(duplicates.is_empty());
2644    }
2645
2646    #[async_test]
2647    async fn test_load_last_chunk() {
2648        let room_id = room_id!("!r0:matrix.org");
2649        let linked_chunk_id = LinkedChunkId::Room(room_id);
2650        let event = |msg: &str| make_test_event(room_id, msg);
2651        let store = get_event_cache_store().await.expect("creating cache store failed");
2652
2653        // Case #1: no last chunk.
2654        {
2655            let (last_chunk, chunk_identifier_generator) =
2656                store.load_last_chunk(linked_chunk_id).await.unwrap();
2657
2658            assert!(last_chunk.is_none());
2659            assert_eq!(chunk_identifier_generator.current(), 0);
2660        }
2661
2662        // Case #2: only one chunk is present.
2663        {
2664            store
2665                .handle_linked_chunk_updates(
2666                    linked_chunk_id,
2667                    vec![
2668                        Update::NewItemsChunk {
2669                            previous: None,
2670                            new: ChunkIdentifier::new(42),
2671                            next: None,
2672                        },
2673                        Update::PushItems {
2674                            at: Position::new(ChunkIdentifier::new(42), 0),
2675                            items: vec![event("saucisse de morteau"), event("comté")],
2676                        },
2677                    ],
2678                )
2679                .await
2680                .unwrap();
2681
2682            let (last_chunk, chunk_identifier_generator) =
2683                store.load_last_chunk(linked_chunk_id).await.unwrap();
2684
2685            assert_matches!(last_chunk, Some(last_chunk) => {
2686                assert_eq!(last_chunk.identifier, 42);
2687                assert!(last_chunk.previous.is_none());
2688                assert!(last_chunk.next.is_none());
2689                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2690                    assert_eq!(items.len(), 2);
2691                    check_test_event(&items[0], "saucisse de morteau");
2692                    check_test_event(&items[1], "comté");
2693                });
2694            });
2695            assert_eq!(chunk_identifier_generator.current(), 42);
2696        }
2697
2698        // Case #3: more chunks are present.
2699        {
2700            store
2701                .handle_linked_chunk_updates(
2702                    linked_chunk_id,
2703                    vec![
2704                        Update::NewItemsChunk {
2705                            previous: Some(ChunkIdentifier::new(42)),
2706                            new: ChunkIdentifier::new(7),
2707                            next: None,
2708                        },
2709                        Update::PushItems {
2710                            at: Position::new(ChunkIdentifier::new(7), 0),
2711                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2712                        },
2713                    ],
2714                )
2715                .await
2716                .unwrap();
2717
2718            let (last_chunk, chunk_identifier_generator) =
2719                store.load_last_chunk(linked_chunk_id).await.unwrap();
2720
2721            assert_matches!(last_chunk, Some(last_chunk) => {
2722                assert_eq!(last_chunk.identifier, 7);
2723                assert_matches!(last_chunk.previous, Some(previous) => {
2724                    assert_eq!(previous, 42);
2725                });
2726                assert!(last_chunk.next.is_none());
2727                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2728                    assert_eq!(items.len(), 3);
2729                    check_test_event(&items[0], "fondue");
2730                    check_test_event(&items[1], "gruyère");
2731                    check_test_event(&items[2], "mont d'or");
2732                });
2733            });
2734            assert_eq!(chunk_identifier_generator.current(), 42);
2735        }
2736    }
2737
2738    #[async_test]
2739    async fn test_load_last_chunk_with_a_cycle() {
2740        let room_id = room_id!("!r0:matrix.org");
2741        let linked_chunk_id = LinkedChunkId::Room(room_id);
2742        let store = get_event_cache_store().await.expect("creating cache store failed");
2743
2744        store
2745            .handle_linked_chunk_updates(
2746                linked_chunk_id,
2747                vec![
2748                    Update::NewItemsChunk {
2749                        previous: None,
2750                        new: ChunkIdentifier::new(0),
2751                        next: None,
2752                    },
2753                    Update::NewItemsChunk {
2754                        // Because `previous` connects to chunk #0, it will create a cycle.
2755                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2756                        // **does not exist**. We have to detect this cycle.
2757                        previous: Some(ChunkIdentifier::new(0)),
2758                        new: ChunkIdentifier::new(1),
2759                        next: Some(ChunkIdentifier::new(0)),
2760                    },
2761                ],
2762            )
2763            .await
2764            .unwrap();
2765
2766        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2767    }
2768
2769    #[async_test]
2770    async fn test_load_previous_chunk() {
2771        let room_id = room_id!("!r0:matrix.org");
2772        let linked_chunk_id = LinkedChunkId::Room(room_id);
2773        let event = |msg: &str| make_test_event(room_id, msg);
2774        let store = get_event_cache_store().await.expect("creating cache store failed");
2775
2776        // Case #1: no chunk at all, equivalent to having an nonexistent
2777        // `before_chunk_identifier`.
2778        {
2779            let previous_chunk = store
2780                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2781                .await
2782                .unwrap();
2783
2784            assert!(previous_chunk.is_none());
2785        }
2786
2787        // Case #2: there is one chunk only: we request the previous on this
2788        // one, it doesn't exist.
2789        {
2790            store
2791                .handle_linked_chunk_updates(
2792                    linked_chunk_id,
2793                    vec![Update::NewItemsChunk {
2794                        previous: None,
2795                        new: ChunkIdentifier::new(42),
2796                        next: None,
2797                    }],
2798                )
2799                .await
2800                .unwrap();
2801
2802            let previous_chunk =
2803                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2804
2805            assert!(previous_chunk.is_none());
2806        }
2807
2808        // Case #3: there are two chunks.
2809        {
2810            store
2811                .handle_linked_chunk_updates(
2812                    linked_chunk_id,
2813                    vec![
2814                        // new chunk before the one that exists.
2815                        Update::NewItemsChunk {
2816                            previous: None,
2817                            new: ChunkIdentifier::new(7),
2818                            next: Some(ChunkIdentifier::new(42)),
2819                        },
2820                        Update::PushItems {
2821                            at: Position::new(ChunkIdentifier::new(7), 0),
2822                            items: vec![event("brigand du jorat"), event("morbier")],
2823                        },
2824                    ],
2825                )
2826                .await
2827                .unwrap();
2828
2829            let previous_chunk =
2830                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2831
2832            assert_matches!(previous_chunk, Some(previous_chunk) => {
2833                assert_eq!(previous_chunk.identifier, 7);
2834                assert!(previous_chunk.previous.is_none());
2835                assert_matches!(previous_chunk.next, Some(next) => {
2836                    assert_eq!(next, 42);
2837                });
2838                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2839                    assert_eq!(items.len(), 2);
2840                    check_test_event(&items[0], "brigand du jorat");
2841                    check_test_event(&items[1], "morbier");
2842                });
2843            });
2844        }
2845    }
2846}
2847
/// Runs the shared integration test suites, plus an SQL-injection regression
/// test, against a store opened with a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    // Temporary directory shared by every store created in this module.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    // Counter used to give each store its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a new store, with a passphrase, in a sub-directory of `TMP_DIR`
    /// named after the current value of the `NUM` counter.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let index = NUM.fetch_add(1, SeqCst);
        let store_dir = TMP_DIR.path().join(index.to_string());
        let store_path = store_dir.to_str().unwrap();

        tracing::info!("using event cache store @ {}", store_path);

        let store =
            SqliteEventCacheStore::open(store_path, Some("default_test_password")).await.unwrap();

        Ok(store)
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();

    /// Checks that a relation type carrying SQL code cannot widen the result
    /// set of `find_event_relations`.
    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let factory = EventFactory::new().room(room_id).sender(sender);

        // Create an event for the first room.
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = factory.text_msg("DO NOT FIND").event_id(event_id).into_event();

        // Create a related event: an edit of the event above.
        let edit_id = event_id!("$find_me:matrix.org");
        let replacement = RoomMessageEventContentWithoutRelation::text_plain("jebote");
        let edit =
            factory.text_msg("Find me").event_id(edit_id).edit(event_id, replacement).into_event();

        // Create an event for the second room.
        let other_factory = factory.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event = other_factory
            .text_msg("DO NOT FIND ME EITHER")
            .event_id(another_event_id)
            .into_event();

        // Save the events in the DB.
        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        // Craft a `RelationType` that will inject some SQL to be executed. The
        // `OR 1=1` ensures that all the previous parameters, the room
        // ID and event ID are ignored.
        let relation_types = vec![RelationType::Replacement, "x\") OR 1=1; --".into()];

        // Attempt to find events in the first room.
        let results = store
            .find_event_relations(room_id, event_id, Some(relation_types.as_slice()))
            .await
            .expect("We should be able to attempt to find event relations");

        // Ensure that we only got the single related event the first room contains.
        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        // The event needs to be the edit event, otherwise something is wrong.
        let found_event = &results[0].0;
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}