// matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        Event, Gap,
        store::{EventCacheStore, extract_event_relation},
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
};
use rusqlite::{
    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    OpenStoreError, Secret, SqliteStoreConfig,
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
    },
};

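/// Table names used by this store.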
mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 13;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and for write
    /// operations. A single connection is set apart for write operations; all
    /// other connections are used for read operations. The lock is used to
    /// ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
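    ///
    /// # Examples
    ///
    /// A minimal usage sketch; the path and the passphrase are illustrative.
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// let store =
    ///     SqliteEventCacheStore::open("/path/to/store", Some("passphrase")).await?;
    /// # Ok(()) }
    /// ```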
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }
    /// Open the SQLite-based event cache store with the given configuration.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

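    /// Map a database row to the raw parts of a chunk:
    /// `(id, previous, next, type)`.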
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

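    /// Run a `VACUUM` on the database, defragmenting it and optimizing its
    /// size on the filesystem.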
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

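    /// Get the current size of the database, in bytes.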
    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}

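/// An event as it is about to be persisted: the (possibly encrypted)
/// serialized content, plus the relationship information extracted from the
/// raw event.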
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

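/// Extension methods over a SQLite [`Transaction`] to rebuild linked chunks
/// and load their content (gaps and events) from the database.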
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it fails
        // with the error message: "cannot change into wal mode from within a
        // transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
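        //
        // An illustrative walk-through of the generation logic below: starting
        // from a row (key, holder = "a", generation = 0), a re-acquisition by
        // "a" keeps the generation at 0, while a takeover by "b" once the
        // lease has expired bumps it to 1. If neither branch of the `WHERE`
        // clause matches (an unexpired lease held by someone else), no row is
        // updated, `RETURNING` yields nothing, and the lock is not taken.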
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point its previous chunk's `next` to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point its next chunk's `previous` to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better be safe.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might already have
                        // been inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // Once `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // |          |                 |          |          | <- no more `$ev2`
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To convince yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, we
                        // have `event_cache::Deduplicator`, which is responsible
                        // for ensuring there is no duplicated event. However,
                        // relying on this is “fragile” in the sense it can
                        // contain bugs. Relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “braces and belt” as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is a
                        // no-go for us, it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
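                        //
                        // Worked example for the table above (removing at
                        // index 2): step 1 maps the positions {3, 4} to
                        // {-2, -3}, which are unique since real positions are
                        // never negative; step 2 flips them back to {2, 3}.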
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a
                // `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no row, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk isn't found, and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk isn't found, **but** there are chunks for this room: this is
                    // inconsistent. The linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events for which we want to check duplicates, we can
        // return early. It's not only an optimization to do so: it's required,
        // otherwise the `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );
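
                    // For example, with three event IDs, `repeat_vars` expands
                    // the `IN ({})` above to `IN (?,?,?)` (an illustrative
                    // expansion: the helper repeats the `?` placeholder).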

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID has been stored even if it is
                            // malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Read a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }
1317
1318    #[instrument(skip(self, event_id, filters))]
1319    async fn find_event_relations(
1320        &self,
1321        room_id: &RoomId,
1322        event_id: &EventId,
1323        filters: Option<&[RelationType]>,
1324    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1325        let _timer = timer!("method");
1326
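        // The `events` table is keyed by the hashed room ID, while
        // `event_chunks` is keyed by the hashed linked chunk ID; the
        // transaction below needs both to join the two tables.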
1327        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1328
1329        let hashed_linked_chunk_id =
1330            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1331
1332        let event_id = event_id.to_owned();
1333        let filters = filters.map(ToOwned::to_owned);
1334        let store = self.clone();
1335
1336        self.read()
1337            .await?
1338            .with_transaction(move |txn| -> Result<_> {
1339                find_event_relations_transaction(
1340                    store,
1341                    hashed_room_id,
1342                    hashed_linked_chunk_id,
1343                    event_id,
1344                    filters,
1345                    txn,
1346                )
1347            })
1348            .await
1349    }
1350
1351    #[instrument(skip(self))]
1352    async fn get_room_events(
1353        &self,
1354        room_id: &RoomId,
1355        event_type: Option<&str>,
1356        session_id: Option<&str>,
1357    ) -> Result<Vec<Event>, Self::Error> {
1358        let _timer = timer!("method");
1359
1360        let this = self.clone();
1361
1362        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1363        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1364        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1365
1366        self.read()
1367            .await?
1368            .with_transaction(move |txn| -> Result<_> {
1369                // Clippy claims the clones below are redundant, but the compiler
1370                // complains that the lifetimes are too short if they are removed,
1371                // so let's silence the lint.
1372                #[allow(clippy::redundant_clone)]
1373                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1374                    (None, None) => {
1375                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1376                    }
1377                    (None, Some(session_id)) => (
1378                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1379                        params![hashed_room_id, session_id.to_owned()],
1380                    ),
1381                    (Some(event_type), None) => (
1382                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1383                        params![hashed_room_id, event_type.to_owned()],
1384                    ),
1385                    (Some(event_type), Some(session_id)) => (
1386                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1387                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1388                    ),
1389                };
1390
1391                let mut statement = txn.prepare(query)?;
1392                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1393
1394                let mut events = Vec::new();
1395                for ev in maybe_events {
1396                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1397                    events.push(event);
1398                }
1399
1400                Ok(events)
1401            })
1402            .await
1403    }
1404
1405    #[instrument(skip(self, event))]
1406    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1407        let _timer = timer!("method");
1408
1409        let Some(event_id) = event.event_id() else {
1410            error!("Trying to save an event with no ID");
1411            return Ok(());
1412        };
1413
1414        let Some(event_type) = event.kind.event_type() else {
1415            error!(%event_id, "Trying to save an event with no event type");
1416            return Ok(());
1417        };
1418
1419        let event_type = self.encode_key(keys::EVENTS, event_type);
1420        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1421
1422        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1423        let event_id = event_id.to_string();
1424        let encoded_event = self.encode_event(&event)?;
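        // `encode_event` yields the serialized (and, when a store cipher is
        // configured, encrypted) content, plus the relation target and type
        // that fill the `relates_to` and `rel_type` columns below.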
1425
1426        self.write()
1427            .await?
1428            .with_transaction(move |txn| -> Result<_> {
1429                txn.execute(
1430                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1431                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1432
1433                Ok(())
1434            })
1435            .await
1436    }
1437
1438    async fn optimize(&self) -> Result<(), Self::Error> {
1439        Ok(self.vacuum().await?)
1440    }
1441
1442    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1443        self.get_db_size().await
1444    }
1445}
1446
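/// Find all events related to `event_id` in the given room, optionally
/// filtered by relation type, along with their position in the room's
/// linked chunk, if they belong to one.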
1447fn find_event_relations_transaction(
1448    store: SqliteEventCacheStore,
1449    hashed_room_id: Key,
1450    hashed_linked_chunk_id: Key,
1451    event_id: OwnedEventId,
1452    filters: Option<Vec<RelationType>>,
1453    txn: &Transaction<'_>,
1454) -> Result<Vec<(Event, Option<Position>)>> {
1455    let get_rows = |row: &rusqlite::Row<'_>| {
1456        Ok((
1457            row.get::<_, Vec<u8>>(0)?,
1458            row.get::<_, Option<u64>>(1)?,
1459            row.get::<_, Option<usize>>(2)?,
1460        ))
1461    };
1462
1463    // Collect related events.
1464    let collect_results = |rows| {
1465        let mut related = Vec::new();
1466
1467        for result in rows {
1468            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1469
1470            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1471
1472            // Only build the position if both the chunk ID and the index were
1473            // present; in theory, they are either both present or both absent.
1474            let pos = chunk_id
1475                .zip(index)
1476                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1477
1478            related.push((event, pos));
1479        }
1480
1481        Ok(related)
1482    };
1483
1484    if let Some(filters) = filters {
1485        let question_marks = repeat_vars(filters.len());
1486        let query = format!(
1487            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1488            FROM events
1489            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1490            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1491        );
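        // With two filters, for instance, `question_marks` expands to something
        // like `?,?`, so the final clause reads `rel_type IN (?,?)`.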
1492
1493        // The filters must be stringified first; because `.to_sql()` borrows
1494        // from them, the resulting strings are kept on the stack so they have a
1495        // stable address (otherwise the map closure below would return a
1496        // reference to a temporary).
1497        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1498        let filters_params: Vec<_> = filter_strings
1499            .iter()
1500            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1501            .collect();
1502
1503        let parameters = params_from_iter(
1504            [
1505                hashed_linked_chunk_id.to_sql().expect(
1506                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1507                ),
1508                event_id
1509                    .as_str()
1510                    .to_sql()
1511                    .expect("We should be able to convert an event ID to a SQLite value"),
1512                hashed_room_id
1513                    .to_sql()
1514                    .expect("We should be able to convert a room ID to a SQLite value"),
1515            ]
1516            .into_iter()
1517            .chain(filters_params),
1518        );
1519
1520        let mut statement = txn.prepare(&query)?;
1521        let rows = statement.query_map(parameters, get_rows)?;
1522
1523        collect_results(rows)
1524    } else {
1525        let query =
1526            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1527            FROM events
1528            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1529            WHERE relates_to = ? AND room_id = ?";
1530        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1531
1532        let mut statement = txn.prepare(query)?;
1533        let rows = statement.query_map(parameters, get_rows)?;
1534
1535        collect_results(rows)
1536    }
1537}
1538
1539/// Like `deadpool::managed::Object::with_transaction`, but starts the
1540/// transaction in immediate (write) mode from the beginning, preventing
1541/// `SQLITE_BUSY` errors for transactions that involve both reads and writes
1542/// and start with a write.
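///
/// A minimal usage sketch; the table name and the bound parameter are
/// hypothetical, for illustration only:
///
/// ```ignore
/// let deleted_rows = with_immediate_transaction(&store, move |txn| {
///     // All statements here run inside the single immediate transaction.
///     Ok(txn.execute("DELETE FROM events WHERE room_id = ?", (hashed_room_id,))?)
/// })
/// .await?;
/// ```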
1543async fn with_immediate_transaction<
1544    T: Send + 'static,
1545    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1546>(
1547    this: &SqliteEventCacheStore,
1548    f: F,
1549) -> Result<T, Error> {
1550    this.write()
1551        .await?
1552        .interact(move |conn| -> Result<T, Error> {
1553            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1554            // to avoid read transactions upgrading to write mode and causing
1555            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1556            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1557
1558            let code = || -> Result<T, Error> {
1559                let txn = conn.transaction()?;
1560                let res = f(&txn)?;
1561                txn.commit()?;
1562                Ok(res)
1563            };
1564
1565            let res = code();
1566
1567            // Reset the transaction behavior to use Deferred, after this transaction has
1568            // been run, whether it was successful or not.
1569            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1570
1571            res
1572        })
1573        .await
1574        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1575        .unwrap()
1576}
1577
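/// Insert a new chunk into the `linked_chunks` table and repair the
/// doubly-linked list around it, by pointing the previous chunk's `next` and
/// the next chunk's `previous` columns at the newly inserted chunk.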
1578fn insert_chunk(
1579    txn: &Transaction<'_>,
1580    linked_chunk_id: &Key,
1581    previous: Option<u64>,
1582    new: u64,
1583    next: Option<u64>,
1584    type_str: &str,
1585) -> rusqlite::Result<()> {
1586    // First, insert the new chunk.
1587    txn.execute(
1588        r#"
1589            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1590            VALUES (?, ?, ?, ?, ?)
1591        "#,
1592        (new, linked_chunk_id, previous, next, type_str),
1593    )?;
1594
1595    // If this chunk has a previous one, update its `next` field.
1596    if let Some(previous) = previous {
1597        txn.execute(
1598            r#"
1599                UPDATE linked_chunks
1600                SET next = ?
1601                WHERE id = ? AND linked_chunk_id = ?
1602            "#,
1603            (new, previous, linked_chunk_id),
1604        )?;
1605    }
1606
1607    // If this chunk has a next one, update its `previous` field.
1608    if let Some(next) = next {
1609        txn.execute(
1610            r#"
1611                UPDATE linked_chunks
1612                SET previous = ?
1613                WHERE id = ? AND linked_chunk_id = ?
1614            "#,
1615            (new, next, linked_chunk_id),
1616        )?;
1617    }
1618
1619    Ok(())
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624    use std::{
1625        path::PathBuf,
1626        sync::atomic::{AtomicU32, Ordering::SeqCst},
1627    };
1628
1629    use assert_matches::assert_matches;
1630    use matrix_sdk_base::{
1631        event_cache::{
1632            Gap,
1633            store::{
1634                EventCacheStore, EventCacheStoreError,
1635                integration_tests::{
1636                    check_test_event, make_test_event, make_test_event_with_event_id,
1637                },
1638            },
1639        },
1640        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1641        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1642    };
1643    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
1644    use once_cell::sync::Lazy;
1645    use ruma::{event_id, room_id};
1646    use tempfile::{TempDir, tempdir};
1647
1648    use super::SqliteEventCacheStore;
1649    use crate::{
1650        SqliteStoreConfig,
1651        event_cache_store::keys,
1652        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1653    };
1654
1655    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1656    static NUM: AtomicU32 = AtomicU32::new(0);
1657
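    // Each test gets a fresh store in its own subdirectory of the shared
    // temporary directory, named after a monotonically increasing counter.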
1658    fn new_event_cache_store_workspace() -> PathBuf {
1659        let name = NUM.fetch_add(1, SeqCst).to_string();
1660        TMP_DIR.path().join(name)
1661    }
1662
1663    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1664        let tmpdir_path = new_event_cache_store_workspace();
1665
1666        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1667
1668        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1669    }
1670
1671    event_cache_store_integration_tests!();
1672    event_cache_store_integration_tests_time!();
1673
1674    #[async_test]
1675    async fn test_pool_size() {
1676        let tmpdir_path = new_event_cache_store_workspace();
1677        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1678
1679        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1680
1681        assert_eq!(store.pool.status().max_size, 42);
1682    }
1683
1684    #[async_test]
1685    async fn test_linked_chunk_new_items_chunk() {
1686        let store = get_event_cache_store().await.expect("creating cache store failed");
1687
1688        let room_id = &DEFAULT_TEST_ROOM_ID;
1689        let linked_chunk_id = LinkedChunkId::Room(room_id);
1690
1691        store
1692            .handle_linked_chunk_updates(
1693                linked_chunk_id,
1694                vec![
1695                    Update::NewItemsChunk {
1696                        previous: None,
1697                        new: ChunkIdentifier::new(42),
1698                        next: None, // Note: the store must link the next entry itself.
1699                    },
1700                    Update::NewItemsChunk {
1701                        previous: Some(ChunkIdentifier::new(42)),
1702                        new: ChunkIdentifier::new(13),
1703                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1704                                                               * the next link ahead of time. */
1705                    },
1706                    Update::NewItemsChunk {
1707                        previous: Some(ChunkIdentifier::new(13)),
1708                        new: ChunkIdentifier::new(37),
1709                        next: None,
1710                    },
1711                ],
1712            )
1713            .await
1714            .unwrap();
1715
1716        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1717
1718        assert_eq!(chunks.len(), 3);
1719
1720        {
1721            // Chunks are ordered from smaller to bigger IDs.
1722            let c = chunks.remove(0);
1723            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1724            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1725            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1726            assert_matches!(c.content, ChunkContent::Items(events) => {
1727                assert!(events.is_empty());
1728            });
1729
1730            let c = chunks.remove(0);
1731            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1732            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1733            assert_eq!(c.next, None);
1734            assert_matches!(c.content, ChunkContent::Items(events) => {
1735                assert!(events.is_empty());
1736            });
1737
1738            let c = chunks.remove(0);
1739            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1740            assert_eq!(c.previous, None);
1741            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1742            assert_matches!(c.content, ChunkContent::Items(events) => {
1743                assert!(events.is_empty());
1744            });
1745        }
1746    }
1747
1748    #[async_test]
1749    async fn test_linked_chunk_new_gap_chunk() {
1750        let store = get_event_cache_store().await.expect("creating cache store failed");
1751
1752        let room_id = &DEFAULT_TEST_ROOM_ID;
1753        let linked_chunk_id = LinkedChunkId::Room(room_id);
1754
1755        store
1756            .handle_linked_chunk_updates(
1757                linked_chunk_id,
1758                vec![Update::NewGapChunk {
1759                    previous: None,
1760                    new: ChunkIdentifier::new(42),
1761                    next: None,
1762                    gap: Gap { prev_token: "raclette".to_owned() },
1763                }],
1764            )
1765            .await
1766            .unwrap();
1767
1768        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1769
1770        assert_eq!(chunks.len(), 1);
1771
1772        // Chunks are ordered from smaller to bigger IDs.
1773        let c = chunks.remove(0);
1774        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1775        assert_eq!(c.previous, None);
1776        assert_eq!(c.next, None);
1777        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1778            assert_eq!(gap.prev_token, "raclette");
1779        });
1780    }
1781
1782    #[async_test]
1783    async fn test_linked_chunk_replace_item() {
1784        let store = get_event_cache_store().await.expect("creating cache store failed");
1785
1786        let room_id = &DEFAULT_TEST_ROOM_ID;
1787        let linked_chunk_id = LinkedChunkId::Room(room_id);
1788        let event_id = event_id!("$world");
1789
1790        store
1791            .handle_linked_chunk_updates(
1792                linked_chunk_id,
1793                vec![
1794                    Update::NewItemsChunk {
1795                        previous: None,
1796                        new: ChunkIdentifier::new(42),
1797                        next: None,
1798                    },
1799                    Update::PushItems {
1800                        at: Position::new(ChunkIdentifier::new(42), 0),
1801                        items: vec![
1802                            make_test_event(room_id, "hello"),
1803                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1804                        ],
1805                    },
1806                    Update::ReplaceItem {
1807                        at: Position::new(ChunkIdentifier::new(42), 1),
1808                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1809                    },
1810                ],
1811            )
1812            .await
1813            .unwrap();
1814
1815        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1816
1817        assert_eq!(chunks.len(), 1);
1818
1819        let c = chunks.remove(0);
1820        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1821        assert_eq!(c.previous, None);
1822        assert_eq!(c.next, None);
1823        assert_matches!(c.content, ChunkContent::Items(events) => {
1824            assert_eq!(events.len(), 2);
1825            check_test_event(&events[0], "hello");
1826            check_test_event(&events[1], "yolo");
1827        });
1828    }
1829
1830    #[async_test]
1831    async fn test_linked_chunk_remove_chunk() {
1832        let store = get_event_cache_store().await.expect("creating cache store failed");
1833
1834        let room_id = &DEFAULT_TEST_ROOM_ID;
1835        let linked_chunk_id = LinkedChunkId::Room(room_id);
1836
1837        store
1838            .handle_linked_chunk_updates(
1839                linked_chunk_id,
1840                vec![
1841                    Update::NewGapChunk {
1842                        previous: None,
1843                        new: ChunkIdentifier::new(42),
1844                        next: None,
1845                        gap: Gap { prev_token: "raclette".to_owned() },
1846                    },
1847                    Update::NewGapChunk {
1848                        previous: Some(ChunkIdentifier::new(42)),
1849                        new: ChunkIdentifier::new(43),
1850                        next: None,
1851                        gap: Gap { prev_token: "fondue".to_owned() },
1852                    },
1853                    Update::NewGapChunk {
1854                        previous: Some(ChunkIdentifier::new(43)),
1855                        new: ChunkIdentifier::new(44),
1856                        next: None,
1857                        gap: Gap { prev_token: "tartiflette".to_owned() },
1858                    },
1859                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1860                ],
1861            )
1862            .await
1863            .unwrap();
1864
1865        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1866
1867        assert_eq!(chunks.len(), 2);
1868
1869        // Chunks are ordered from smaller to bigger IDs.
1870        let c = chunks.remove(0);
1871        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1872        assert_eq!(c.previous, None);
1873        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1874        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1875            assert_eq!(gap.prev_token, "raclette");
1876        });
1877
1878        let c = chunks.remove(0);
1879        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1880        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1881        assert_eq!(c.next, None);
1882        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1883            assert_eq!(gap.prev_token, "tartiflette");
1884        });
1885
1886        // Check that cascading worked. Yes, SQLite, I doubt you.
1887        let gaps = store
1888            .read()
1889            .await
1890            .unwrap()
1891            .with_transaction(|txn| -> rusqlite::Result<_> {
1892                let mut gaps = Vec::new();
1893                for data in txn
1894                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1895                    .query_map((), |row| row.get::<_, u64>(0))?
1896                {
1897                    gaps.push(data?);
1898                }
1899                Ok(gaps)
1900            })
1901            .await
1902            .unwrap();
1903
1904        assert_eq!(gaps, vec![42, 44]);
1905    }
1906
1907    #[async_test]
1908    async fn test_linked_chunk_push_items() {
1909        let store = get_event_cache_store().await.expect("creating cache store failed");
1910
1911        let room_id = &DEFAULT_TEST_ROOM_ID;
1912        let linked_chunk_id = LinkedChunkId::Room(room_id);
1913
1914        store
1915            .handle_linked_chunk_updates(
1916                linked_chunk_id,
1917                vec![
1918                    Update::NewItemsChunk {
1919                        previous: None,
1920                        new: ChunkIdentifier::new(42),
1921                        next: None,
1922                    },
1923                    Update::PushItems {
1924                        at: Position::new(ChunkIdentifier::new(42), 0),
1925                        items: vec![
1926                            make_test_event(room_id, "hello"),
1927                            make_test_event(room_id, "world"),
1928                        ],
1929                    },
1930                    Update::PushItems {
1931                        at: Position::new(ChunkIdentifier::new(42), 2),
1932                        items: vec![make_test_event(room_id, "who?")],
1933                    },
1934                ],
1935            )
1936            .await
1937            .unwrap();
1938
1939        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1940
1941        assert_eq!(chunks.len(), 1);
1942
1943        let c = chunks.remove(0);
1944        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1945        assert_eq!(c.previous, None);
1946        assert_eq!(c.next, None);
1947        assert_matches!(c.content, ChunkContent::Items(events) => {
1948            assert_eq!(events.len(), 3);
1949
1950            check_test_event(&events[0], "hello");
1951            check_test_event(&events[1], "world");
1952            check_test_event(&events[2], "who?");
1953        });
1954    }
1955
1956    #[async_test]
1957    async fn test_linked_chunk_remove_item() {
1958        let store = get_event_cache_store().await.expect("creating cache store failed");
1959
1960        let room_id = *DEFAULT_TEST_ROOM_ID;
1961        let linked_chunk_id = LinkedChunkId::Room(room_id);
1962
1963        store
1964            .handle_linked_chunk_updates(
1965                linked_chunk_id,
1966                vec![
1967                    Update::NewItemsChunk {
1968                        previous: None,
1969                        new: ChunkIdentifier::new(42),
1970                        next: None,
1971                    },
1972                    Update::PushItems {
1973                        at: Position::new(ChunkIdentifier::new(42), 0),
1974                        items: vec![
1975                            make_test_event(room_id, "one"),
1976                            make_test_event(room_id, "two"),
1977                            make_test_event(room_id, "three"),
1978                            make_test_event(room_id, "four"),
1979                            make_test_event(room_id, "five"),
1980                            make_test_event(room_id, "six"),
1981                        ],
1982                    },
1983                    Update::RemoveItem {
1984                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
1985                    },
1986                ],
1987            )
1988            .await
1989            .unwrap();
1990
1991        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1992
1993        assert_eq!(chunks.len(), 1);
1994
1995        let c = chunks.remove(0);
1996        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1997        assert_eq!(c.previous, None);
1998        assert_eq!(c.next, None);
1999        assert_matches!(c.content, ChunkContent::Items(events) => {
2000            assert_eq!(events.len(), 5);
2001            check_test_event(&events[0], "one");
2002            check_test_event(&events[1], "two");
2003            check_test_event(&events[2], "four");
2004            check_test_event(&events[3], "five");
2005            check_test_event(&events[4], "six");
2006        });
2007
2008        // Make sure the positions have been updated for the remaining events.
2009        let num_rows: u64 = store
2010            .read()
2011            .await
2012            .unwrap()
2013            .with_transaction(move |txn| {
2014                txn.query_row(
2015                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2016                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2017                    |row| row.get(0),
2018                )
2019            })
2020            .await
2021            .unwrap();
2022        assert_eq!(num_rows, 3);
2023    }
2024
2025    #[async_test]
2026    async fn test_linked_chunk_detach_last_items() {
2027        let store = get_event_cache_store().await.expect("creating cache store failed");
2028
2029        let room_id = *DEFAULT_TEST_ROOM_ID;
2030        let linked_chunk_id = LinkedChunkId::Room(room_id);
2031
2032        store
2033            .handle_linked_chunk_updates(
2034                linked_chunk_id,
2035                vec![
2036                    Update::NewItemsChunk {
2037                        previous: None,
2038                        new: ChunkIdentifier::new(42),
2039                        next: None,
2040                    },
2041                    Update::PushItems {
2042                        at: Position::new(ChunkIdentifier::new(42), 0),
2043                        items: vec![
2044                            make_test_event(room_id, "hello"),
2045                            make_test_event(room_id, "world"),
2046                            make_test_event(room_id, "howdy"),
2047                        ],
2048                    },
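                    // Detaching at index 1 drops "world" and "howdy", keeping
                    // only "hello".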
2049                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2050                ],
2051            )
2052            .await
2053            .unwrap();
2054
2055        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2056
2057        assert_eq!(chunks.len(), 1);
2058
2059        let c = chunks.remove(0);
2060        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2061        assert_eq!(c.previous, None);
2062        assert_eq!(c.next, None);
2063        assert_matches!(c.content, ChunkContent::Items(events) => {
2064            assert_eq!(events.len(), 1);
2065            check_test_event(&events[0], "hello");
2066        });
2067    }
2068
2069    #[async_test]
2070    async fn test_linked_chunk_start_end_reattach_items() {
2071        let store = get_event_cache_store().await.expect("creating cache store failed");
2072
2073        let room_id = *DEFAULT_TEST_ROOM_ID;
2074        let linked_chunk_id = LinkedChunkId::Room(room_id);
2075
2076        // Same updates and checks as test_linked_chunk_push_items, but with extra
2077        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2078        // effect.
2079        store
2080            .handle_linked_chunk_updates(
2081                linked_chunk_id,
2082                vec![
2083                    Update::NewItemsChunk {
2084                        previous: None,
2085                        new: ChunkIdentifier::new(42),
2086                        next: None,
2087                    },
2088                    Update::PushItems {
2089                        at: Position::new(ChunkIdentifier::new(42), 0),
2090                        items: vec![
2091                            make_test_event(room_id, "hello"),
2092                            make_test_event(room_id, "world"),
2093                            make_test_event(room_id, "howdy"),
2094                        ],
2095                    },
2096                    Update::StartReattachItems,
2097                    Update::EndReattachItems,
2098                ],
2099            )
2100            .await
2101            .unwrap();
2102
2103        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2104
2105        assert_eq!(chunks.len(), 1);
2106
2107        let c = chunks.remove(0);
2108        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2109        assert_eq!(c.previous, None);
2110        assert_eq!(c.next, None);
2111        assert_matches!(c.content, ChunkContent::Items(events) => {
2112            assert_eq!(events.len(), 3);
2113            check_test_event(&events[0], "hello");
2114            check_test_event(&events[1], "world");
2115            check_test_event(&events[2], "howdy");
2116        });
2117    }
2118
2119    #[async_test]
2120    async fn test_linked_chunk_clear() {
2121        let store = get_event_cache_store().await.expect("creating cache store failed");
2122
2123        let room_id = *DEFAULT_TEST_ROOM_ID;
2124        let linked_chunk_id = LinkedChunkId::Room(room_id);
2125        let event_0 = make_test_event(room_id, "hello");
2126        let event_1 = make_test_event(room_id, "world");
2127        let event_2 = make_test_event(room_id, "howdy");
2128
2129        store
2130            .handle_linked_chunk_updates(
2131                linked_chunk_id,
2132                vec![
2133                    Update::NewItemsChunk {
2134                        previous: None,
2135                        new: ChunkIdentifier::new(42),
2136                        next: None,
2137                    },
2138                    Update::NewGapChunk {
2139                        previous: Some(ChunkIdentifier::new(42)),
2140                        new: ChunkIdentifier::new(54),
2141                        next: None,
2142                        gap: Gap { prev_token: "fondue".to_owned() },
2143                    },
2144                    Update::PushItems {
2145                        at: Position::new(ChunkIdentifier::new(42), 0),
2146                        items: vec![event_0.clone(), event_1, event_2],
2147                    },
2148                    Update::Clear,
2149                ],
2150            )
2151            .await
2152            .unwrap();
2153
2154        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2155        assert!(chunks.is_empty());
2156
2157        // Check that cascading worked. Yes, SQLite, I doubt you.
2158        store
2159            .read()
2160            .await
2161            .unwrap()
2162            .with_transaction(|txn| -> rusqlite::Result<_> {
2163                let num_gaps = txn
2164                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2165                    .query_row((), |row| row.get::<_, u64>(0))?;
2166                assert_eq!(num_gaps, 0);
2167
2168                let num_events = txn
2169                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2170                    .query_row((), |row| row.get::<_, u64>(0))?;
2171                assert_eq!(num_events, 0);
2172
2173                Ok(())
2174            })
2175            .await
2176            .unwrap();
2177
2178        // It's okay to re-insert a past event.
2179        store
2180            .handle_linked_chunk_updates(
2181                linked_chunk_id,
2182                vec![
2183                    Update::NewItemsChunk {
2184                        previous: None,
2185                        new: ChunkIdentifier::new(42),
2186                        next: None,
2187                    },
2188                    Update::PushItems {
2189                        at: Position::new(ChunkIdentifier::new(42), 0),
2190                        items: vec![event_0],
2191                    },
2192                ],
2193            )
2194            .await
2195            .unwrap();
2196    }
2197
2198    #[async_test]
2199    async fn test_linked_chunk_multiple_rooms() {
2200        let store = get_event_cache_store().await.expect("creating cache store failed");
2201
2202        let room1 = room_id!("!realcheeselovers:raclette.fr");
2203        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2204        let room2 = room_id!("!realcheeselovers:fondue.ch");
2205        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2206
2207        // Check that applying updates to one room doesn't affect the others.
2208        // Use the same chunk identifier in both rooms to battle-test search.
2209
2210        store
2211            .handle_linked_chunk_updates(
2212                linked_chunk_id1,
2213                vec![
2214                    Update::NewItemsChunk {
2215                        previous: None,
2216                        new: ChunkIdentifier::new(42),
2217                        next: None,
2218                    },
2219                    Update::PushItems {
2220                        at: Position::new(ChunkIdentifier::new(42), 0),
2221                        items: vec![
2222                            make_test_event(room1, "best cheese is raclette"),
2223                            make_test_event(room1, "obviously"),
2224                        ],
2225                    },
2226                ],
2227            )
2228            .await
2229            .unwrap();
2230
2231        store
2232            .handle_linked_chunk_updates(
2233                linked_chunk_id2,
2234                vec![
2235                    Update::NewItemsChunk {
2236                        previous: None,
2237                        new: ChunkIdentifier::new(42),
2238                        next: None,
2239                    },
2240                    Update::PushItems {
2241                        at: Position::new(ChunkIdentifier::new(42), 0),
2242                        items: vec![make_test_event(room1, "beaufort is the best")],
2243                    },
2244                ],
2245            )
2246            .await
2247            .unwrap();
2248
2249        // Check chunks from room 1.
2250        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2251        assert_eq!(chunks_room1.len(), 1);
2252
2253        let c = chunks_room1.remove(0);
2254        assert_matches!(c.content, ChunkContent::Items(events) => {
2255            assert_eq!(events.len(), 2);
2256            check_test_event(&events[0], "best cheese is raclette");
2257            check_test_event(&events[1], "obviously");
2258        });
2259
2260        // Check chunks from room 2.
2261        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2262        assert_eq!(chunks_room2.len(), 1);
2263
2264        let c = chunks_room2.remove(0);
2265        assert_matches!(c.content, ChunkContent::Items(events) => {
2266            assert_eq!(events.len(), 1);
2267            check_test_event(&events[0], "beaufort is the best");
2268        });
2269    }
2270
2271    #[async_test]
2272    async fn test_linked_chunk_update_is_a_transaction() {
2273        let store = get_event_cache_store().await.expect("creating cache store failed");
2274
2275        let room_id = *DEFAULT_TEST_ROOM_ID;
2276        let linked_chunk_id = LinkedChunkId::Room(room_id);
2277
2278        // Trigger a violation of the unique constraint on the (room id, chunk id)
2279        // couple.
2280        let err = store
2281            .handle_linked_chunk_updates(
2282                linked_chunk_id,
2283                vec![
2284                    Update::NewItemsChunk {
2285                        previous: None,
2286                        new: ChunkIdentifier::new(42),
2287                        next: None,
2288                    },
2289                    Update::NewItemsChunk {
2290                        previous: None,
2291                        new: ChunkIdentifier::new(42),
2292                        next: None,
2293                    },
2294                ],
2295            )
2296            .await
2297            .unwrap_err();
2298
2299        // The operation fails with a constraint violation error.
2300        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2301            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2302        });
2303
2304        // If the updates have been handled transactionally, then no new chunks should
2305        // have been added; failure of the second update leads to the first one being
2306        // rolled back.
2307        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2308        assert!(chunks.is_empty());
2309    }
2310
2311    #[async_test]
2312    async fn test_filter_duplicate_events_no_events() {
2313        let store = get_event_cache_store().await.expect("creating cache store failed");
2314
2315        let room_id = *DEFAULT_TEST_ROOM_ID;
2316        let linked_chunk_id = LinkedChunkId::Room(room_id);
2317        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2318        assert!(duplicates.is_empty());
2319    }
2320
2321    #[async_test]
2322    async fn test_load_last_chunk() {
2323        let room_id = room_id!("!r0:matrix.org");
2324        let linked_chunk_id = LinkedChunkId::Room(room_id);
2325        let event = |msg: &str| make_test_event(room_id, msg);
2326        let store = get_event_cache_store().await.expect("creating cache store failed");
2327
2328        // Case #1: no last chunk.
2329        {
2330            let (last_chunk, chunk_identifier_generator) =
2331                store.load_last_chunk(linked_chunk_id).await.unwrap();
2332
2333            assert!(last_chunk.is_none());
2334            assert_eq!(chunk_identifier_generator.current(), 0);
2335        }
2336
2337        // Case #2: only one chunk is present.
2338        {
2339            store
2340                .handle_linked_chunk_updates(
2341                    linked_chunk_id,
2342                    vec![
2343                        Update::NewItemsChunk {
2344                            previous: None,
2345                            new: ChunkIdentifier::new(42),
2346                            next: None,
2347                        },
2348                        Update::PushItems {
2349                            at: Position::new(ChunkIdentifier::new(42), 0),
2350                            items: vec![event("saucisse de morteau"), event("comté")],
2351                        },
2352                    ],
2353                )
2354                .await
2355                .unwrap();
2356
2357            let (last_chunk, chunk_identifier_generator) =
2358                store.load_last_chunk(linked_chunk_id).await.unwrap();
2359
2360            assert_matches!(last_chunk, Some(last_chunk) => {
2361                assert_eq!(last_chunk.identifier, 42);
2362                assert!(last_chunk.previous.is_none());
2363                assert!(last_chunk.next.is_none());
2364                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2365                    assert_eq!(items.len(), 2);
2366                    check_test_event(&items[0], "saucisse de morteau");
2367                    check_test_event(&items[1], "comté");
2368                });
2369            });
2370            assert_eq!(chunk_identifier_generator.current(), 42);
2371        }
2372
2373        // Case #3: more chunks are present.
2374        {
2375            store
2376                .handle_linked_chunk_updates(
2377                    linked_chunk_id,
2378                    vec![
2379                        Update::NewItemsChunk {
2380                            previous: Some(ChunkIdentifier::new(42)),
2381                            new: ChunkIdentifier::new(7),
2382                            next: None,
2383                        },
2384                        Update::PushItems {
2385                            at: Position::new(ChunkIdentifier::new(7), 0),
2386                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2387                        },
2388                    ],
2389                )
2390                .await
2391                .unwrap();
2392
2393            let (last_chunk, chunk_identifier_generator) =
2394                store.load_last_chunk(linked_chunk_id).await.unwrap();
2395
2396            assert_matches!(last_chunk, Some(last_chunk) => {
2397                assert_eq!(last_chunk.identifier, 7);
2398                assert_matches!(last_chunk.previous, Some(previous) => {
2399                    assert_eq!(previous, 42);
2400                });
2401                assert!(last_chunk.next.is_none());
2402                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2403                    assert_eq!(items.len(), 3);
2404                    check_test_event(&items[0], "fondue");
2405                    check_test_event(&items[1], "gruyère");
2406                    check_test_event(&items[2], "mont d'or");
2407                });
2408            });
2409            assert_eq!(chunk_identifier_generator.current(), 42);
2410        }
2411    }
2412
2413    #[async_test]
2414    async fn test_load_last_chunk_with_a_cycle() {
2415        let room_id = room_id!("!r0:matrix.org");
2416        let linked_chunk_id = LinkedChunkId::Room(room_id);
2417        let store = get_event_cache_store().await.expect("creating cache store failed");
2418
2419        store
2420            .handle_linked_chunk_updates(
2421                linked_chunk_id,
2422                vec![
2423                    Update::NewItemsChunk {
2424                        previous: None,
2425                        new: ChunkIdentifier::new(0),
2426                        next: None,
2427                    },
2428                    Update::NewItemsChunk {
2429                        // Because `previous` connects to chunk #0, it will create a cycle.
2430                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2431                        // **does not exist**. We have to detect this cycle.
2432                        previous: Some(ChunkIdentifier::new(0)),
2433                        new: ChunkIdentifier::new(1),
2434                        next: Some(ChunkIdentifier::new(0)),
2435                    },
2436                ],
2437            )
2438            .await
2439            .unwrap();
2440
2441        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2442    }
2443
2444    #[async_test]
2445    async fn test_load_previous_chunk() {
2446        let room_id = room_id!("!r0:matrix.org");
2447        let linked_chunk_id = LinkedChunkId::Room(room_id);
2448        let event = |msg: &str| make_test_event(room_id, msg);
2449        let store = get_event_cache_store().await.expect("creating cache store failed");
2450
2451        // Case #1: no chunk at all, equivalent to having a nonexistent
2452        // `before_chunk_identifier`.
2453        {
2454            let previous_chunk = store
2455                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2456                .await
2457                .unwrap();
2458
2459            assert!(previous_chunk.is_none());
2460        }
2461
2462        // Case #2: there is only one chunk; requesting the chunk before it
2463        // yields nothing.
2464        {
2465            store
2466                .handle_linked_chunk_updates(
2467                    linked_chunk_id,
2468                    vec![Update::NewItemsChunk {
2469                        previous: None,
2470                        new: ChunkIdentifier::new(42),
2471                        next: None,
2472                    }],
2473                )
2474                .await
2475                .unwrap();
2476
2477            let previous_chunk =
2478                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2479
2480            assert!(previous_chunk.is_none());
2481        }
2482
2483        // Case #3: there are two chunks.
2484        {
2485            store
2486                .handle_linked_chunk_updates(
2487                    linked_chunk_id,
2488                    vec![
2489                        // new chunk before the one that exists.
2490                        Update::NewItemsChunk {
2491                            previous: None,
2492                            new: ChunkIdentifier::new(7),
2493                            next: Some(ChunkIdentifier::new(42)),
2494                        },
2495                        Update::PushItems {
2496                            at: Position::new(ChunkIdentifier::new(7), 0),
2497                            items: vec![event("brigand du jorat"), event("morbier")],
2498                        },
2499                    ],
2500                )
2501                .await
2502                .unwrap();
2503
2504            let previous_chunk =
2505                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2506
2507            assert_matches!(previous_chunk, Some(previous_chunk) => {
2508                assert_eq!(previous_chunk.identifier, 7);
2509                assert!(previous_chunk.previous.is_none());
2510                assert_matches!(previous_chunk.next, Some(next) => {
2511                    assert_eq!(next, 42);
2512                });
2513                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2514                    assert_eq!(items.len(), 2);
2515                    check_test_event(&items[0], "brigand du jorat");
2516                    check_test_event(&items[1], "morbier");
2517                });
2518            });
2519        }
2520    }
2521}
2522
2523#[cfg(test)]
2524mod encrypted_tests {
2525    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2526
2527    use matrix_sdk_base::{
2528        event_cache::store::{EventCacheStore, EventCacheStoreError},
2529        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2530    };
2531    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2532    use once_cell::sync::Lazy;
2533    use ruma::{
2534        event_id,
2535        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2536        room_id, user_id,
2537    };
2538    use tempfile::{TempDir, tempdir};
2539
2540    use super::SqliteEventCacheStore;
2541
2542    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2543    static NUM: AtomicU32 = AtomicU32::new(0);
2544
2545    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2546        let name = NUM.fetch_add(1, SeqCst).to_string();
2547        let tmpdir_path = TMP_DIR.path().join(name);
2548
2549        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2550
2551        Ok(SqliteEventCacheStore::open(
2552            tmpdir_path.to_str().unwrap(),
2553            Some("default_test_password"),
2554        )
2555        .await
2556        .unwrap())
2557    }
2558
2559    event_cache_store_integration_tests!();
2560    event_cache_store_integration_tests_time!();
2561
2562    #[async_test]
2563    async fn test_no_sqlite_injection_in_find_event_relations() {
2564        let room_id = room_id!("!test:localhost");
2565        let another_room_id = room_id!("!r1:matrix.org");
2566        let sender = user_id!("@alice:localhost");
2567
2568        let store = get_event_cache_store()
2569            .await
2570            .expect("We should be able to create a new, empty, event cache store");
2571
2572        let f = EventFactory::new().room(room_id).sender(sender);
2573
2574        // Create an event for the first room.
2575        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2576        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2577
2578        // Create a related event.
2579        let edit_id = event_id!("$find_me:matrix.org");
2580        let edit = f
2581            .text_msg("Find me")
2582            .event_id(edit_id)
2583            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2584            .into_event();
2585
2586        // Create an event for the second room.
2587        let f = f.room(another_room_id);
2588
2589        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2590        let another_event =
2591            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2592
2593        // Save the events in the DB.
2594        store.save_event(room_id, event).await.unwrap();
2595        store.save_event(room_id, edit).await.unwrap();
2596        store.save_event(another_room_id, another_event).await.unwrap();
2597
2598        // Craft a `RelationType` that attempts to inject SQL. If the injection
2599        // succeeded, the `OR 1=1` would make the `WHERE` clause ignore all the
2600        // previous parameters, i.e. the room ID and the event ID.
2601        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
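        // Because the filters are bound as SQL parameters rather than spliced
        // into the query string, the crafted value is compared literally
        // against `rel_type` and cannot terminate the `IN (…)` list.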
2602
2603        // Attempt to find events in the first room.
2604        let results = store
2605            .find_event_relations(room_id, event_id, filter.as_deref())
2606            .await
2607            .expect("We should be able to attempt to find event relations");
2608
2609        // Ensure that we only got the single related event the first room contains.
2610        similar_asserts::assert_eq!(
2611            results.len(),
2612            1,
2613            "We should only have loaded events for the first room {results:#?}"
2614        );
2615
2616        // The event needs to be the edit event, otherwise something is wrong.
2617        let (found_event, _) = &results[0];
2618        assert_eq!(
2619            found_event.event_id().as_deref(),
2620            Some(edit_id),
2621            "The single event we found should be the edit event"
2622        );
2623    }
2624}