matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{
    params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 13;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and connections
    /// for write operations. A single connection is set apart for write
    /// operations; all other connections are used for read operations. The
    /// lock is used to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
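    ///
    /// # Examples
    ///
    /// A minimal usage sketch (the path and passphrase are illustrative):
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// let store =
    ///     SqliteEventCacheStore::open("/path/to/store", Some("passphrase")).await?;
    /// # Ok(()) }
    /// ```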
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the SQLite-based event cache store with the given store
    /// configuration.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we get a connection, since we can't guarantee that a previous
        // connection enabled it.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we get a connection, since we can't guarantee that a previous
        // connection enabled it.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

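    /// Map a `linked_chunks` row onto its `(id, previous, next, type)`
    /// columns.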
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

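/// The output of [`SqliteEventCacheStore::encode_event`]: the serialized (and
/// possibly encrypted) event content, along with its extracted relationship
/// metadata.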
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, as it
        // fails with the error message: "cannot change into wal mode from
        // within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let _timer = timer!("method");

        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
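        //
        // In effect: the same holder can always refresh its lease and keeps
        // its generation, while a different holder can only steal the lease
        // once it has expired, in which case the generation is bumped.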
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either
        // all updates are applied, or none of them is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

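        // Note: `with_immediate_transaction` presumably starts the transaction
        // with `TransactionBehavior::Immediate`, i.e. the write lock is taken
        // up front rather than lazily at the first write statement.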
        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point its previous chunk's `next` to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point its next chunk's `previous` to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete the corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better be safe.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have already
                        // been inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

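                        // Filter out events that cannot be stored, i.e. those
                        // with no event ID or no event type.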
                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // If `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id    | chunk_id | position |
                        // |----------|--------------------|----------|----------|
                        // | $ev0     | !r0                | 42       | 0        |
                        // | $ev1     | !r0                | 42       | 1        |
                        // |          |                    |          |          | <- no more `$ev2`
                        // | $ev3     | !r0                | 42       | 3        |
                        // | $ev4     | !r0                | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To persuade yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, we
                        // have `event_cache::Deduplicator` that is responsible
                        // for ensuring there is no duplicated event. However,
                        // relying on this is “fragile” in the sense it can
                        // contain bugs. Relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “braces and belt” as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is
                        // a no-go for us, it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a
                // `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it matches no rows, the `MAX(id)` is
                                // set to `NULL`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk was not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk was not found **but** there are chunks for this room: this is
                    // inconsistent, the linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events for which we want to check duplicates, we can
        // return early. It's not only an optimization to do so: it's required,
        // otherwise the `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
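                // SQLite limits the number of variables that can be bound in a
                // single statement, so run the query in chunks over the event
                // IDs (which is, presumably, what `chunk_large_query_over`
                // takes care of).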
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );
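                    // For example, with three candidate event IDs, `repeat_vars`
                    // expands the `IN (…)` placeholder list to `?, ?, ?`.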

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID may have been stored even
                            // though it is malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Read a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

1318    #[instrument(skip(self, event_id, filters))]
1319    async fn find_event_relations(
1320        &self,
1321        room_id: &RoomId,
1322        event_id: &EventId,
1323        filters: Option<&[RelationType]>,
1324    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1325        let _timer = timer!("method");
1326
1327        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1328
1329        let hashed_linked_chunk_id =
1330            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1331
1332        let event_id = event_id.to_owned();
1333        let filters = filters.map(ToOwned::to_owned);
1334        let store = self.clone();
1335
1336        self.read()
1337            .await?
1338            .with_transaction(move |txn| -> Result<_> {
1339                find_event_relations_transaction(
1340                    store,
1341                    hashed_room_id,
1342                    hashed_linked_chunk_id,
1343                    event_id,
1344                    filters,
1345                    txn,
1346                )
1347            })
1348            .await
1349    }
1350
1351    #[instrument(skip(self))]
1352    async fn get_room_events(
1353        &self,
1354        room_id: &RoomId,
1355        event_type: Option<&str>,
1356        session_id: Option<&str>,
1357    ) -> Result<Vec<Event>, Self::Error> {
1358        let _timer = timer!("method");
1359
1360        let this = self.clone();
1361
1362        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1363        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1364        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1365
1366        self.read()
1367            .await?
1368            .with_transaction(move |txn| -> Result<_> {
1369                // Clippy claims that the clones aren't required, but the compiler
1370                // complains that the lifetimes aren't long enough if we remove them.
1371                // It doesn't matter much, so let's silence the lint.
1372                #[allow(clippy::redundant_clone)]
1373                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1374                    (None, None) => {
1375                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1376                    }
1377                    (None, Some(session_id)) => (
1378                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1379                        params![hashed_room_id, session_id.to_owned()],
1380                    ),
1381                    (Some(event_type), None) => (
1382                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2",
1383                        params![hashed_room_id, event_type.to_owned()],
1384                    ),
1385                    (Some(event_type), Some(session_id)) => (
1386                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1387                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1388                    ),
1389                };
1390
1391                let mut statement = txn.prepare(query)?;
1392                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1393
1394                let mut events = Vec::new();
1395                for ev in maybe_events {
1396                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1397                    events.push(event);
1398                }
1399
1400                Ok(events)
1401            })
1402            .await
1403    }
1404
1405    #[instrument(skip(self, event))]
1406    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1407        let _timer = timer!("method");
1408
1409        let Some(event_id) = event.event_id() else {
1410            error!("Trying to save an event with no ID");
1411            return Ok(());
1412        };
1413
1414        let Some(event_type) = event.kind.event_type() else {
1415            error!(%event_id, "Trying to save an event with no event type");
1416            return Ok(());
1417        };
1418
1419        let event_type = self.encode_key(keys::EVENTS, event_type);
1420        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1421
1422        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1423        let event_id = event_id.to_string();
1424        let encoded_event = self.encode_event(&event)?;
1425
1426        self.write()
1427            .await?
1428            .with_transaction(move |txn| -> Result<_> {
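                // `INSERT OR REPLACE` acts as an upsert: saving an event that already
                // exists overwrites the previous row, so the latest version wins.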
1429                txn.execute(
1430                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1431                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1432
1433                Ok(())
1434            })
1435            .await
1436    }
1437}
1438
1439fn find_event_relations_transaction(
1440    store: SqliteEventCacheStore,
1441    hashed_room_id: Key,
1442    hashed_linked_chunk_id: Key,
1443    event_id: OwnedEventId,
1444    filters: Option<Vec<RelationType>>,
1445    txn: &Transaction<'_>,
1446) -> Result<Vec<(Event, Option<Position>)>> {
1447    let get_rows = |row: &rusqlite::Row<'_>| {
1448        Ok((
1449            row.get::<_, Vec<u8>>(0)?,
1450            row.get::<_, Option<u64>>(1)?,
1451            row.get::<_, Option<usize>>(2)?,
1452        ))
1453    };
1454
1455    // Collect related events.
1456    let collect_results = |rows| {
1457        let mut related = Vec::new();
1458
1459        for result in rows {
1460            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1461
1462            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1463
1464            // Only build the position if both the chunk ID and the index were present;
1465            // in theory, they should either both be present, or both be absent.
1466            let pos = chunk_id
1467                .zip(index)
1468                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1469
1470            related.push((event, pos));
1471        }
1472
1473        Ok(related)
1474    };
1475
1476    let related = if let Some(filters) = filters {
1477        let question_marks = repeat_vars(filters.len());
1478        let query = format!(
1479            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1480            FROM events
1481            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1482            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1483        );
1484
1485        // First the filters need to be stringified; because `.to_sql()` will borrow
1486        // from them, they also need to be stringified onto the stack, so as to
1487        // get a stable address (to avoid returning a temporary reference in the
1488        // map closure below).
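        // (Mapping `f.to_string().to_sql()` in a single step would try to return
        // a reference to a temporary `String` that is dropped immediately.)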
1489        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1490        let filters_params: Vec<_> = filter_strings
1491            .iter()
1492            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1493            .collect();
1494
1495        let parameters = params_from_iter(
1496            [
1497                hashed_linked_chunk_id.to_sql().expect(
1498                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1499                ),
1500                event_id
1501                    .as_str()
1502                    .to_sql()
1503                    .expect("We should be able to convert an event ID to a SQLite value"),
1504                hashed_room_id
1505                    .to_sql()
1506                    .expect("We should be able to convert a room ID to a SQLite value"),
1507            ]
1508            .into_iter()
1509            .chain(filters_params),
1510        );
1511
1512        let mut statement = txn.prepare(&query)?;
1513        let rows = statement.query_map(parameters, get_rows)?;
1514
1515        collect_results(rows)
1516    } else {
1517        let query =
1518            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1519            FROM events
1520            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1521            WHERE relates_to = ? AND room_id = ?";
1522        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1523
1524        let mut statement = txn.prepare(query)?;
1525        let rows = statement.query_map(parameters, get_rows)?;
1526
1527        collect_results(rows)
1528    };
1529
1530    related
1531}
1532
1533/// Like `deadpool::managed::Object::with_transaction`, but starts the
1534/// transaction in immediate (write) mode from the beginning, precluding
1535/// SQLITE_BUSY errors for transactions that may involve both reads and
1536/// writes, and that start with a write.
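///
/// A minimal usage sketch (hypothetical values, not compiled as a doc-test):
///
/// ```ignore
/// let deleted = with_immediate_transaction(&store, move |txn| {
///     // Everything in this closure runs in a single IMMEDIATE transaction.
///     Ok(txn.execute("DELETE FROM events WHERE room_id = ?", (hashed_room_id,))?)
/// })
/// .await?;
/// ```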
1537async fn with_immediate_transaction<
1538    T: Send + 'static,
1539    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1540>(
1541    this: &SqliteEventCacheStore,
1542    f: F,
1543) -> Result<T, Error> {
1544    this.write()
1545        .await?
1546        .interact(move |conn| -> Result<T, Error> {
1547            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1548            // to avoid read transactions upgrading to write mode and causing
1549            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1550            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1551
1552            let code = || -> Result<T, Error> {
1553                let txn = conn.transaction()?;
1554                let res = f(&txn)?;
1555                txn.commit()?;
1556                Ok(res)
1557            };
1558
1559            let res = code();
1560
1561            // Reset the transaction behavior to use Deferred, after this transaction has
1562            // been run, whether it was successful or not.
1563            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1564
1565            res
1566        })
1567        .await
1568        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1569        .unwrap()
1570}
1571
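/// Insert a new chunk into the `linked_chunks` table and rewire its
/// neighbours' links.
///
/// For illustration (a sketch, not executed): with an existing list
/// `42 <-> 37`, calling `insert_chunk(txn, &linked_chunk_id, Some(42), 13,
/// Some(37), "E")` inserts chunk 13 in between, leaving `42 <-> 13 <-> 37`,
/// so the invariant "`a.next = b` iff `b.previous = a`" is preserved.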
1572fn insert_chunk(
1573    txn: &Transaction<'_>,
1574    linked_chunk_id: &Key,
1575    previous: Option<u64>,
1576    new: u64,
1577    next: Option<u64>,
1578    type_str: &str,
1579) -> rusqlite::Result<()> {
1580    // First, insert the new chunk.
1581    txn.execute(
1582        r#"
1583            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1584            VALUES (?, ?, ?, ?, ?)
1585        "#,
1586        (new, linked_chunk_id, previous, next, type_str),
1587    )?;
1588
1589    // If this chunk has a previous one, update the previous chunk's `next` field.
1590    if let Some(previous) = previous {
1591        txn.execute(
1592            r#"
1593                UPDATE linked_chunks
1594                SET next = ?
1595                WHERE id = ? AND linked_chunk_id = ?
1596            "#,
1597            (new, previous, linked_chunk_id),
1598        )?;
1599    }
1600
1601    // If this chunk has a next one, update the next chunk's `previous` field.
1602    if let Some(next) = next {
1603        txn.execute(
1604            r#"
1605                UPDATE linked_chunks
1606                SET previous = ?
1607                WHERE id = ? AND linked_chunk_id = ?
1608            "#,
1609            (new, next, linked_chunk_id),
1610        )?;
1611    }
1612
1613    Ok(())
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618    use std::{
1619        path::PathBuf,
1620        sync::atomic::{AtomicU32, Ordering::SeqCst},
1621    };
1622
1623    use assert_matches::assert_matches;
1624    use matrix_sdk_base::{
1625        event_cache::{
1626            store::{
1627                integration_tests::{
1628                    check_test_event, make_test_event, make_test_event_with_event_id,
1629                },
1630                EventCacheStore, EventCacheStoreError,
1631            },
1632            Gap,
1633        },
1634        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1635        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1636    };
1637    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1638    use once_cell::sync::Lazy;
1639    use ruma::{event_id, room_id};
1640    use tempfile::{tempdir, TempDir};
1641
1642    use super::SqliteEventCacheStore;
1643    use crate::{
1644        event_cache_store::keys,
1645        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1646        SqliteStoreConfig,
1647    };
1648
1649    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1650    static NUM: AtomicU32 = AtomicU32::new(0);
1651
1652    fn new_event_cache_store_workspace() -> PathBuf {
1653        let name = NUM.fetch_add(1, SeqCst).to_string();
1654        TMP_DIR.path().join(name)
1655    }
1656
1657    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1658        let tmpdir_path = new_event_cache_store_workspace();
1659
1660        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1661
1662        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1663    }
1664
1665    event_cache_store_integration_tests!();
1666    event_cache_store_integration_tests_time!();
1667
1668    #[async_test]
1669    async fn test_pool_size() {
1670        let tmpdir_path = new_event_cache_store_workspace();
1671        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1672
1673        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1674
1675        assert_eq!(store.pool.status().max_size, 42);
1676    }
1677
1678    #[async_test]
1679    async fn test_linked_chunk_new_items_chunk() {
1680        let store = get_event_cache_store().await.expect("creating cache store failed");
1681
1682        let room_id = &DEFAULT_TEST_ROOM_ID;
1683        let linked_chunk_id = LinkedChunkId::Room(room_id);
1684
1685        store
1686            .handle_linked_chunk_updates(
1687                linked_chunk_id,
1688                vec![
1689                    Update::NewItemsChunk {
1690                        previous: None,
1691                        new: ChunkIdentifier::new(42),
1692                        next: None, // Note: the store must link the next entry itself.
1693                    },
1694                    Update::NewItemsChunk {
1695                        previous: Some(ChunkIdentifier::new(42)),
1696                        new: ChunkIdentifier::new(13),
1697                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1698                                                               * the next link ahead of time. */
1699                    },
1700                    Update::NewItemsChunk {
1701                        previous: Some(ChunkIdentifier::new(13)),
1702                        new: ChunkIdentifier::new(37),
1703                        next: None,
1704                    },
1705                ],
1706            )
1707            .await
1708            .unwrap();
1709
1710        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1711
1712        assert_eq!(chunks.len(), 3);
1713
1714        {
1715            // Chunks are ordered from smaller to bigger IDs.
1716            let c = chunks.remove(0);
1717            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1718            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1719            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1720            assert_matches!(c.content, ChunkContent::Items(events) => {
1721                assert!(events.is_empty());
1722            });
1723
1724            let c = chunks.remove(0);
1725            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1726            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1727            assert_eq!(c.next, None);
1728            assert_matches!(c.content, ChunkContent::Items(events) => {
1729                assert!(events.is_empty());
1730            });
1731
1732            let c = chunks.remove(0);
1733            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1734            assert_eq!(c.previous, None);
1735            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1736            assert_matches!(c.content, ChunkContent::Items(events) => {
1737                assert!(events.is_empty());
1738            });
1739        }
1740    }
1741
1742    #[async_test]
1743    async fn test_linked_chunk_new_gap_chunk() {
1744        let store = get_event_cache_store().await.expect("creating cache store failed");
1745
1746        let room_id = &DEFAULT_TEST_ROOM_ID;
1747        let linked_chunk_id = LinkedChunkId::Room(room_id);
1748
1749        store
1750            .handle_linked_chunk_updates(
1751                linked_chunk_id,
1752                vec![Update::NewGapChunk {
1753                    previous: None,
1754                    new: ChunkIdentifier::new(42),
1755                    next: None,
1756                    gap: Gap { prev_token: "raclette".to_owned() },
1757                }],
1758            )
1759            .await
1760            .unwrap();
1761
1762        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1763
1764        assert_eq!(chunks.len(), 1);
1765
1766        // Chunks are ordered from smaller to bigger IDs.
1767        let c = chunks.remove(0);
1768        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1769        assert_eq!(c.previous, None);
1770        assert_eq!(c.next, None);
1771        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1772            assert_eq!(gap.prev_token, "raclette");
1773        });
1774    }
1775
1776    #[async_test]
1777    async fn test_linked_chunk_replace_item() {
1778        let store = get_event_cache_store().await.expect("creating cache store failed");
1779
1780        let room_id = &DEFAULT_TEST_ROOM_ID;
1781        let linked_chunk_id = LinkedChunkId::Room(room_id);
1782        let event_id = event_id!("$world");
1783
1784        store
1785            .handle_linked_chunk_updates(
1786                linked_chunk_id,
1787                vec![
1788                    Update::NewItemsChunk {
1789                        previous: None,
1790                        new: ChunkIdentifier::new(42),
1791                        next: None,
1792                    },
1793                    Update::PushItems {
1794                        at: Position::new(ChunkIdentifier::new(42), 0),
1795                        items: vec![
1796                            make_test_event(room_id, "hello"),
1797                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1798                        ],
1799                    },
1800                    Update::ReplaceItem {
1801                        at: Position::new(ChunkIdentifier::new(42), 1),
1802                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1803                    },
1804                ],
1805            )
1806            .await
1807            .unwrap();
1808
1809        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1810
1811        assert_eq!(chunks.len(), 1);
1812
1813        let c = chunks.remove(0);
1814        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1815        assert_eq!(c.previous, None);
1816        assert_eq!(c.next, None);
1817        assert_matches!(c.content, ChunkContent::Items(events) => {
1818            assert_eq!(events.len(), 2);
1819            check_test_event(&events[0], "hello");
1820            check_test_event(&events[1], "yolo");
1821        });
1822    }
1823
1824    #[async_test]
1825    async fn test_linked_chunk_remove_chunk() {
1826        let store = get_event_cache_store().await.expect("creating cache store failed");
1827
1828        let room_id = &DEFAULT_TEST_ROOM_ID;
1829        let linked_chunk_id = LinkedChunkId::Room(room_id);
1830
1831        store
1832            .handle_linked_chunk_updates(
1833                linked_chunk_id,
1834                vec![
1835                    Update::NewGapChunk {
1836                        previous: None,
1837                        new: ChunkIdentifier::new(42),
1838                        next: None,
1839                        gap: Gap { prev_token: "raclette".to_owned() },
1840                    },
1841                    Update::NewGapChunk {
1842                        previous: Some(ChunkIdentifier::new(42)),
1843                        new: ChunkIdentifier::new(43),
1844                        next: None,
1845                        gap: Gap { prev_token: "fondue".to_owned() },
1846                    },
1847                    Update::NewGapChunk {
1848                        previous: Some(ChunkIdentifier::new(43)),
1849                        new: ChunkIdentifier::new(44),
1850                        next: None,
1851                        gap: Gap { prev_token: "tartiflette".to_owned() },
1852                    },
1853                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1854                ],
1855            )
1856            .await
1857            .unwrap();
1858
1859        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1860
1861        assert_eq!(chunks.len(), 2);
1862
1863        // Chunks are ordered from smaller to bigger IDs.
1864        let c = chunks.remove(0);
1865        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1866        assert_eq!(c.previous, None);
1867        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1868        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1869            assert_eq!(gap.prev_token, "raclette");
1870        });
1871
1872        let c = chunks.remove(0);
1873        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1874        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1875        assert_eq!(c.next, None);
1876        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1877            assert_eq!(gap.prev_token, "tartiflette");
1878        });
1879
1880        // Check that cascading worked. Yes, SQLite, I doubt you.
1881        let gaps = store
1882            .read()
1883            .await
1884            .unwrap()
1885            .with_transaction(|txn| -> rusqlite::Result<_> {
1886                let mut gaps = Vec::new();
1887                for data in txn
1888                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1889                    .query_map((), |row| row.get::<_, u64>(0))?
1890                {
1891                    gaps.push(data?);
1892                }
1893                Ok(gaps)
1894            })
1895            .await
1896            .unwrap();
1897
1898        assert_eq!(gaps, vec![42, 44]);
1899    }
1900
1901    #[async_test]
1902    async fn test_linked_chunk_push_items() {
1903        let store = get_event_cache_store().await.expect("creating cache store failed");
1904
1905        let room_id = &DEFAULT_TEST_ROOM_ID;
1906        let linked_chunk_id = LinkedChunkId::Room(room_id);
1907
1908        store
1909            .handle_linked_chunk_updates(
1910                linked_chunk_id,
1911                vec![
1912                    Update::NewItemsChunk {
1913                        previous: None,
1914                        new: ChunkIdentifier::new(42),
1915                        next: None,
1916                    },
1917                    Update::PushItems {
1918                        at: Position::new(ChunkIdentifier::new(42), 0),
1919                        items: vec![
1920                            make_test_event(room_id, "hello"),
1921                            make_test_event(room_id, "world"),
1922                        ],
1923                    },
1924                    Update::PushItems {
1925                        at: Position::new(ChunkIdentifier::new(42), 2),
1926                        items: vec![make_test_event(room_id, "who?")],
1927                    },
1928                ],
1929            )
1930            .await
1931            .unwrap();
1932
1933        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1934
1935        assert_eq!(chunks.len(), 1);
1936
1937        let c = chunks.remove(0);
1938        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1939        assert_eq!(c.previous, None);
1940        assert_eq!(c.next, None);
1941        assert_matches!(c.content, ChunkContent::Items(events) => {
1942            assert_eq!(events.len(), 3);
1943
1944            check_test_event(&events[0], "hello");
1945            check_test_event(&events[1], "world");
1946            check_test_event(&events[2], "who?");
1947        });
1948    }
1949
1950    #[async_test]
1951    async fn test_linked_chunk_remove_item() {
1952        let store = get_event_cache_store().await.expect("creating cache store failed");
1953
1954        let room_id = *DEFAULT_TEST_ROOM_ID;
1955        let linked_chunk_id = LinkedChunkId::Room(room_id);
1956
1957        store
1958            .handle_linked_chunk_updates(
1959                linked_chunk_id,
1960                vec![
1961                    Update::NewItemsChunk {
1962                        previous: None,
1963                        new: ChunkIdentifier::new(42),
1964                        next: None,
1965                    },
1966                    Update::PushItems {
1967                        at: Position::new(ChunkIdentifier::new(42), 0),
1968                        items: vec![
1969                            make_test_event(room_id, "one"),
1970                            make_test_event(room_id, "two"),
1971                            make_test_event(room_id, "three"),
1972                            make_test_event(room_id, "four"),
1973                            make_test_event(room_id, "five"),
1974                            make_test_event(room_id, "six"),
1975                        ],
1976                    },
1977                    Update::RemoveItem {
1978                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
1979                    },
1980                ],
1981            )
1982            .await
1983            .unwrap();
1984
1985        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1986
1987        assert_eq!(chunks.len(), 1);
1988
1989        let c = chunks.remove(0);
1990        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1991        assert_eq!(c.previous, None);
1992        assert_eq!(c.next, None);
1993        assert_matches!(c.content, ChunkContent::Items(events) => {
1994            assert_eq!(events.len(), 5);
1995            check_test_event(&events[0], "one");
1996            check_test_event(&events[1], "two");
1997            check_test_event(&events[2], "four");
1998            check_test_event(&events[3], "five");
1999            check_test_event(&events[4], "six");
2000        });
2001
2002        // Make sure the positions have been updated for the remaining events.
2003        let num_rows: u64 = store
2004            .read()
2005            .await
2006            .unwrap()
2007            .with_transaction(move |txn| {
2008                txn.query_row(
2009                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2010                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2011                    |row| row.get(0),
2012                )
2013            })
2014            .await
2015            .unwrap();
2016        assert_eq!(num_rows, 3);
2017    }
2018
2019    #[async_test]
2020    async fn test_linked_chunk_detach_last_items() {
2021        let store = get_event_cache_store().await.expect("creating cache store failed");
2022
2023        let room_id = *DEFAULT_TEST_ROOM_ID;
2024        let linked_chunk_id = LinkedChunkId::Room(room_id);
2025
2026        store
2027            .handle_linked_chunk_updates(
2028                linked_chunk_id,
2029                vec![
2030                    Update::NewItemsChunk {
2031                        previous: None,
2032                        new: ChunkIdentifier::new(42),
2033                        next: None,
2034                    },
2035                    Update::PushItems {
2036                        at: Position::new(ChunkIdentifier::new(42), 0),
2037                        items: vec![
2038                            make_test_event(room_id, "hello"),
2039                            make_test_event(room_id, "world"),
2040                            make_test_event(room_id, "howdy"),
2041                        ],
2042                    },
2043                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2044                ],
2045            )
2046            .await
2047            .unwrap();
2048
2049        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2050
2051        assert_eq!(chunks.len(), 1);
2052
2053        let c = chunks.remove(0);
2054        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2055        assert_eq!(c.previous, None);
2056        assert_eq!(c.next, None);
2057        assert_matches!(c.content, ChunkContent::Items(events) => {
2058            assert_eq!(events.len(), 1);
2059            check_test_event(&events[0], "hello");
2060        });
2061    }
2062
2063    #[async_test]
2064    async fn test_linked_chunk_start_end_reattach_items() {
2065        let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067        let room_id = *DEFAULT_TEST_ROOM_ID;
2068        let linked_chunk_id = LinkedChunkId::Room(room_id);
2069
2070        // Same updates and checks as test_linked_chunk_push_items, but with extra
2071        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2072        // effect.
2073        store
2074            .handle_linked_chunk_updates(
2075                linked_chunk_id,
2076                vec![
2077                    Update::NewItemsChunk {
2078                        previous: None,
2079                        new: ChunkIdentifier::new(42),
2080                        next: None,
2081                    },
2082                    Update::PushItems {
2083                        at: Position::new(ChunkIdentifier::new(42), 0),
2084                        items: vec![
2085                            make_test_event(room_id, "hello"),
2086                            make_test_event(room_id, "world"),
2087                            make_test_event(room_id, "howdy"),
2088                        ],
2089                    },
2090                    Update::StartReattachItems,
2091                    Update::EndReattachItems,
2092                ],
2093            )
2094            .await
2095            .unwrap();
2096
2097        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2098
2099        assert_eq!(chunks.len(), 1);
2100
2101        let c = chunks.remove(0);
2102        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2103        assert_eq!(c.previous, None);
2104        assert_eq!(c.next, None);
2105        assert_matches!(c.content, ChunkContent::Items(events) => {
2106            assert_eq!(events.len(), 3);
2107            check_test_event(&events[0], "hello");
2108            check_test_event(&events[1], "world");
2109            check_test_event(&events[2], "howdy");
2110        });
2111    }
2112
2113    #[async_test]
2114    async fn test_linked_chunk_clear() {
2115        let store = get_event_cache_store().await.expect("creating cache store failed");
2116
2117        let room_id = *DEFAULT_TEST_ROOM_ID;
2118        let linked_chunk_id = LinkedChunkId::Room(room_id);
2119        let event_0 = make_test_event(room_id, "hello");
2120        let event_1 = make_test_event(room_id, "world");
2121        let event_2 = make_test_event(room_id, "howdy");
2122
2123        store
2124            .handle_linked_chunk_updates(
2125                linked_chunk_id,
2126                vec![
2127                    Update::NewItemsChunk {
2128                        previous: None,
2129                        new: ChunkIdentifier::new(42),
2130                        next: None,
2131                    },
2132                    Update::NewGapChunk {
2133                        previous: Some(ChunkIdentifier::new(42)),
2134                        new: ChunkIdentifier::new(54),
2135                        next: None,
2136                        gap: Gap { prev_token: "fondue".to_owned() },
2137                    },
2138                    Update::PushItems {
2139                        at: Position::new(ChunkIdentifier::new(42), 0),
2140                        items: vec![event_0.clone(), event_1, event_2],
2141                    },
2142                    Update::Clear,
2143                ],
2144            )
2145            .await
2146            .unwrap();
2147
2148        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2149        assert!(chunks.is_empty());
2150
2151        // Check that cascading worked. Yes, SQLite, I doubt you.
2152        store
2153            .read()
2154            .await
2155            .unwrap()
2156            .with_transaction(|txn| -> rusqlite::Result<_> {
2157                let num_gaps = txn
2158                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks")?
2159                    .query_row((), |row| row.get::<_, u64>(0))?;
2160                assert_eq!(num_gaps, 0);
2161
2162                let num_events = txn
2163                    .prepare("SELECT COUNT(event_id) FROM event_chunks")?
2164                    .query_row((), |row| row.get::<_, u64>(0))?;
2165                assert_eq!(num_events, 0);
2166
2167                Ok(())
2168            })
2169            .await
2170            .unwrap();
2171
2172        // It's okay to re-insert a past event.
2173        store
2174            .handle_linked_chunk_updates(
2175                linked_chunk_id,
2176                vec![
2177                    Update::NewItemsChunk {
2178                        previous: None,
2179                        new: ChunkIdentifier::new(42),
2180                        next: None,
2181                    },
2182                    Update::PushItems {
2183                        at: Position::new(ChunkIdentifier::new(42), 0),
2184                        items: vec![event_0],
2185                    },
2186                ],
2187            )
2188            .await
2189            .unwrap();
2190    }
2191
2192    #[async_test]
2193    async fn test_linked_chunk_multiple_rooms() {
2194        let store = get_event_cache_store().await.expect("creating cache store failed");
2195
2196        let room1 = room_id!("!realcheeselovers:raclette.fr");
2197        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2198        let room2 = room_id!("!realcheeselovers:fondue.ch");
2199        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2200
2201        // Check that applying updates to one room doesn't affect the others.
2202        // Use the same chunk identifier in both rooms to battle-test search.
2203
2204        store
2205            .handle_linked_chunk_updates(
2206                linked_chunk_id1,
2207                vec![
2208                    Update::NewItemsChunk {
2209                        previous: None,
2210                        new: ChunkIdentifier::new(42),
2211                        next: None,
2212                    },
2213                    Update::PushItems {
2214                        at: Position::new(ChunkIdentifier::new(42), 0),
2215                        items: vec![
2216                            make_test_event(room1, "best cheese is raclette"),
2217                            make_test_event(room1, "obviously"),
2218                        ],
2219                    },
2220                ],
2221            )
2222            .await
2223            .unwrap();
2224
2225        store
2226            .handle_linked_chunk_updates(
2227                linked_chunk_id2,
2228                vec![
2229                    Update::NewItemsChunk {
2230                        previous: None,
2231                        new: ChunkIdentifier::new(42),
2232                        next: None,
2233                    },
2234                    Update::PushItems {
2235                        at: Position::new(ChunkIdentifier::new(42), 0),
2236                        items: vec![make_test_event(room1, "beaufort is the best")],
2237                    },
2238                ],
2239            )
2240            .await
2241            .unwrap();
2242
2243        // Check chunks from room 1.
2244        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2245        assert_eq!(chunks_room1.len(), 1);
2246
2247        let c = chunks_room1.remove(0);
2248        assert_matches!(c.content, ChunkContent::Items(events) => {
2249            assert_eq!(events.len(), 2);
2250            check_test_event(&events[0], "best cheese is raclette");
2251            check_test_event(&events[1], "obviously");
2252        });
2253
2254        // Check chunks from room 2.
2255        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2256        assert_eq!(chunks_room2.len(), 1);
2257
2258        let c = chunks_room2.remove(0);
2259        assert_matches!(c.content, ChunkContent::Items(events) => {
2260            assert_eq!(events.len(), 1);
2261            check_test_event(&events[0], "beaufort is the best");
2262        });
2263    }
2264
2265    #[async_test]
2266    async fn test_linked_chunk_update_is_a_transaction() {
2267        let store = get_event_cache_store().await.expect("creating cache store failed");
2268
2269        let room_id = *DEFAULT_TEST_ROOM_ID;
2270        let linked_chunk_id = LinkedChunkId::Room(room_id);
2271
2272        // Trigger a violation of the unique constraint on the (room id, chunk id)
2273        // pair.
2274        let err = store
2275            .handle_linked_chunk_updates(
2276                linked_chunk_id,
2277                vec![
2278                    Update::NewItemsChunk {
2279                        previous: None,
2280                        new: ChunkIdentifier::new(42),
2281                        next: None,
2282                    },
2283                    Update::NewItemsChunk {
2284                        previous: None,
2285                        new: ChunkIdentifier::new(42),
2286                        next: None,
2287                    },
2288                ],
2289            )
2290            .await
2291            .unwrap_err();
2292
2293        // The operation fails with a constraint violation error.
2294        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2295            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2296        });
2297
2298        // If the updates have been handled transactionally, then no new chunks should
2299        // have been added; failure of the second update leads to the first one being
2300        // rolled back.
2301        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2302        assert!(chunks.is_empty());
2303    }
2304
2305    #[async_test]
2306    async fn test_filter_duplicate_events_no_events() {
2307        let store = get_event_cache_store().await.expect("creating cache store failed");
2308
2309        let room_id = *DEFAULT_TEST_ROOM_ID;
2310        let linked_chunk_id = LinkedChunkId::Room(room_id);
2311        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2312        assert!(duplicates.is_empty());
2313    }
2314
2315    #[async_test]
2316    async fn test_load_last_chunk() {
2317        let room_id = room_id!("!r0:matrix.org");
2318        let linked_chunk_id = LinkedChunkId::Room(room_id);
2319        let event = |msg: &str| make_test_event(room_id, msg);
2320        let store = get_event_cache_store().await.expect("creating cache store failed");
2321
2322        // Case #1: no last chunk.
2323        {
2324            let (last_chunk, chunk_identifier_generator) =
2325                store.load_last_chunk(linked_chunk_id).await.unwrap();
2326
2327            assert!(last_chunk.is_none());
2328            assert_eq!(chunk_identifier_generator.current(), 0);
2329        }
2330
2331        // Case #2: only one chunk is present.
2332        {
2333            store
2334                .handle_linked_chunk_updates(
2335                    linked_chunk_id,
2336                    vec![
2337                        Update::NewItemsChunk {
2338                            previous: None,
2339                            new: ChunkIdentifier::new(42),
2340                            next: None,
2341                        },
2342                        Update::PushItems {
2343                            at: Position::new(ChunkIdentifier::new(42), 0),
2344                            items: vec![event("saucisse de morteau"), event("comté")],
2345                        },
2346                    ],
2347                )
2348                .await
2349                .unwrap();
2350
2351            let (last_chunk, chunk_identifier_generator) =
2352                store.load_last_chunk(linked_chunk_id).await.unwrap();
2353
2354            assert_matches!(last_chunk, Some(last_chunk) => {
2355                assert_eq!(last_chunk.identifier, 42);
2356                assert!(last_chunk.previous.is_none());
2357                assert!(last_chunk.next.is_none());
2358                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2359                    assert_eq!(items.len(), 2);
2360                    check_test_event(&items[0], "saucisse de morteau");
2361                    check_test_event(&items[1], "comté");
2362                });
2363            });
2364            assert_eq!(chunk_identifier_generator.current(), 42);
2365        }
2366
2367        // Case #3: more chunks are present.
2368        {
2369            store
2370                .handle_linked_chunk_updates(
2371                    linked_chunk_id,
2372                    vec![
2373                        Update::NewItemsChunk {
2374                            previous: Some(ChunkIdentifier::new(42)),
2375                            new: ChunkIdentifier::new(7),
2376                            next: None,
2377                        },
2378                        Update::PushItems {
2379                            at: Position::new(ChunkIdentifier::new(7), 0),
2380                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2381                        },
2382                    ],
2383                )
2384                .await
2385                .unwrap();
2386
2387            let (last_chunk, chunk_identifier_generator) =
2388                store.load_last_chunk(linked_chunk_id).await.unwrap();
2389
2390            assert_matches!(last_chunk, Some(last_chunk) => {
2391                assert_eq!(last_chunk.identifier, 7);
2392                assert_matches!(last_chunk.previous, Some(previous) => {
2393                    assert_eq!(previous, 42);
2394                });
2395                assert!(last_chunk.next.is_none());
2396                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2397                    assert_eq!(items.len(), 3);
2398                    check_test_event(&items[0], "fondue");
2399                    check_test_event(&items[1], "gruyère");
2400                    check_test_event(&items[2], "mont d'or");
2401                });
2402            });
2403            assert_eq!(chunk_identifier_generator.current(), 42);
2404        }
2405    }
2406
2407    #[async_test]
2408    async fn test_load_last_chunk_with_a_cycle() {
2409        let room_id = room_id!("!r0:matrix.org");
2410        let linked_chunk_id = LinkedChunkId::Room(room_id);
2411        let store = get_event_cache_store().await.expect("creating cache store failed");
2412
2413        store
2414            .handle_linked_chunk_updates(
2415                linked_chunk_id,
2416                vec![
2417                    Update::NewItemsChunk {
2418                        previous: None,
2419                        new: ChunkIdentifier::new(0),
2420                        next: None,
2421                    },
2422                    Update::NewItemsChunk {
2423                        // Because `previous` connects to chunk #0, it will create a cycle.
2424                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2425                        // **does not exist**. We have to detect this cycle.
2426                        previous: Some(ChunkIdentifier::new(0)),
2427                        new: ChunkIdentifier::new(1),
2428                        next: Some(ChunkIdentifier::new(0)),
2429                    },
2430                ],
2431            )
2432            .await
2433            .unwrap();
2434
2435        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2436    }
2437
2438    #[async_test]
2439    async fn test_load_previous_chunk() {
2440        let room_id = room_id!("!r0:matrix.org");
2441        let linked_chunk_id = LinkedChunkId::Room(room_id);
2442        let event = |msg: &str| make_test_event(room_id, msg);
2443        let store = get_event_cache_store().await.expect("creating cache store failed");
2444
2445        // Case #1: no chunk at all, equivalent to having a nonexistent
2446        // `before_chunk_identifier`.
2447        {
2448            let previous_chunk = store
2449                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2450                .await
2451                .unwrap();
2452
2453            assert!(previous_chunk.is_none());
2454        }
2455
2456        // Case #2: there is only one chunk; we request the chunk before it,
2457        // which doesn't exist.
2458        {
2459            store
2460                .handle_linked_chunk_updates(
2461                    linked_chunk_id,
2462                    vec![Update::NewItemsChunk {
2463                        previous: None,
2464                        new: ChunkIdentifier::new(42),
2465                        next: None,
2466                    }],
2467                )
2468                .await
2469                .unwrap();
2470
2471            let previous_chunk =
2472                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2473
2474            assert!(previous_chunk.is_none());
2475        }
2476
2477        // Case #3: there are two chunks.
2478        {
2479            store
2480                .handle_linked_chunk_updates(
2481                    linked_chunk_id,
2482                    vec![
2483                        // new chunk before the one that exists.
2484                        Update::NewItemsChunk {
2485                            previous: None,
2486                            new: ChunkIdentifier::new(7),
2487                            next: Some(ChunkIdentifier::new(42)),
2488                        },
2489                        Update::PushItems {
2490                            at: Position::new(ChunkIdentifier::new(7), 0),
2491                            items: vec![event("brigand du jorat"), event("morbier")],
2492                        },
2493                    ],
2494                )
2495                .await
2496                .unwrap();
2497
2498            let previous_chunk =
2499                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2500
2501            assert_matches!(previous_chunk, Some(previous_chunk) => {
2502                assert_eq!(previous_chunk.identifier, 7);
2503                assert!(previous_chunk.previous.is_none());
2504                assert_matches!(previous_chunk.next, Some(next) => {
2505                    assert_eq!(next, 42);
2506                });
2507                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2508                    assert_eq!(items.len(), 2);
2509                    check_test_event(&items[0], "brigand du jorat");
2510                    check_test_event(&items[1], "morbier");
2511                });
2512            });
2513        }
2514    }
2515}
2516
2517#[cfg(test)]
2518mod encrypted_tests {
2519    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2520
2521    use matrix_sdk_base::{
2522        event_cache::store::{EventCacheStore, EventCacheStoreError},
2523        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2524    };
2525    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2526    use once_cell::sync::Lazy;
2527    use ruma::{
2528        event_id,
2529        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2530        room_id, user_id,
2531    };
2532    use tempfile::{tempdir, TempDir};
2533
2534    use super::SqliteEventCacheStore;
2535
2536    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2537    static NUM: AtomicU32 = AtomicU32::new(0);
2538
2539    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2540        let name = NUM.fetch_add(1, SeqCst).to_string();
2541        let tmpdir_path = TMP_DIR.path().join(name);
2542
2543        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2544
2545        Ok(SqliteEventCacheStore::open(
2546            tmpdir_path.to_str().unwrap(),
2547            Some("default_test_password"),
2548        )
2549        .await
2550        .unwrap())
2551    }
2552
2553    event_cache_store_integration_tests!();
2554    event_cache_store_integration_tests_time!();
2555
2556    #[async_test]
2557    async fn test_no_sqlite_injection_in_find_event_relations() {
2558        let room_id = room_id!("!test:localhost");
2559        let another_room_id = room_id!("!r1:matrix.org");
2560        let sender = user_id!("@alice:localhost");
2561
2562        let store = get_event_cache_store()
2563            .await
2564            .expect("We should be able to create a new, empty, event cache store");
2565
2566        let f = EventFactory::new().room(room_id).sender(sender);
2567
2568        // Create an event for the first room.
2569        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2570        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2571
2572        // Create a related event.
2573        let edit_id = event_id!("$find_me:matrix.org");
2574        let edit = f
2575            .text_msg("Find me")
2576            .event_id(edit_id)
2577            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2578            .into_event();
2579
2580        // Create an event for the second room.
2581        let f = f.room(another_room_id);
2582
2583        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2584        let another_event =
2585            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2586
2587        // Save the events in the DB.
2588        store.save_event(room_id, event).await.unwrap();
2589        store.save_event(room_id, edit).await.unwrap();
2590        store.save_event(another_room_id, another_event).await.unwrap();
2591
2592        // Craft a `RelationType` that attempts to inject SQL. If the injection
2593        // succeeded, the `OR 1=1` would ensure that all the previous parameters,
2594        // i.e. the room ID and the event ID, are ignored.
2595        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
2596
2597        // Attempt to find events in the first room.
2598        let results = store
2599            .find_event_relations(room_id, event_id, filter.as_deref())
2600            .await
2601            .expect("We should be able to attempt to find event relations");
2602
2603        // Ensure that we only got the single related event the first room contains.
2604        similar_asserts::assert_eq!(
2605            results.len(),
2606            1,
2607            "We should only have loaded events for the first room {results:#?}"
2608        );
2609
2610        // The event needs to be the edit event, otherwise something is wrong.
2611        let (found_event, _) = &results[0];
2612        assert_eq!(
2613            found_event.event_id().as_deref(),
2614            Some(edit_id),
2615            "The single event we found should be the edit event"
2616        );
2617    }
2618}