matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 11;

/// The string used to identify a chunk containing events, in the `type` field
/// in the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk containing a gap, in the `type` field
/// in the database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and connections
    /// for write operations. A single connection is reserved for write
    /// operations; all other connections are used for read operations. The
    /// lock ensures the write connection has only one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
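    ///
    /// A minimal usage sketch (the path and passphrase are illustrative):
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// let store =
    ///     SqliteEventCacheStore::open("/path/to/store", Some("passphrase")).await?;
    /// # Ok(()) }
    /// ```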
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the SQLite-based event cache store with the given configuration.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, secret).await?;
        this.write().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our dedicated write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

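    /// Map a `linked_chunks` row to its `(id, previous, next, type)` tuple.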
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

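/// Transaction helpers to rebuild chunks (and their gap or event content) from
/// their rows in the database.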
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it
        // fails with the error message: "cannot change into wal mode from
        // within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let _timer = timer!("method");

        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .write()
            .await?
            .with_transaction(move |txn| {
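                // Acquire or renew the lease in one atomic statement: insert a
                // fresh lease or, on conflict, take it over only if we already
                // hold it or the previous lease has expired.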
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none of them is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

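        // `with_immediate_transaction` opens the transaction with
        // `TransactionBehavior::Immediate`, so SQLite takes the write lock up
        // front instead of upgrading a read lock mid-transaction (which could
        // fail with `SQLITE_BUSY`).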
        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point the previous chunk's `next` at this chunk's own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point the next chunk's `previous` at this chunk's own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete the corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have already
                        // been inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the events that have an event ID; log and
                        // skip the others.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // Once `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id    | chunk_id | position |
                        // |----------|--------------------|----------|----------|
                        // | $ev0     | !r0                | 42       | 0        |
                        // | $ev1     | !r0                | 42       | 1        |
                        // |          |                    |          |          | <- no more `$ev2`
                        // | $ev3     | !r0                | 42       | 3        |
                        // | $ev4     | !r0                | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To persuade yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, we
                        // have `event_cache::Deduplicator`, which is responsible
                        // for ensuring there is no duplicated event. However,
                        // relying on this is “fragile” in the sense it can
                        // contain bugs. Relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “belt and braces” as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is a
                        // no-go for us: it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no row, the `MAX(id)` is
                                // set to `NULL`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk is not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk is not found **but** there are chunks for this room: this is
                    // inconsistent. The linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events to check for duplicates, return early. This is not
        // only an optimization: it's required, as otherwise the `repeat_vars` call
        // below would panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
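                // SQLite limits how many variables can be bound in a single
                // statement, so run the query over chunks of event IDs to stay
                // under that limit.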
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID may have been stored even if
                            // it is malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Read a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut statement = txn.prepare("SELECT content FROM events WHERE room_id = ?")?;
                let maybe_events =
                    statement.query_map((hashed_room_id,), |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }
1329
1330    #[instrument(skip(self, event))]
1331    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1332        let _timer = timer!("method");
1333
1334        let Some(event_id) = event.event_id() else {
1335            error!(%room_id, "Trying to save an event with no ID");
1336            return Ok(());
1337        };
1338
1339        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1340        let event_id = event_id.to_string();
1341        let encoded_event = self.encode_event(&event)?;
1342
1343        self.write()
1344            .await?
1345            .with_transaction(move |txn| -> Result<_> {
1346                txn.execute(
1347                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)",
1348                    (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1349
1350                Ok(())
1351            })
1352            .await
1353    }
1354}
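
// For readability: the raw statements in this file assume roughly the
// following schema. This is reconstructed from the queries themselves as a
// reading aid, not copied from the migrations, so column types and constraints
// are an approximation:
//
//   events(room_id, event_id, content, relates_to, rel_type)
//   event_chunks(linked_chunk_id, event_id, chunk_id, position)
//   linked_chunks(id, linked_chunk_id, previous, next, type)
//   gap_chunks(chunk_id, …)
//
// The `room_id` and `linked_chunk_id` columns hold hashed keys (see
// `encode_key`), and `content` holds the possibly-encrypted, JSON-serialized
// event.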
1355
1356fn find_event_relations_transaction(
1357    store: SqliteEventCacheStore,
1358    hashed_room_id: Key,
1359    hashed_linked_chunk_id: Key,
1360    event_id: OwnedEventId,
1361    filters: Option<Vec<RelationType>>,
1362    txn: &Transaction<'_>,
1363) -> Result<Vec<(Event, Option<Position>)>> {
1364    let get_rows = |row: &rusqlite::Row<'_>| {
1365        Ok((
1366            row.get::<_, Vec<u8>>(0)?,
1367            row.get::<_, Option<u64>>(1)?,
1368            row.get::<_, Option<usize>>(2)?,
1369        ))
1370    };
1371
1372    // Collect related events.
1373    let collect_results = |rows| {
1374        let mut related = Vec::new();
1375
1376        for result in rows {
1377            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1378
1379            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1380
1381            // Only build the position if both the chunk_id and the index were
1382            // present; in theory, they are either both present or both absent.
1383            let pos = chunk_id
1384                .zip(index)
1385                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1386
1387            related.push((event, pos));
1388        }
1389
1390        Ok(related)
1391    };
1392
1393    let related = if let Some(filters) = filters {
1394        let question_marks = repeat_vars(filters.len());
1395        let query = format!(
1396            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1397            FROM events
1398            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1399            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1400        );
1401
1402        // The filters must be stringified first, and the strings kept alive in a
1403        // local binding: `.to_sql()` borrows from them, so they need a stable
1404        // address (otherwise the map closure below would return a reference to a
1405        // temporary).
1406        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1407        let filters_params: Vec<_> = filter_strings
1408            .iter()
1409            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1410            .collect();
1411
1412        let parameters = params_from_iter(
1413            [
1414                hashed_linked_chunk_id.to_sql().expect(
1415                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1416                ),
1417                event_id
1418                    .as_str()
1419                    .to_sql()
1420                    .expect("We should be able to convert an event ID to a SQLite value"),
1421                hashed_room_id
1422                    .to_sql()
1423                    .expect("We should be able to convert a room ID to a SQLite value"),
1424            ]
1425            .into_iter()
1426            .chain(filters_params),
1427        );
1428
1429        let mut statement = txn.prepare(&query)?;
1430        let rows = statement.query_map(parameters, get_rows)?;
1431
1432        collect_results(rows)
1433    } else {
1434        let query =
1435            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1436            FROM events
1437            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1438            WHERE relates_to = ? AND room_id = ?";
1439        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1440
1441        let mut statement = txn.prepare(query)?;
1442        let rows = statement.query_map(parameters, get_rows)?;
1443
1444        collect_results(rows)
1445    };
1446
1447    related
1448}
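
// A minimal sketch of the placeholder expansion assumed above: with three
// filters, `repeat_vars(3)` is expected to produce the equivalent of "?,?,?",
// so the filtered query ends in `rel_type IN (?,?,?)` and each stringified
// `RelationType` binds to exactly one placeholder. Illustration only; the real
// implementation lives in `crate::utils::repeat_vars`.
//
//     fn repeat_vars_sketch(count: usize) -> String {
//         (0..count).map(|_| "?").collect::<Vec<_>>().join(",")
//     }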
1449
1450/// Like `deadpool::managed::Object::with_transaction`, but starts the
1451/// transaction in immediate (write) mode from the beginning. This precludes
1452/// `SQLITE_BUSY` errors for transactions that involve both reads and writes,
1453/// and start with a write.
1454async fn with_immediate_transaction<
1455    T: Send + 'static,
1456    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1457>(
1458    this: &SqliteEventCacheStore,
1459    f: F,
1460) -> Result<T, Error> {
1461    this.write()
1462        .await?
1463        .interact(move |conn| -> Result<T, Error> {
1464            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1465            // to avoid read transactions upgrading to write mode and causing
1466            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1467            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1468
1469            let code = || -> Result<T, Error> {
1470                let txn = conn.transaction()?;
1471                let res = f(&txn)?;
1472                txn.commit()?;
1473                Ok(res)
1474            };
1475
1476            let res = code();
1477
1478            // Reset the transaction behavior to Deferred after this transaction has
1479            // run, whether it was successful or not.
1480            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1481
1482            res
1483        })
1484        .await
1485        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1486        .unwrap()
1487}
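
// A hypothetical call site, as a minimal sketch (the bindings below are
// illustrative, not taken from this file): the closure runs inside a single
// IMMEDIATE transaction, so the write lock is acquired up front and both
// statements commit or roll back together.
//
//     let affected = with_immediate_transaction(&store, move |txn| {
//         txn.execute("DELETE FROM events WHERE room_id = ?", (&hashed_room_id,))?;
//         Ok(txn.execute(
//             "DELETE FROM event_chunks WHERE linked_chunk_id = ?",
//             (&hashed_linked_chunk_id,),
//         )?)
//     })
//     .await?;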
1488
1489fn insert_chunk(
1490    txn: &Transaction<'_>,
1491    linked_chunk_id: &Key,
1492    previous: Option<u64>,
1493    new: u64,
1494    next: Option<u64>,
1495    type_str: &str,
1496) -> rusqlite::Result<()> {
1497    // First, insert the new chunk.
1498    txn.execute(
1499        r#"
1500            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1501            VALUES (?, ?, ?, ?, ?)
1502        "#,
1503        (new, linked_chunk_id, previous, next, type_str),
1504    )?;
1505
1506    // If this chunk has a previous one, update its `next` field.
1507    if let Some(previous) = previous {
1508        txn.execute(
1509            r#"
1510                UPDATE linked_chunks
1511                SET next = ?
1512                WHERE id = ? AND linked_chunk_id = ?
1513            "#,
1514            (new, previous, linked_chunk_id),
1515        )?;
1516    }
1517
1518    // If this chunk has a next one, update its `previous` field.
1519    if let Some(next) = next {
1520        txn.execute(
1521            r#"
1522                UPDATE linked_chunks
1523                SET previous = ?
1524                WHERE id = ? AND linked_chunk_id = ?
1525            "#,
1526            (new, next, linked_chunk_id),
1527        )?;
1528    }
1529
1530    Ok(())
1531}
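
// Illustration of the linking performed above: inserting chunk N between an
// optional previous chunk P and an optional next chunk X turns `P <-> X` into
// `P <-> N <-> X`, through the combined effects of the three statements:
//
//   INSERT N: (id = N, previous = P, next = X)
//   UPDATE P: next = N      (skipped when P is None)
//   UPDATE X: previous = N  (skipped when X is None)
//
// Appending at the end of the list is the special case X = None; creating the
// very first chunk is P = None, X = None.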
1532
1533#[cfg(test)]
1534mod tests {
1535    use std::{
1536        path::PathBuf,
1537        sync::atomic::{AtomicU32, Ordering::SeqCst},
1538    };
1539
1540    use assert_matches::assert_matches;
1541    use matrix_sdk_base::{
1542        event_cache::{
1543            store::{
1544                integration_tests::{
1545                    check_test_event, make_test_event, make_test_event_with_event_id,
1546                },
1547                EventCacheStore, EventCacheStoreError,
1548            },
1549            Gap,
1550        },
1551        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1552        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1553    };
1554    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1555    use once_cell::sync::Lazy;
1556    use ruma::{event_id, room_id};
1557    use tempfile::{tempdir, TempDir};
1558
1559    use super::SqliteEventCacheStore;
1560    use crate::{
1561        event_cache_store::keys,
1562        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1563        SqliteStoreConfig,
1564    };
1565
1566    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1567    static NUM: AtomicU32 = AtomicU32::new(0);
1568
1569    fn new_event_cache_store_workspace() -> PathBuf {
1570        let name = NUM.fetch_add(1, SeqCst).to_string();
1571        TMP_DIR.path().join(name)
1572    }
1573
1574    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1575        let tmpdir_path = new_event_cache_store_workspace();
1576
1577        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1578
1579        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1580    }
1581
1582    event_cache_store_integration_tests!();
1583    event_cache_store_integration_tests_time!();
1584
1585    #[async_test]
1586    async fn test_pool_size() {
1587        let tmpdir_path = new_event_cache_store_workspace();
1588        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1589
1590        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1591
1592        assert_eq!(store.pool.status().max_size, 42);
1593    }
1594
1595    #[async_test]
1596    async fn test_linked_chunk_new_items_chunk() {
1597        let store = get_event_cache_store().await.expect("creating cache store failed");
1598
1599        let room_id = &DEFAULT_TEST_ROOM_ID;
1600        let linked_chunk_id = LinkedChunkId::Room(room_id);
1601
1602        store
1603            .handle_linked_chunk_updates(
1604                linked_chunk_id,
1605                vec![
1606                    Update::NewItemsChunk {
1607                        previous: None,
1608                        new: ChunkIdentifier::new(42),
1609                        next: None, // Note: the store must link the next entry itself.
1610                    },
1611                    Update::NewItemsChunk {
1612                        previous: Some(ChunkIdentifier::new(42)),
1613                        new: ChunkIdentifier::new(13),
1614                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1615                                                               * the next link ahead of time. */
1616                    },
1617                    Update::NewItemsChunk {
1618                        previous: Some(ChunkIdentifier::new(13)),
1619                        new: ChunkIdentifier::new(37),
1620                        next: None,
1621                    },
1622                ],
1623            )
1624            .await
1625            .unwrap();
1626
1627        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1628
1629        assert_eq!(chunks.len(), 3);
1630
1631        {
1632            // Chunks are ordered from smaller to bigger IDs.
1633            let c = chunks.remove(0);
1634            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1635            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1636            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1637            assert_matches!(c.content, ChunkContent::Items(events) => {
1638                assert!(events.is_empty());
1639            });
1640
1641            let c = chunks.remove(0);
1642            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1643            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1644            assert_eq!(c.next, None);
1645            assert_matches!(c.content, ChunkContent::Items(events) => {
1646                assert!(events.is_empty());
1647            });
1648
1649            let c = chunks.remove(0);
1650            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1651            assert_eq!(c.previous, None);
1652            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1653            assert_matches!(c.content, ChunkContent::Items(events) => {
1654                assert!(events.is_empty());
1655            });
1656        }
1657    }
1658
1659    #[async_test]
1660    async fn test_linked_chunk_new_gap_chunk() {
1661        let store = get_event_cache_store().await.expect("creating cache store failed");
1662
1663        let room_id = &DEFAULT_TEST_ROOM_ID;
1664        let linked_chunk_id = LinkedChunkId::Room(room_id);
1665
1666        store
1667            .handle_linked_chunk_updates(
1668                linked_chunk_id,
1669                vec![Update::NewGapChunk {
1670                    previous: None,
1671                    new: ChunkIdentifier::new(42),
1672                    next: None,
1673                    gap: Gap { prev_token: "raclette".to_owned() },
1674                }],
1675            )
1676            .await
1677            .unwrap();
1678
1679        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1680
1681        assert_eq!(chunks.len(), 1);
1682
1683        // Chunks are ordered from smaller to bigger IDs.
1684        let c = chunks.remove(0);
1685        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1686        assert_eq!(c.previous, None);
1687        assert_eq!(c.next, None);
1688        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1689            assert_eq!(gap.prev_token, "raclette");
1690        });
1691    }
1692
1693    #[async_test]
1694    async fn test_linked_chunk_replace_item() {
1695        let store = get_event_cache_store().await.expect("creating cache store failed");
1696
1697        let room_id = &DEFAULT_TEST_ROOM_ID;
1698        let linked_chunk_id = LinkedChunkId::Room(room_id);
1699        let event_id = event_id!("$world");
1700
1701        store
1702            .handle_linked_chunk_updates(
1703                linked_chunk_id,
1704                vec![
1705                    Update::NewItemsChunk {
1706                        previous: None,
1707                        new: ChunkIdentifier::new(42),
1708                        next: None,
1709                    },
1710                    Update::PushItems {
1711                        at: Position::new(ChunkIdentifier::new(42), 0),
1712                        items: vec![
1713                            make_test_event(room_id, "hello"),
1714                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1715                        ],
1716                    },
1717                    Update::ReplaceItem {
1718                        at: Position::new(ChunkIdentifier::new(42), 1),
1719                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1720                    },
1721                ],
1722            )
1723            .await
1724            .unwrap();
1725
1726        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1727
1728        assert_eq!(chunks.len(), 1);
1729
1730        let c = chunks.remove(0);
1731        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1732        assert_eq!(c.previous, None);
1733        assert_eq!(c.next, None);
1734        assert_matches!(c.content, ChunkContent::Items(events) => {
1735            assert_eq!(events.len(), 2);
1736            check_test_event(&events[0], "hello");
1737            check_test_event(&events[1], "yolo");
1738        });
1739    }
1740
1741    #[async_test]
1742    async fn test_linked_chunk_remove_chunk() {
1743        let store = get_event_cache_store().await.expect("creating cache store failed");
1744
1745        let room_id = &DEFAULT_TEST_ROOM_ID;
1746        let linked_chunk_id = LinkedChunkId::Room(room_id);
1747
1748        store
1749            .handle_linked_chunk_updates(
1750                linked_chunk_id,
1751                vec![
1752                    Update::NewGapChunk {
1753                        previous: None,
1754                        new: ChunkIdentifier::new(42),
1755                        next: None,
1756                        gap: Gap { prev_token: "raclette".to_owned() },
1757                    },
1758                    Update::NewGapChunk {
1759                        previous: Some(ChunkIdentifier::new(42)),
1760                        new: ChunkIdentifier::new(43),
1761                        next: None,
1762                        gap: Gap { prev_token: "fondue".to_owned() },
1763                    },
1764                    Update::NewGapChunk {
1765                        previous: Some(ChunkIdentifier::new(43)),
1766                        new: ChunkIdentifier::new(44),
1767                        next: None,
1768                        gap: Gap { prev_token: "tartiflette".to_owned() },
1769                    },
1770                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1771                ],
1772            )
1773            .await
1774            .unwrap();
1775
1776        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1777
1778        assert_eq!(chunks.len(), 2);
1779
1780        // Chunks are ordered from smaller to bigger IDs.
1781        let c = chunks.remove(0);
1782        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1783        assert_eq!(c.previous, None);
1784        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1785        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1786            assert_eq!(gap.prev_token, "raclette");
1787        });
1788
1789        let c = chunks.remove(0);
1790        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1791        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1792        assert_eq!(c.next, None);
1793        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1794            assert_eq!(gap.prev_token, "tartiflette");
1795        });
1796
1797        // Check that cascading worked. Yes, SQLite, I doubt you.
1798        let gaps = store
1799            .read()
1800            .await
1801            .unwrap()
1802            .with_transaction(|txn| -> rusqlite::Result<_> {
1803                let mut gaps = Vec::new();
1804                for data in txn
1805                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1806                    .query_map((), |row| row.get::<_, u64>(0))?
1807                {
1808                    gaps.push(data?);
1809                }
1810                Ok(gaps)
1811            })
1812            .await
1813            .unwrap();
1814
1815        assert_eq!(gaps, vec![42, 44]);
1816    }
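
    // Note: the check above exercises what is presumably an `ON DELETE CASCADE`
    // on the gap rows' chunk reference. The actual DDL lives in the migrations;
    // the mechanism is inferred from the behavior verified here.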
1817
1818    #[async_test]
1819    async fn test_linked_chunk_push_items() {
1820        let store = get_event_cache_store().await.expect("creating cache store failed");
1821
1822        let room_id = &DEFAULT_TEST_ROOM_ID;
1823        let linked_chunk_id = LinkedChunkId::Room(room_id);
1824
1825        store
1826            .handle_linked_chunk_updates(
1827                linked_chunk_id,
1828                vec![
1829                    Update::NewItemsChunk {
1830                        previous: None,
1831                        new: ChunkIdentifier::new(42),
1832                        next: None,
1833                    },
1834                    Update::PushItems {
1835                        at: Position::new(ChunkIdentifier::new(42), 0),
1836                        items: vec![
1837                            make_test_event(room_id, "hello"),
1838                            make_test_event(room_id, "world"),
1839                        ],
1840                    },
1841                    Update::PushItems {
1842                        at: Position::new(ChunkIdentifier::new(42), 2),
1843                        items: vec![make_test_event(room_id, "who?")],
1844                    },
1845                ],
1846            )
1847            .await
1848            .unwrap();
1849
1850        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1851
1852        assert_eq!(chunks.len(), 1);
1853
1854        let c = chunks.remove(0);
1855        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1856        assert_eq!(c.previous, None);
1857        assert_eq!(c.next, None);
1858        assert_matches!(c.content, ChunkContent::Items(events) => {
1859            assert_eq!(events.len(), 3);
1860
1861            check_test_event(&events[0], "hello");
1862            check_test_event(&events[1], "world");
1863            check_test_event(&events[2], "who?");
1864        });
1865    }
1866
1867    #[async_test]
1868    async fn test_linked_chunk_remove_item() {
1869        let store = get_event_cache_store().await.expect("creating cache store failed");
1870
1871        let room_id = *DEFAULT_TEST_ROOM_ID;
1872        let linked_chunk_id = LinkedChunkId::Room(room_id);
1873
1874        store
1875            .handle_linked_chunk_updates(
1876                linked_chunk_id,
1877                vec![
1878                    Update::NewItemsChunk {
1879                        previous: None,
1880                        new: ChunkIdentifier::new(42),
1881                        next: None,
1882                    },
1883                    Update::PushItems {
1884                        at: Position::new(ChunkIdentifier::new(42), 0),
1885                        items: vec![
1886                            make_test_event(room_id, "one"),
1887                            make_test_event(room_id, "two"),
1888                            make_test_event(room_id, "three"),
1889                            make_test_event(room_id, "four"),
1890                            make_test_event(room_id, "five"),
1891                            make_test_event(room_id, "six"),
1892                        ],
1893                    },
1894                    Update::RemoveItem {
1895                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
1896                    },
1897                ],
1898            )
1899            .await
1900            .unwrap();
1901
1902        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1903
1904        assert_eq!(chunks.len(), 1);
1905
1906        let c = chunks.remove(0);
1907        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1908        assert_eq!(c.previous, None);
1909        assert_eq!(c.next, None);
1910        assert_matches!(c.content, ChunkContent::Items(events) => {
1911            assert_eq!(events.len(), 5);
1912            check_test_event(&events[0], "one");
1913            check_test_event(&events[1], "two");
1914            check_test_event(&events[2], "four");
1915            check_test_event(&events[3], "five");
1916            check_test_event(&events[4], "six");
1917        });
1918
1919        // Make sure the positions have been updated for the remaining events.
1920        let num_rows: u64 = store
1921            .read()
1922            .await
1923            .unwrap()
1924            .with_transaction(move |txn| {
1925                txn.query_row(
1926                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1927                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1928                    |row| row.get(0),
1929                )
1930            })
1931            .await
1932            .unwrap();
1933        assert_eq!(num_rows, 3);
1934    }
1935
1936    #[async_test]
1937    async fn test_linked_chunk_detach_last_items() {
1938        let store = get_event_cache_store().await.expect("creating cache store failed");
1939
1940        let room_id = *DEFAULT_TEST_ROOM_ID;
1941        let linked_chunk_id = LinkedChunkId::Room(room_id);
1942
1943        store
1944            .handle_linked_chunk_updates(
1945                linked_chunk_id,
1946                vec![
1947                    Update::NewItemsChunk {
1948                        previous: None,
1949                        new: ChunkIdentifier::new(42),
1950                        next: None,
1951                    },
1952                    Update::PushItems {
1953                        at: Position::new(ChunkIdentifier::new(42), 0),
1954                        items: vec![
1955                            make_test_event(room_id, "hello"),
1956                            make_test_event(room_id, "world"),
1957                            make_test_event(room_id, "howdy"),
1958                        ],
1959                    },
1960                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1961                ],
1962            )
1963            .await
1964            .unwrap();
1965
1966        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1967
1968        assert_eq!(chunks.len(), 1);
1969
1970        let c = chunks.remove(0);
1971        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1972        assert_eq!(c.previous, None);
1973        assert_eq!(c.next, None);
1974        assert_matches!(c.content, ChunkContent::Items(events) => {
1975            assert_eq!(events.len(), 1);
1976            check_test_event(&events[0], "hello");
1977        });
1978    }
1979
1980    #[async_test]
1981    async fn test_linked_chunk_start_end_reattach_items() {
1982        let store = get_event_cache_store().await.expect("creating cache store failed");
1983
1984        let room_id = *DEFAULT_TEST_ROOM_ID;
1985        let linked_chunk_id = LinkedChunkId::Room(room_id);
1986
1987        // Same updates and checks as test_linked_chunk_push_items, but with extra
1988        // `StartReattachItems` and `EndReattachItems` updates, which must have no
1989        // effect.
1990        store
1991            .handle_linked_chunk_updates(
1992                linked_chunk_id,
1993                vec![
1994                    Update::NewItemsChunk {
1995                        previous: None,
1996                        new: ChunkIdentifier::new(42),
1997                        next: None,
1998                    },
1999                    Update::PushItems {
2000                        at: Position::new(ChunkIdentifier::new(42), 0),
2001                        items: vec![
2002                            make_test_event(room_id, "hello"),
2003                            make_test_event(room_id, "world"),
2004                            make_test_event(room_id, "howdy"),
2005                        ],
2006                    },
2007                    Update::StartReattachItems,
2008                    Update::EndReattachItems,
2009                ],
2010            )
2011            .await
2012            .unwrap();
2013
2014        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2015
2016        assert_eq!(chunks.len(), 1);
2017
2018        let c = chunks.remove(0);
2019        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2020        assert_eq!(c.previous, None);
2021        assert_eq!(c.next, None);
2022        assert_matches!(c.content, ChunkContent::Items(events) => {
2023            assert_eq!(events.len(), 3);
2024            check_test_event(&events[0], "hello");
2025            check_test_event(&events[1], "world");
2026            check_test_event(&events[2], "howdy");
2027        });
2028    }
2029
2030    #[async_test]
2031    async fn test_linked_chunk_clear() {
2032        let store = get_event_cache_store().await.expect("creating cache store failed");
2033
2034        let room_id = *DEFAULT_TEST_ROOM_ID;
2035        let linked_chunk_id = LinkedChunkId::Room(room_id);
2036        let event_0 = make_test_event(room_id, "hello");
2037        let event_1 = make_test_event(room_id, "world");
2038        let event_2 = make_test_event(room_id, "howdy");
2039
2040        store
2041            .handle_linked_chunk_updates(
2042                linked_chunk_id,
2043                vec![
2044                    Update::NewItemsChunk {
2045                        previous: None,
2046                        new: ChunkIdentifier::new(42),
2047                        next: None,
2048                    },
2049                    Update::NewGapChunk {
2050                        previous: Some(ChunkIdentifier::new(42)),
2051                        new: ChunkIdentifier::new(54),
2052                        next: None,
2053                        gap: Gap { prev_token: "fondue".to_owned() },
2054                    },
2055                    Update::PushItems {
2056                        at: Position::new(ChunkIdentifier::new(42), 0),
2057                        items: vec![event_0.clone(), event_1, event_2],
2058                    },
2059                    Update::Clear,
2060                ],
2061            )
2062            .await
2063            .unwrap();
2064
2065        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2066        assert!(chunks.is_empty());
2067
2068        // Check that cascading worked. Yes, SQLite, I doubt you.
2069        store
2070            .read()
2071            .await
2072            .unwrap()
2073            .with_transaction(|txn| -> rusqlite::Result<_> {
2074                let num_gaps = txn
2075                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks")?
2076                    .query_row((), |row| row.get::<_, u64>(0))?;
2077                assert_eq!(num_gaps, 0);
2078
2079                let num_events = txn
2080                    .prepare("SELECT COUNT(event_id) FROM event_chunks")?
2081                    .query_row((), |row| row.get::<_, u64>(0))?;
2082                assert_eq!(num_events, 0);
2083
2084                Ok(())
2085            })
2086            .await
2087            .unwrap();
2088
2089        // It's okay to re-insert a past event.
2090        store
2091            .handle_linked_chunk_updates(
2092                linked_chunk_id,
2093                vec![
2094                    Update::NewItemsChunk {
2095                        previous: None,
2096                        new: ChunkIdentifier::new(42),
2097                        next: None,
2098                    },
2099                    Update::PushItems {
2100                        at: Position::new(ChunkIdentifier::new(42), 0),
2101                        items: vec![event_0],
2102                    },
2103                ],
2104            )
2105            .await
2106            .unwrap();
2107    }
2108
2109    #[async_test]
2110    async fn test_linked_chunk_multiple_rooms() {
2111        let store = get_event_cache_store().await.expect("creating cache store failed");
2112
2113        let room1 = room_id!("!realcheeselovers:raclette.fr");
2114        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2115        let room2 = room_id!("!realcheeselovers:fondue.ch");
2116        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2117
2118        // Check that applying updates to one room doesn't affect the others.
2119        // Use the same chunk identifier in both rooms to battle-test search.
2120
2121        store
2122            .handle_linked_chunk_updates(
2123                linked_chunk_id1,
2124                vec![
2125                    Update::NewItemsChunk {
2126                        previous: None,
2127                        new: ChunkIdentifier::new(42),
2128                        next: None,
2129                    },
2130                    Update::PushItems {
2131                        at: Position::new(ChunkIdentifier::new(42), 0),
2132                        items: vec![
2133                            make_test_event(room1, "best cheese is raclette"),
2134                            make_test_event(room1, "obviously"),
2135                        ],
2136                    },
2137                ],
2138            )
2139            .await
2140            .unwrap();
2141
2142        store
2143            .handle_linked_chunk_updates(
2144                linked_chunk_id2,
2145                vec![
2146                    Update::NewItemsChunk {
2147                        previous: None,
2148                        new: ChunkIdentifier::new(42),
2149                        next: None,
2150                    },
2151                    Update::PushItems {
2152                        at: Position::new(ChunkIdentifier::new(42), 0),
2153                        items: vec![make_test_event(room2, "beaufort is the best")],
2154                    },
2155                ],
2156            )
2157            .await
2158            .unwrap();
2159
2160        // Check chunks from room 1.
2161        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2162        assert_eq!(chunks_room1.len(), 1);
2163
2164        let c = chunks_room1.remove(0);
2165        assert_matches!(c.content, ChunkContent::Items(events) => {
2166            assert_eq!(events.len(), 2);
2167            check_test_event(&events[0], "best cheese is raclette");
2168            check_test_event(&events[1], "obviously");
2169        });
2170
2171        // Check chunks from room 2.
2172        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2173        assert_eq!(chunks_room2.len(), 1);
2174
2175        let c = chunks_room2.remove(0);
2176        assert_matches!(c.content, ChunkContent::Items(events) => {
2177            assert_eq!(events.len(), 1);
2178            check_test_event(&events[0], "beaufort is the best");
2179        });
2180    }
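
    // Both rooms deliberately reuse chunk identifier 42: chunk rows are
    // addressed by the (hashed linked chunk ID, chunk ID) pair, as in the
    // statements of `insert_chunk`, so identical chunk IDs in different rooms
    // never collide.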
2181
2182    #[async_test]
2183    async fn test_linked_chunk_update_is_a_transaction() {
2184        let store = get_event_cache_store().await.expect("creating cache store failed");
2185
2186        let room_id = *DEFAULT_TEST_ROOM_ID;
2187        let linked_chunk_id = LinkedChunkId::Room(room_id);
2188
2189        // Trigger a violation of the unique constraint on the (room id, chunk id)
2190        // pair.
2191        let err = store
2192            .handle_linked_chunk_updates(
2193                linked_chunk_id,
2194                vec![
2195                    Update::NewItemsChunk {
2196                        previous: None,
2197                        new: ChunkIdentifier::new(42),
2198                        next: None,
2199                    },
2200                    Update::NewItemsChunk {
2201                        previous: None,
2202                        new: ChunkIdentifier::new(42),
2203                        next: None,
2204                    },
2205                ],
2206            )
2207            .await
2208            .unwrap_err();
2209
2210        // The operation fails with a constraint violation error.
2211        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2212            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2213        });
2214
2215        // If the updates have been handled transactionally, then no new chunks should
2216        // have been added; failure of the second update leads to the first one being
2217        // rolled back.
2218        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2219        assert!(chunks.is_empty());
2220    }
2221
2222    #[async_test]
2223    async fn test_filter_duplicate_events_no_events() {
2224        let store = get_event_cache_store().await.expect("creating cache store failed");
2225
2226        let room_id = *DEFAULT_TEST_ROOM_ID;
2227        let linked_chunk_id = LinkedChunkId::Room(room_id);
2228        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2229        assert!(duplicates.is_empty());
2230    }
2231
2232    #[async_test]
2233    async fn test_load_last_chunk() {
2234        let room_id = room_id!("!r0:matrix.org");
2235        let linked_chunk_id = LinkedChunkId::Room(room_id);
2236        let event = |msg: &str| make_test_event(room_id, msg);
2237        let store = get_event_cache_store().await.expect("creating cache store failed");
2238
2239        // Case #1: no last chunk.
2240        {
2241            let (last_chunk, chunk_identifier_generator) =
2242                store.load_last_chunk(linked_chunk_id).await.unwrap();
2243
2244            assert!(last_chunk.is_none());
2245            assert_eq!(chunk_identifier_generator.current(), 0);
2246        }
2247
2248        // Case #2: only one chunk is present.
2249        {
2250            store
2251                .handle_linked_chunk_updates(
2252                    linked_chunk_id,
2253                    vec![
2254                        Update::NewItemsChunk {
2255                            previous: None,
2256                            new: ChunkIdentifier::new(42),
2257                            next: None,
2258                        },
2259                        Update::PushItems {
2260                            at: Position::new(ChunkIdentifier::new(42), 0),
2261                            items: vec![event("saucisse de morteau"), event("comté")],
2262                        },
2263                    ],
2264                )
2265                .await
2266                .unwrap();
2267
2268            let (last_chunk, chunk_identifier_generator) =
2269                store.load_last_chunk(linked_chunk_id).await.unwrap();
2270
2271            assert_matches!(last_chunk, Some(last_chunk) => {
2272                assert_eq!(last_chunk.identifier, 42);
2273                assert!(last_chunk.previous.is_none());
2274                assert!(last_chunk.next.is_none());
2275                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2276                    assert_eq!(items.len(), 2);
2277                    check_test_event(&items[0], "saucisse de morteau");
2278                    check_test_event(&items[1], "comté");
2279                });
2280            });
2281            assert_eq!(chunk_identifier_generator.current(), 42);
2282        }
2283
2284        // Case #3: more chunks are present.
2285        {
2286            store
2287                .handle_linked_chunk_updates(
2288                    linked_chunk_id,
2289                    vec![
2290                        Update::NewItemsChunk {
2291                            previous: Some(ChunkIdentifier::new(42)),
2292                            new: ChunkIdentifier::new(7),
2293                            next: None,
2294                        },
2295                        Update::PushItems {
2296                            at: Position::new(ChunkIdentifier::new(7), 0),
2297                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2298                        },
2299                    ],
2300                )
2301                .await
2302                .unwrap();
2303
2304            let (last_chunk, chunk_identifier_generator) =
2305                store.load_last_chunk(linked_chunk_id).await.unwrap();
2306
2307            assert_matches!(last_chunk, Some(last_chunk) => {
2308                assert_eq!(last_chunk.identifier, 7);
2309                assert_matches!(last_chunk.previous, Some(previous) => {
2310                    assert_eq!(previous, 42);
2311                });
2312                assert!(last_chunk.next.is_none());
2313                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2314                    assert_eq!(items.len(), 3);
2315                    check_test_event(&items[0], "fondue");
2316                    check_test_event(&items[1], "gruyère");
2317                    check_test_event(&items[2], "mont d'or");
2318                });
2319            });
2320            assert_eq!(chunk_identifier_generator.current(), 42);
2321        }
2322    }
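
    // Note how case #3 above still reports 42 as the generator's current
    // value: the generator appears to track the highest chunk identifier ever
    // used for this linked chunk, not the identifier of whichever chunk is
    // currently last.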
2323
2324    #[async_test]
2325    async fn test_load_last_chunk_with_a_cycle() {
2326        let room_id = room_id!("!r0:matrix.org");
2327        let linked_chunk_id = LinkedChunkId::Room(room_id);
2328        let store = get_event_cache_store().await.expect("creating cache store failed");
2329
2330        store
2331            .handle_linked_chunk_updates(
2332                linked_chunk_id,
2333                vec![
2334                    Update::NewItemsChunk {
2335                        previous: None,
2336                        new: ChunkIdentifier::new(0),
2337                        next: None,
2338                    },
2339                    Update::NewItemsChunk {
2340                        // Because `previous` connects to chunk #0, it will create a cycle.
2341                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2342                        // **does not exist**. We have to detect this cycle.
2343                        previous: Some(ChunkIdentifier::new(0)),
2344                        new: ChunkIdentifier::new(1),
2345                        next: Some(ChunkIdentifier::new(0)),
2346                    },
2347                ],
2348            )
2349            .await
2350            .unwrap();
2351
2352        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2353    }
2354
2355    #[async_test]
2356    async fn test_load_previous_chunk() {
2357        let room_id = room_id!("!r0:matrix.org");
2358        let linked_chunk_id = LinkedChunkId::Room(room_id);
2359        let event = |msg: &str| make_test_event(room_id, msg);
2360        let store = get_event_cache_store().await.expect("creating cache store failed");
2361
2362        // Case #1: no chunk at all, equivalent to having a nonexistent
2363        // `before_chunk_identifier`.
2364        {
2365            let previous_chunk = store
2366                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2367                .await
2368                .unwrap();
2369
2370            assert!(previous_chunk.is_none());
2371        }
2372
2373        // Case #2: there is only one chunk; the chunk before it doesn't
2374        // exist.
2375        {
2376            store
2377                .handle_linked_chunk_updates(
2378                    linked_chunk_id,
2379                    vec![Update::NewItemsChunk {
2380                        previous: None,
2381                        new: ChunkIdentifier::new(42),
2382                        next: None,
2383                    }],
2384                )
2385                .await
2386                .unwrap();
2387
2388            let previous_chunk =
2389                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2390
2391            assert!(previous_chunk.is_none());
2392        }
2393
2394        // Case #3: there are two chunks.
2395        {
2396            store
2397                .handle_linked_chunk_updates(
2398                    linked_chunk_id,
2399                    vec![
2400                        // new chunk before the one that exists.
2401                        Update::NewItemsChunk {
2402                            previous: None,
2403                            new: ChunkIdentifier::new(7),
2404                            next: Some(ChunkIdentifier::new(42)),
2405                        },
2406                        Update::PushItems {
2407                            at: Position::new(ChunkIdentifier::new(7), 0),
2408                            items: vec![event("brigand du jorat"), event("morbier")],
2409                        },
2410                    ],
2411                )
2412                .await
2413                .unwrap();
2414
2415            let previous_chunk =
2416                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2417
2418            assert_matches!(previous_chunk, Some(previous_chunk) => {
2419                assert_eq!(previous_chunk.identifier, 7);
2420                assert!(previous_chunk.previous.is_none());
2421                assert_matches!(previous_chunk.next, Some(next) => {
2422                    assert_eq!(next, 42);
2423                });
2424                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2425                    assert_eq!(items.len(), 2);
2426                    check_test_event(&items[0], "brigand du jorat");
2427                    check_test_event(&items[1], "morbier");
2428                });
2429            });
2430        }
2431    }
2432}
2433
2434#[cfg(test)]
2435mod encrypted_tests {
2436    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2437
2438    use matrix_sdk_base::{
2439        event_cache::store::{EventCacheStore, EventCacheStoreError},
2440        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2441    };
2442    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2443    use once_cell::sync::Lazy;
2444    use ruma::{
2445        event_id,
2446        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2447        room_id, user_id,
2448    };
2449    use tempfile::{tempdir, TempDir};
2450
2451    use super::SqliteEventCacheStore;
2452
2453    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2454    static NUM: AtomicU32 = AtomicU32::new(0);
2455
2456    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2457        let name = NUM.fetch_add(1, SeqCst).to_string();
2458        let tmpdir_path = TMP_DIR.path().join(name);
2459
2460        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2461
2462        Ok(SqliteEventCacheStore::open(
2463            tmpdir_path.to_str().unwrap(),
2464            Some("default_test_password"),
2465        )
2466        .await
2467        .unwrap())
2468    }
2469
2470    event_cache_store_integration_tests!();
2471    event_cache_store_integration_tests_time!();
2472
2473    #[async_test]
2474    async fn test_no_sqlite_injection_in_find_event_relations() {
2475        let room_id = room_id!("!test:localhost");
2476        let another_room_id = room_id!("!r1:matrix.org");
2477        let sender = user_id!("@alice:localhost");
2478
2479        let store = get_event_cache_store()
2480            .await
2481            .expect("We should be able to create a new, empty, event cache store");
2482
2483        let f = EventFactory::new().room(room_id).sender(sender);
2484
2485        // Create an event for the first room.
2486        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2487        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2488
2489        // Create a related event.
2490        let edit_id = event_id!("$find_me:matrix.org");
2491        let edit = f
2492            .text_msg("Find me")
2493            .event_id(edit_id)
2494            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2495            .into_event();
2496
2497        // Create an event for the second room.
2498        let f = f.room(another_room_id);
2499
2500        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2501        let another_event =
2502            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2503
2504        // Save the events in the DB.
2505        store.save_event(room_id, event).await.unwrap();
2506        store.save_event(room_id, edit).await.unwrap();
2507        store.save_event(another_room_id, another_event).await.unwrap();
2508
2509        // Craft a `RelationType` that attempts to inject some SQL. If the
2510        // injection were executed, the `OR 1=1` would make the query ignore all
2511        // the previous parameters, i.e. the room ID and the event ID.
2512        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
2513
2514        // Attempt to find events in the first room.
2515        let results = store
2516            .find_event_relations(room_id, event_id, filter.as_deref())
2517            .await
2518            .expect("We should be able to attempt to find event relations");
2519
2520        // Ensure that we only got the single related event the first room contains.
2521        similar_asserts::assert_eq!(
2522            results.len(),
2523            1,
2524            "We should only have loaded events for the first room {results:#?}"
2525        );
2526
2527        // The event needs to be the edit event, otherwise something is wrong.
2528        let (found_event, _) = &results[0];
2529        assert_eq!(
2530            found_event.event_id().as_deref(),
2531            Some(edit_id),
2532            "The single event we found should be the edit event"
2533        );
2534    }
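
    // Why the injected filter above is inert: the store only formats a list of
    // `?` placeholders into the query, sized from `filters.len()`; the filter
    // strings themselves are bound as SQL values via `params_from_iter` and
    // are never interpolated into the statement text.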
2535}