// matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].
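//!
//! A minimal usage sketch (the path and passphrase below are illustrative, and
//! this assumes the store is re-exported at the crate root as
//! `matrix_sdk_sqlite::SqliteEventCacheStore`):
//!
//! ```no_run
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use matrix_sdk_sqlite::SqliteEventCacheStore;
//!
//! // Open (or create) the store in the given directory, encrypting private
//! // data with the given passphrase.
//! let store =
//!     SqliteEventCacheStore::open("/path/to/store/dir", Some("passphrase")).await?;
//! # Ok(())
//! # }
//! ```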

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{
    params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 12;

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and connections
    /// for write operations. A single connection is set apart for write
    /// operations; all other connections are used for read operations. The
    /// lock is used to ensure there is only one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}
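
// Note for readers: the `encode_key`, `encode_value` and `decode_value`
// helpers used throughout this file come from the `EncryptableStore` trait
// implemented above (via `get_cypher`). When a `StoreCipher` is configured,
// keys are hashed and values are encrypted; otherwise data passes through
// unchanged.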

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the SQLite-based event cache store with the given config.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, secret).await?;
        this.write().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction because it
        // fails with the error message: "cannot change into wal mode from within a
        // transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    Ok(())
}
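
// As a hypothetical illustration of the convention documented on
// `DATABASE_VERSION`: adding a new migration would bump that constant to 13
// and append a block like the following (the SQL file name is made up) at the
// end of `run_migrations`:
//
//     if version < 13 {
//         conn.with_transaction(|txn| {
//             txn.execute_batch(include_str!(
//                 "../migrations/event_cache_store/013_example.sql"
//             ))?;
//             txn.set_db_version(13)
//         })
//         .await?;
//     }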

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let _timer = timer!("method");

        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }
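
    // A sketch of the lease semantics implemented by the upsert above,
    // assuming `store` is an already-opened `SqliteEventCacheStore`: a lock
    // can only change holder once the lease has expired, while the current
    // holder can always renew.
    //
    //     // "proc_a" takes the lock for 2 seconds.
    //     assert!(store.try_take_leased_lock(2000, "key", "proc_a").await?);
    //     // "proc_b" can't steal it while the lease is still alive…
    //     assert!(!store.try_take_leased_lock(2000, "key", "proc_b").await?);
    //     // …but "proc_a" can renew its own lease at any time.
    //     assert!(store.try_take_leased_lock(2000, "key", "proc_a").await?);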

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point its previous chunk's `next` to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point its next chunk's `previous` to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might already have
                        // been inserted in the database. This is the case when an event has been
                        // deduplicated and moved to another position, or when it was inserted
                        // outside the context of a linked chunk (e.g. a pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the items that are valid events, i.e. that have an event ID
                        // and an event type.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content. Really we'd like to update, but in case the
                        // event id changed, we are a bit lenient here and will allow an insertion
                        // of the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // Once `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id    | chunk_id | position |
                        // |----------|--------------------|----------|----------|
                        // | $ev0     | !r0                | 42       | 0        |
                        // | $ev1     | !r0                | 42       | 1        |
                        // |          |                    |          |          | <- no more `$ev2`
                        // | $ev3     | !r0                | 42       | 3        |
                        // | $ev4     | !r0                | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To persuade yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, the
                        // `event_cache::Deduplicator` is responsible for ensuring
                        // there is no duplicated event. However, relying on this
                        // alone is “fragile” in the sense that it can contain
                        // bugs. Relying on the `UNIQUE` constraint from SQLite is
                        // more robust. It's “belt and braces” as we say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is a
                        // no-go for us: it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of a `u64`, in case the `SELECT` returns no
                                // row at all. Indeed, if it returns no row, the
                                // `MAX(id)` is `NULL`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk was not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk was not found **but** there are chunks for this room: this is
                    // inconsistent, and the linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // Chunk is not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events for which we want to check duplicates, we can return
        // early. This is not only an optimization: it's required, as otherwise the
        // `repeat_vars` call below would panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }
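
        // (For context: `repeat_vars(n)`, from this crate's `utils` module,
        // expands to `n` comma-separated `?` placeholders (e.g. `"?,?,?"` for
        // `n = 3`), which are spliced into the `IN (…)` clause of the query
        // below; hence the panic on `n = 0`.)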

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        // parameter for `linked_chunk_id = ?`
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                // SAFETY: it cannot fail since `Key::to_sql` never fails
                                .unwrap(),
                        )
                        // parameters for `event_id IN (…)`
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                // SAFETY: it cannot fail since `str::to_sql` never fails
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Normally unreachable, but the event ID was stored even though it is
                            // malformed; let's skip it.
                            error!(%duplicated_event, %linked_chunk_id, "Read a malformed event ID");
1253                            continue;
1254                        };
1255
1256                        duplicated_events.push((
1257                            duplicated_event,
1258                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
1259                        ));
1260                    }
1261
1262                    Ok(duplicated_events)
1263                })
1264            })
1265            .await
1266    }
1267
1268    #[instrument(skip(self, event_id))]
1269    async fn find_event(
1270        &self,
1271        room_id: &RoomId,
1272        event_id: &EventId,
1273    ) -> Result<Option<Event>, Self::Error> {
1274        let _timer = timer!("method");
1275
1276        let event_id = event_id.to_owned();
1277        let this = self.clone();
1278
1279        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1280
1281        self.read()
1282            .await?
1283            .with_transaction(move |txn| -> Result<_> {
1284                let Some(event) = txn
1285                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1286                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1287                    .optional()?
1288                else {
1289                    // Event is not found.
1290                    return Ok(None);
1291                };
1292
1293                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1294
1295                Ok(Some(event))
1296            })
1297            .await
1298    }
1299
1300    #[instrument(skip(self, event_id, filters))]
1301    async fn find_event_relations(
1302        &self,
1303        room_id: &RoomId,
1304        event_id: &EventId,
1305        filters: Option<&[RelationType]>,
1306    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1307        let _timer = timer!("method");
1308
1309        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1310
1311        let hashed_linked_chunk_id =
1312            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1313
1314        let event_id = event_id.to_owned();
1315        let filters = filters.map(ToOwned::to_owned);
1316        let store = self.clone();
1317
1318        self.read()
1319            .await?
1320            .with_transaction(move |txn| -> Result<_> {
1321                find_event_relations_transaction(
1322                    store,
1323                    hashed_room_id,
1324                    hashed_linked_chunk_id,
1325                    event_id,
1326                    filters,
1327                    txn,
1328                )
1329            })
1330            .await
1331    }
1332
1333    #[instrument(skip(self))]
1334    async fn get_room_events(
1335        &self,
1336        room_id: &RoomId,
1337        event_type: Option<&str>,
1338        session_id: Option<&str>,
1339    ) -> Result<Vec<Event>, Self::Error> {
1340        let _timer = timer!("method");
1341
1342        let this = self.clone();
1343
1344        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1345        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1346        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1347
1348        self.read()
1349            .await?
1350            .with_transaction(move |txn| -> Result<_> {
1351                // Clippy claims these clones are redundant, but the compiler complains
1352                // that the values don't live long enough if we remove them, so let's
1353                // silence the lint.
1354                #[allow(clippy::redundant_clone)]
1355                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1356                    (None, None) => {
1357                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1358                    }
1359                    (None, Some(session_id)) => (
1360                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1361                        params![hashed_room_id, session_id.to_owned()],
1362                    ),
1363                    (Some(event_type), None) => (
1364                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1365                        params![hashed_room_id, event_type.to_owned()]
1366                        params![hashed_room_id, event_type.to_owned()],
1367                    (Some(event_type), Some(session_id)) => (
1368                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1369                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1370                    ),
1371                };
1372
1373                let mut statement = txn.prepare(query)?;
1374                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1375
1376                let mut events = Vec::new();
1377                for ev in maybe_events {
1378                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1379                    events.push(event);
1380                }
1381
1382                Ok(events)
1383            })
1384            .await
1385    }
1386
1387    #[instrument(skip(self, event))]
1388    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1389        let _timer = timer!("method");
1390
1391        let Some(event_id) = event.event_id() else {
1392            error!("Trying to save an event with no ID");
1393            return Ok(());
1394        };
1395
1396        let Some(event_type) = event.kind.event_type() else {
1397            error!(%event_id, "Trying to save an event with no event type");
1398            return Ok(());
1399        };
1400
1401        let event_type = self.encode_key(keys::EVENTS, event_type);
1402        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1403
1404        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1405        let event_id = event_id.to_string();
1406        let encoded_event = self.encode_event(&event)?;
1407
1408        self.write()
1409            .await?
1410            .with_transaction(move |txn| -> Result<_> {
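                // `INSERT OR REPLACE` acts as an upsert: saving an event that
                // is already stored overwrites the existing row instead of
                // failing on the uniqueness conflict.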
1411                txn.execute(
1412                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1413                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1414
1415                Ok(())
1416            })
1417            .await
1418    }
1419}
1420
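/// Transaction body for `find_event_relations`: load every event in the room
/// that relates to `event_id`, optionally restricted to the given relation
/// types, together with its position in the room's linked chunk (`None` when
/// the event was saved out-of-band and never made it into a chunk).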
1421fn find_event_relations_transaction(
1422    store: SqliteEventCacheStore,
1423    hashed_room_id: Key,
1424    hashed_linked_chunk_id: Key,
1425    event_id: OwnedEventId,
1426    filters: Option<Vec<RelationType>>,
1427    txn: &Transaction<'_>,
1428) -> Result<Vec<(Event, Option<Position>)>> {
1429    let get_rows = |row: &rusqlite::Row<'_>| {
1430        Ok((
1431            row.get::<_, Vec<u8>>(0)?,
1432            row.get::<_, Option<u64>>(1)?,
1433            row.get::<_, Option<usize>>(2)?,
1434        ))
1435    };
1436
1437    // Collect related events.
1438    let collect_results = |rows| {
1439        let mut related = Vec::new();
1440
1441        for result in rows {
1442            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1443
1444            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1445
1446            // Only build the position if both the chunk_id and position columns were
1447            // non-NULL; in theory, they are either both present or both absent.
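            // (`Option::zip` returns `Some` only when both sides are `Some`:
            // `Some(3).zip(Some(7)) == Some((3, 7))`, while
            // `Some(3).zip(None::<usize>) == None`.)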
1448            let pos = chunk_id
1449                .zip(index)
1450                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1451
1452            related.push((event, pos));
1453        }
1454
1455        Ok(related)
1456    };
1457
1458    let related = if let Some(filters) = filters {
1459        let question_marks = repeat_vars(filters.len());
1460        let query = format!(
1461            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1462            FROM events
1463            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1464            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1465        );
1466
1467        // The filters need to be stringified first: `.to_sql()` borrows from its
1468        // receiver, so the strings must be kept alive in a collection with a
1469        // stable address (otherwise the map closure below would return a
1470        // reference to a temporary).
1471        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1472        let filters_params: Vec<_> = filter_strings
1473            .iter()
1474            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1475            .collect();
1476
1477        let parameters = params_from_iter(
1478            [
1479                hashed_linked_chunk_id.to_sql().expect(
1480                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1481                ),
1482                event_id
1483                    .as_str()
1484                    .to_sql()
1485                    .expect("We should be able to convert an event ID to a SQLite value"),
1486                hashed_room_id
1487                    .to_sql()
1488                    .expect("We should be able to convert a room ID to a SQLite value"),
1489            ]
1490            .into_iter()
1491            .chain(filters_params),
1492        );
1493
1494        let mut statement = txn.prepare(&query)?;
1495        let rows = statement.query_map(parameters, get_rows)?;
1496
1497        collect_results(rows)
1498    } else {
1499        let query =
1500            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1501            FROM events
1502            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1503            WHERE relates_to = ? AND room_id = ?";
1504        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1505
1506        let mut statement = txn.prepare(query)?;
1507        let rows = statement.query_map(parameters, get_rows)?;
1508
1509        collect_results(rows)
1510    };
1511
1512    related
1513}
1514
1515/// Like `deadpool::managed::Object::with_transaction`, but starts the
1516/// transaction in immediate (write) mode from the beginning. This prevents
1517/// SQLITE_BUSY errors for transactions that mix reads and writes and start
1518/// with a write, since the connection never has to upgrade from a read lock.
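///
/// A minimal usage sketch (assuming `store: &SqliteEventCacheStore` and a
/// hashed room ID in scope; marked `ignore` because this helper is private):
///
/// ```ignore
/// let num_events: u64 = with_immediate_transaction(&store, |txn| {
///     // The write lock is held from `BEGIN IMMEDIATE` onwards, so this
///     // write followed by a read can't hit SQLITE_BUSY mid-transaction.
///     txn.execute("DELETE FROM events WHERE room_id = ?", (&hashed_room_id,))?;
///     Ok(txn.query_row("SELECT COUNT(*) FROM events", (), |row| row.get(0))?)
/// })
/// .await?;
/// ```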
1519async fn with_immediate_transaction<
1520    T: Send + 'static,
1521    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1522>(
1523    this: &SqliteEventCacheStore,
1524    f: F,
1525) -> Result<T, Error> {
1526    this.write()
1527        .await?
1528        .interact(move |conn| -> Result<T, Error> {
1529            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1530            // to avoid read transactions upgrading to write mode and causing
1531            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1532            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1533
1534            let code = || -> Result<T, Error> {
1535                let txn = conn.transaction()?;
1536                let res = f(&txn)?;
1537                txn.commit()?;
1538                Ok(res)
1539            };
1540
1541            let res = code();
1542
1543            // Reset the transaction behavior to use Deferred, after this transaction has
1544            // been run, whether it was successful or not.
1545            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1546
1547            res
1548        })
1549        .await
1550        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
1551        .unwrap()
1552}
1553
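/// Insert a new chunk into the `linked_chunks` table, and stitch the
/// doubly-linked list back together: the previous chunk's `next` pointer and
/// the next chunk's `previous` pointer, when those chunks exist, are both
/// repointed at the new chunk.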
1554fn insert_chunk(
1555    txn: &Transaction<'_>,
1556    linked_chunk_id: &Key,
1557    previous: Option<u64>,
1558    new: u64,
1559    next: Option<u64>,
1560    type_str: &str,
1561) -> rusqlite::Result<()> {
1562    // First, insert the new chunk.
1563    txn.execute(
1564        r#"
1565            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1566            VALUES (?, ?, ?, ?, ?)
1567        "#,
1568        (new, linked_chunk_id, previous, next, type_str),
1569    )?;
1570
1571    // If this chunk has a previous one, update its `next` field.
1572    if let Some(previous) = previous {
1573        txn.execute(
1574            r#"
1575                UPDATE linked_chunks
1576                SET next = ?
1577                WHERE id = ? AND linked_chunk_id = ?
1578            "#,
1579            (new, previous, linked_chunk_id),
1580        )?;
1581    }
1582
1583    // If this chunk has a next one, update its `previous` field.
1584    if let Some(next) = next {
1585        txn.execute(
1586            r#"
1587                UPDATE linked_chunks
1588                SET previous = ?
1589                WHERE id = ? AND linked_chunk_id = ?
1590            "#,
1591            (new, next, linked_chunk_id),
1592        )?;
1593    }
1594
1595    Ok(())
1596}
1597
1598#[cfg(test)]
1599mod tests {
1600    use std::{
1601        path::PathBuf,
1602        sync::atomic::{AtomicU32, Ordering::SeqCst},
1603    };
1604
1605    use assert_matches::assert_matches;
1606    use matrix_sdk_base::{
1607        event_cache::{
1608            store::{
1609                integration_tests::{
1610                    check_test_event, make_test_event, make_test_event_with_event_id,
1611                },
1612                EventCacheStore, EventCacheStoreError,
1613            },
1614            Gap,
1615        },
1616        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1617        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1618    };
1619    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1620    use once_cell::sync::Lazy;
1621    use ruma::{event_id, room_id};
1622    use tempfile::{tempdir, TempDir};
1623
1624    use super::SqliteEventCacheStore;
1625    use crate::{
1626        event_cache_store::keys,
1627        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1628        SqliteStoreConfig,
1629    };
1630
1631    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1632    static NUM: AtomicU32 = AtomicU32::new(0);
1633
1634    fn new_event_cache_store_workspace() -> PathBuf {
1635        let name = NUM.fetch_add(1, SeqCst).to_string();
1636        TMP_DIR.path().join(name)
1637    }
1638
1639    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1640        let tmpdir_path = new_event_cache_store_workspace();
1641
1642        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1643
1644        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1645    }
1646
1647    event_cache_store_integration_tests!();
1648    event_cache_store_integration_tests_time!();
1649
1650    #[async_test]
1651    async fn test_pool_size() {
1652        let tmpdir_path = new_event_cache_store_workspace();
1653        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1654
1655        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1656
1657        assert_eq!(store.pool.status().max_size, 42);
1658    }
1659
1660    #[async_test]
1661    async fn test_linked_chunk_new_items_chunk() {
1662        let store = get_event_cache_store().await.expect("creating cache store failed");
1663
1664        let room_id = &DEFAULT_TEST_ROOM_ID;
1665        let linked_chunk_id = LinkedChunkId::Room(room_id);
1666
1667        store
1668            .handle_linked_chunk_updates(
1669                linked_chunk_id,
1670                vec![
1671                    Update::NewItemsChunk {
1672                        previous: None,
1673                        new: ChunkIdentifier::new(42),
1674                        next: None, // Note: the store must link the next entry itself.
1675                    },
1676                    Update::NewItemsChunk {
1677                        previous: Some(ChunkIdentifier::new(42)),
1678                        new: ChunkIdentifier::new(13),
1679                        next: Some(ChunkIdentifier::new(37)), /* But it's fine to explicitly pass
1680                                                               * the next link ahead of time. */
1681                    },
1682                    Update::NewItemsChunk {
1683                        previous: Some(ChunkIdentifier::new(13)),
1684                        new: ChunkIdentifier::new(37),
1685                        next: None,
1686                    },
1687                ],
1688            )
1689            .await
1690            .unwrap();
1691
1692        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1693
1694        assert_eq!(chunks.len(), 3);
1695
1696        {
1697            // Chunks are ordered from smaller to bigger IDs.
1698            let c = chunks.remove(0);
1699            assert_eq!(c.identifier, ChunkIdentifier::new(13));
1700            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1701            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1702            assert_matches!(c.content, ChunkContent::Items(events) => {
1703                assert!(events.is_empty());
1704            });
1705
1706            let c = chunks.remove(0);
1707            assert_eq!(c.identifier, ChunkIdentifier::new(37));
1708            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1709            assert_eq!(c.next, None);
1710            assert_matches!(c.content, ChunkContent::Items(events) => {
1711                assert!(events.is_empty());
1712            });
1713
1714            let c = chunks.remove(0);
1715            assert_eq!(c.identifier, ChunkIdentifier::new(42));
1716            assert_eq!(c.previous, None);
1717            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1718            assert_matches!(c.content, ChunkContent::Items(events) => {
1719                assert!(events.is_empty());
1720            });
1721        }
1722    }
1723
1724    #[async_test]
1725    async fn test_linked_chunk_new_gap_chunk() {
1726        let store = get_event_cache_store().await.expect("creating cache store failed");
1727
1728        let room_id = &DEFAULT_TEST_ROOM_ID;
1729        let linked_chunk_id = LinkedChunkId::Room(room_id);
1730
1731        store
1732            .handle_linked_chunk_updates(
1733                linked_chunk_id,
1734                vec![Update::NewGapChunk {
1735                    previous: None,
1736                    new: ChunkIdentifier::new(42),
1737                    next: None,
1738                    gap: Gap { prev_token: "raclette".to_owned() },
1739                }],
1740            )
1741            .await
1742            .unwrap();
1743
1744        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1745
1746        assert_eq!(chunks.len(), 1);
1747
1748        // Chunks are ordered from smaller to bigger IDs.
1749        let c = chunks.remove(0);
1750        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1751        assert_eq!(c.previous, None);
1752        assert_eq!(c.next, None);
1753        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1754            assert_eq!(gap.prev_token, "raclette");
1755        });
1756    }
1757
1758    #[async_test]
1759    async fn test_linked_chunk_replace_item() {
1760        let store = get_event_cache_store().await.expect("creating cache store failed");
1761
1762        let room_id = &DEFAULT_TEST_ROOM_ID;
1763        let linked_chunk_id = LinkedChunkId::Room(room_id);
1764        let event_id = event_id!("$world");
1765
1766        store
1767            .handle_linked_chunk_updates(
1768                linked_chunk_id,
1769                vec![
1770                    Update::NewItemsChunk {
1771                        previous: None,
1772                        new: ChunkIdentifier::new(42),
1773                        next: None,
1774                    },
1775                    Update::PushItems {
1776                        at: Position::new(ChunkIdentifier::new(42), 0),
1777                        items: vec![
1778                            make_test_event(room_id, "hello"),
1779                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
1780                        ],
1781                    },
1782                    Update::ReplaceItem {
1783                        at: Position::new(ChunkIdentifier::new(42), 1),
1784                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1785                    },
1786                ],
1787            )
1788            .await
1789            .unwrap();
1790
1791        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1792
1793        assert_eq!(chunks.len(), 1);
1794
1795        let c = chunks.remove(0);
1796        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1797        assert_eq!(c.previous, None);
1798        assert_eq!(c.next, None);
1799        assert_matches!(c.content, ChunkContent::Items(events) => {
1800            assert_eq!(events.len(), 2);
1801            check_test_event(&events[0], "hello");
1802            check_test_event(&events[1], "yolo");
1803        });
1804    }
1805
1806    #[async_test]
1807    async fn test_linked_chunk_remove_chunk() {
1808        let store = get_event_cache_store().await.expect("creating cache store failed");
1809
1810        let room_id = &DEFAULT_TEST_ROOM_ID;
1811        let linked_chunk_id = LinkedChunkId::Room(room_id);
1812
1813        store
1814            .handle_linked_chunk_updates(
1815                linked_chunk_id,
1816                vec![
1817                    Update::NewGapChunk {
1818                        previous: None,
1819                        new: ChunkIdentifier::new(42),
1820                        next: None,
1821                        gap: Gap { prev_token: "raclette".to_owned() },
1822                    },
1823                    Update::NewGapChunk {
1824                        previous: Some(ChunkIdentifier::new(42)),
1825                        new: ChunkIdentifier::new(43),
1826                        next: None,
1827                        gap: Gap { prev_token: "fondue".to_owned() },
1828                    },
1829                    Update::NewGapChunk {
1830                        previous: Some(ChunkIdentifier::new(43)),
1831                        new: ChunkIdentifier::new(44),
1832                        next: None,
1833                        gap: Gap { prev_token: "tartiflette".to_owned() },
1834                    },
1835                    Update::RemoveChunk(ChunkIdentifier::new(43)),
1836                ],
1837            )
1838            .await
1839            .unwrap();
1840
1841        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1842
1843        assert_eq!(chunks.len(), 2);
1844
1845        // Chunks are ordered from smaller to bigger IDs.
1846        let c = chunks.remove(0);
1847        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1848        assert_eq!(c.previous, None);
1849        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1850        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1851            assert_eq!(gap.prev_token, "raclette");
1852        });
1853
1854        let c = chunks.remove(0);
1855        assert_eq!(c.identifier, ChunkIdentifier::new(44));
1856        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1857        assert_eq!(c.next, None);
1858        assert_matches!(c.content, ChunkContent::Gap(gap) => {
1859            assert_eq!(gap.prev_token, "tartiflette");
1860        });
1861
1862        // Check that cascading worked. Yes, SQLite, I doubt you.
1863        let gaps = store
1864            .read()
1865            .await
1866            .unwrap()
1867            .with_transaction(|txn| -> rusqlite::Result<_> {
1868                let mut gaps = Vec::new();
1869                for data in txn
1870                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1871                    .query_map((), |row| row.get::<_, u64>(0))?
1872                {
1873                    gaps.push(data?);
1874                }
1875                Ok(gaps)
1876            })
1877            .await
1878            .unwrap();
1879
1880        assert_eq!(gaps, vec![42, 44]);
1881    }
1882
1883    #[async_test]
1884    async fn test_linked_chunk_push_items() {
1885        let store = get_event_cache_store().await.expect("creating cache store failed");
1886
1887        let room_id = &DEFAULT_TEST_ROOM_ID;
1888        let linked_chunk_id = LinkedChunkId::Room(room_id);
1889
1890        store
1891            .handle_linked_chunk_updates(
1892                linked_chunk_id,
1893                vec![
1894                    Update::NewItemsChunk {
1895                        previous: None,
1896                        new: ChunkIdentifier::new(42),
1897                        next: None,
1898                    },
1899                    Update::PushItems {
1900                        at: Position::new(ChunkIdentifier::new(42), 0),
1901                        items: vec![
1902                            make_test_event(room_id, "hello"),
1903                            make_test_event(room_id, "world"),
1904                        ],
1905                    },
1906                    Update::PushItems {
1907                        at: Position::new(ChunkIdentifier::new(42), 2),
1908                        items: vec![make_test_event(room_id, "who?")],
1909                    },
1910                ],
1911            )
1912            .await
1913            .unwrap();
1914
1915        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1916
1917        assert_eq!(chunks.len(), 1);
1918
1919        let c = chunks.remove(0);
1920        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1921        assert_eq!(c.previous, None);
1922        assert_eq!(c.next, None);
1923        assert_matches!(c.content, ChunkContent::Items(events) => {
1924            assert_eq!(events.len(), 3);
1925
1926            check_test_event(&events[0], "hello");
1927            check_test_event(&events[1], "world");
1928            check_test_event(&events[2], "who?");
1929        });
1930    }
1931
1932    #[async_test]
1933    async fn test_linked_chunk_remove_item() {
1934        let store = get_event_cache_store().await.expect("creating cache store failed");
1935
1936        let room_id = *DEFAULT_TEST_ROOM_ID;
1937        let linked_chunk_id = LinkedChunkId::Room(room_id);
1938
1939        store
1940            .handle_linked_chunk_updates(
1941                linked_chunk_id,
1942                vec![
1943                    Update::NewItemsChunk {
1944                        previous: None,
1945                        new: ChunkIdentifier::new(42),
1946                        next: None,
1947                    },
1948                    Update::PushItems {
1949                        at: Position::new(ChunkIdentifier::new(42), 0),
1950                        items: vec![
1951                            make_test_event(room_id, "one"),
1952                            make_test_event(room_id, "two"),
1953                            make_test_event(room_id, "three"),
1954                            make_test_event(room_id, "four"),
1955                            make_test_event(room_id, "five"),
1956                            make_test_event(room_id, "six"),
1957                        ],
1958                    },
1959                    Update::RemoveItem {
1960                        at: Position::new(ChunkIdentifier::new(42), 2), /* "three" */
1961                    },
1962                ],
1963            )
1964            .await
1965            .unwrap();
1966
1967        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1968
1969        assert_eq!(chunks.len(), 1);
1970
1971        let c = chunks.remove(0);
1972        assert_eq!(c.identifier, ChunkIdentifier::new(42));
1973        assert_eq!(c.previous, None);
1974        assert_eq!(c.next, None);
1975        assert_matches!(c.content, ChunkContent::Items(events) => {
1976            assert_eq!(events.len(), 5);
1977            check_test_event(&events[0], "one");
1978            check_test_event(&events[1], "two");
1979            check_test_event(&events[2], "four");
1980            check_test_event(&events[3], "five");
1981            check_test_event(&events[4], "six");
1982        });
1983
1984        // Make sure the positions have been updated for the remaining events.
1985        let num_rows: u64 = store
1986            .read()
1987            .await
1988            .unwrap()
1989            .with_transaction(move |txn| {
1990                txn.query_row(
1991                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1992                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1993                    |row| row.get(0),
1994                )
1995            })
1996            .await
1997            .unwrap();
1998        assert_eq!(num_rows, 3);
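        // That is, "four", "five" and "six" moved from positions 3..=5 down to 2..=4.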
1999    }
2000
2001    #[async_test]
2002    async fn test_linked_chunk_detach_last_items() {
2003        let store = get_event_cache_store().await.expect("creating cache store failed");
2004
2005        let room_id = *DEFAULT_TEST_ROOM_ID;
2006        let linked_chunk_id = LinkedChunkId::Room(room_id);
2007
2008        store
2009            .handle_linked_chunk_updates(
2010                linked_chunk_id,
2011                vec![
2012                    Update::NewItemsChunk {
2013                        previous: None,
2014                        new: ChunkIdentifier::new(42),
2015                        next: None,
2016                    },
2017                    Update::PushItems {
2018                        at: Position::new(ChunkIdentifier::new(42), 0),
2019                        items: vec![
2020                            make_test_event(room_id, "hello"),
2021                            make_test_event(room_id, "world"),
2022                            make_test_event(room_id, "howdy"),
2023                        ],
2024                    },
2025                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2026                ],
2027            )
2028            .await
2029            .unwrap();
2030
2031        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2032
2033        assert_eq!(chunks.len(), 1);
2034
2035        let c = chunks.remove(0);
2036        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2037        assert_eq!(c.previous, None);
2038        assert_eq!(c.next, None);
2039        assert_matches!(c.content, ChunkContent::Items(events) => {
2040            assert_eq!(events.len(), 1);
2041            check_test_event(&events[0], "hello");
2042        });
2043    }
2044
2045    #[async_test]
2046    async fn test_linked_chunk_start_end_reattach_items() {
2047        let store = get_event_cache_store().await.expect("creating cache store failed");
2048
2049        let room_id = *DEFAULT_TEST_ROOM_ID;
2050        let linked_chunk_id = LinkedChunkId::Room(room_id);
2051
2052        // Same updates and checks as test_linked_chunk_push_items, but with extra
2053        // `StartReattachItems` and `EndReattachItems` updates, which must have no
2054        // effect.
2055        store
2056            .handle_linked_chunk_updates(
2057                linked_chunk_id,
2058                vec![
2059                    Update::NewItemsChunk {
2060                        previous: None,
2061                        new: ChunkIdentifier::new(42),
2062                        next: None,
2063                    },
2064                    Update::PushItems {
2065                        at: Position::new(ChunkIdentifier::new(42), 0),
2066                        items: vec![
2067                            make_test_event(room_id, "hello"),
2068                            make_test_event(room_id, "world"),
2069                            make_test_event(room_id, "howdy"),
2070                        ],
2071                    },
2072                    Update::StartReattachItems,
2073                    Update::EndReattachItems,
2074                ],
2075            )
2076            .await
2077            .unwrap();
2078
2079        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2080
2081        assert_eq!(chunks.len(), 1);
2082
2083        let c = chunks.remove(0);
2084        assert_eq!(c.identifier, ChunkIdentifier::new(42));
2085        assert_eq!(c.previous, None);
2086        assert_eq!(c.next, None);
2087        assert_matches!(c.content, ChunkContent::Items(events) => {
2088            assert_eq!(events.len(), 3);
2089            check_test_event(&events[0], "hello");
2090            check_test_event(&events[1], "world");
2091            check_test_event(&events[2], "howdy");
2092        });
2093    }
2094
2095    #[async_test]
2096    async fn test_linked_chunk_clear() {
2097        let store = get_event_cache_store().await.expect("creating cache store failed");
2098
2099        let room_id = *DEFAULT_TEST_ROOM_ID;
2100        let linked_chunk_id = LinkedChunkId::Room(room_id);
2101        let event_0 = make_test_event(room_id, "hello");
2102        let event_1 = make_test_event(room_id, "world");
2103        let event_2 = make_test_event(room_id, "howdy");
2104
2105        store
2106            .handle_linked_chunk_updates(
2107                linked_chunk_id,
2108                vec![
2109                    Update::NewItemsChunk {
2110                        previous: None,
2111                        new: ChunkIdentifier::new(42),
2112                        next: None,
2113                    },
2114                    Update::NewGapChunk {
2115                        previous: Some(ChunkIdentifier::new(42)),
2116                        new: ChunkIdentifier::new(54),
2117                        next: None,
2118                        gap: Gap { prev_token: "fondue".to_owned() },
2119                    },
2120                    Update::PushItems {
2121                        at: Position::new(ChunkIdentifier::new(42), 0),
2122                        items: vec![event_0.clone(), event_1, event_2],
2123                    },
2124                    Update::Clear,
2125                ],
2126            )
2127            .await
2128            .unwrap();
2129
2130        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2131        assert!(chunks.is_empty());
2132
2133        // Check that cascading worked. Yes, SQLite, I doubt you.
2134        store
2135            .read()
2136            .await
2137            .unwrap()
2138            .with_transaction(|txn| -> rusqlite::Result<_> {
2139                let num_gaps = txn
2140                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2141                    .query_row((), |row| row.get::<_, u64>(0))?;
2142                assert_eq!(num_gaps, 0);
2143
2144                let num_events = txn
2145                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2146                    .query_row((), |row| row.get::<_, u64>(0))?;
2147                assert_eq!(num_events, 0);
2148
2149                Ok(())
2150            })
2151            .await
2152            .unwrap();
2153
2154        // It's okay to re-insert a past event.
2155        store
2156            .handle_linked_chunk_updates(
2157                linked_chunk_id,
2158                vec![
2159                    Update::NewItemsChunk {
2160                        previous: None,
2161                        new: ChunkIdentifier::new(42),
2162                        next: None,
2163                    },
2164                    Update::PushItems {
2165                        at: Position::new(ChunkIdentifier::new(42), 0),
2166                        items: vec![event_0],
2167                    },
2168                ],
2169            )
2170            .await
2171            .unwrap();
2172    }
2173
2174    #[async_test]
2175    async fn test_linked_chunk_multiple_rooms() {
2176        let store = get_event_cache_store().await.expect("creating cache store failed");
2177
2178        let room1 = room_id!("!realcheeselovers:raclette.fr");
2179        let linked_chunk_id1 = LinkedChunkId::Room(room1);
2180        let room2 = room_id!("!realcheeselovers:fondue.ch");
2181        let linked_chunk_id2 = LinkedChunkId::Room(room2);
2182
2183        // Check that applying updates to one room doesn't affect the others.
2184        // Use the same chunk identifier in both rooms to battle-test search.
2185
2186        store
2187            .handle_linked_chunk_updates(
2188                linked_chunk_id1,
2189                vec![
2190                    Update::NewItemsChunk {
2191                        previous: None,
2192                        new: ChunkIdentifier::new(42),
2193                        next: None,
2194                    },
2195                    Update::PushItems {
2196                        at: Position::new(ChunkIdentifier::new(42), 0),
2197                        items: vec![
2198                            make_test_event(room1, "best cheese is raclette"),
2199                            make_test_event(room1, "obviously"),
2200                        ],
2201                    },
2202                ],
2203            )
2204            .await
2205            .unwrap();
2206
2207        store
2208            .handle_linked_chunk_updates(
2209                linked_chunk_id2,
2210                vec![
2211                    Update::NewItemsChunk {
2212                        previous: None,
2213                        new: ChunkIdentifier::new(42),
2214                        next: None,
2215                    },
2216                    Update::PushItems {
2217                        at: Position::new(ChunkIdentifier::new(42), 0),
2218                        items: vec![make_test_event(room2, "beaufort is the best")],
2219                    },
2220                ],
2221            )
2222            .await
2223            .unwrap();
2224
2225        // Check chunks from room 1.
2226        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2227        assert_eq!(chunks_room1.len(), 1);
2228
2229        let c = chunks_room1.remove(0);
2230        assert_matches!(c.content, ChunkContent::Items(events) => {
2231            assert_eq!(events.len(), 2);
2232            check_test_event(&events[0], "best cheese is raclette");
2233            check_test_event(&events[1], "obviously");
2234        });
2235
2236        // Check chunks from room 2.
2237        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2238        assert_eq!(chunks_room2.len(), 1);
2239
2240        let c = chunks_room2.remove(0);
2241        assert_matches!(c.content, ChunkContent::Items(events) => {
2242            assert_eq!(events.len(), 1);
2243            check_test_event(&events[0], "beaufort is the best");
2244        });
2245    }
2246
2247    #[async_test]
2248    async fn test_linked_chunk_update_is_a_transaction() {
2249        let store = get_event_cache_store().await.expect("creating cache store failed");
2250
2251        let room_id = *DEFAULT_TEST_ROOM_ID;
2252        let linked_chunk_id = LinkedChunkId::Room(room_id);
2253        // Trigger a violation of the unique constraint on the (room ID, chunk ID)
2254        // pair.
2255        // couple.
2256        let err = store
2257            .handle_linked_chunk_updates(
2258                linked_chunk_id,
2259                vec![
2260                    Update::NewItemsChunk {
2261                        previous: None,
2262                        new: ChunkIdentifier::new(42),
2263                        next: None,
2264                    },
2265                    Update::NewItemsChunk {
2266                        previous: None,
2267                        new: ChunkIdentifier::new(42),
2268                        next: None,
2269                    },
2270                ],
2271            )
2272            .await
2273            .unwrap_err();
2274
2275        // The operation fails with a constraint violation error.
2276        assert_matches!(err, crate::error::Error::Sqlite(err) => {
2277            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2278        });
2279
2280        // If the updates have been handled transactionally, then no new chunks should
2281        // have been added; failure of the second update leads to the first one being
2282        // rolled back.
2283        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2284        assert!(chunks.is_empty());
2285    }
2286
2287    #[async_test]
2288    async fn test_filter_duplicate_events_no_events() {
2289        let store = get_event_cache_store().await.expect("creating cache store failed");
2290
2291        let room_id = *DEFAULT_TEST_ROOM_ID;
2292        let linked_chunk_id = LinkedChunkId::Room(room_id);
2293        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2294        assert!(duplicates.is_empty());
2295    }
2296
2297    #[async_test]
2298    async fn test_load_last_chunk() {
2299        let room_id = room_id!("!r0:matrix.org");
2300        let linked_chunk_id = LinkedChunkId::Room(room_id);
2301        let event = |msg: &str| make_test_event(room_id, msg);
2302        let store = get_event_cache_store().await.expect("creating cache store failed");
2303
2304        // Case #1: no last chunk.
2305        {
2306            let (last_chunk, chunk_identifier_generator) =
2307                store.load_last_chunk(linked_chunk_id).await.unwrap();
2308
2309            assert!(last_chunk.is_none());
2310            assert_eq!(chunk_identifier_generator.current(), 0);
2311        }
2312
2313        // Case #2: only one chunk is present.
2314        {
2315            store
2316                .handle_linked_chunk_updates(
2317                    linked_chunk_id,
2318                    vec![
2319                        Update::NewItemsChunk {
2320                            previous: None,
2321                            new: ChunkIdentifier::new(42),
2322                            next: None,
2323                        },
2324                        Update::PushItems {
2325                            at: Position::new(ChunkIdentifier::new(42), 0),
2326                            items: vec![event("saucisse de morteau"), event("comté")],
2327                        },
2328                    ],
2329                )
2330                .await
2331                .unwrap();
2332
2333            let (last_chunk, chunk_identifier_generator) =
2334                store.load_last_chunk(linked_chunk_id).await.unwrap();
2335
2336            assert_matches!(last_chunk, Some(last_chunk) => {
2337                assert_eq!(last_chunk.identifier, 42);
2338                assert!(last_chunk.previous.is_none());
2339                assert!(last_chunk.next.is_none());
2340                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2341                    assert_eq!(items.len(), 2);
2342                    check_test_event(&items[0], "saucisse de morteau");
2343                    check_test_event(&items[1], "comté");
2344                });
2345            });
2346            assert_eq!(chunk_identifier_generator.current(), 42);
2347        }
2348
2349        // Case #3: more chunks are present.
2350        {
2351            store
2352                .handle_linked_chunk_updates(
2353                    linked_chunk_id,
2354                    vec![
2355                        Update::NewItemsChunk {
2356                            previous: Some(ChunkIdentifier::new(42)),
2357                            new: ChunkIdentifier::new(7),
2358                            next: None,
2359                        },
2360                        Update::PushItems {
2361                            at: Position::new(ChunkIdentifier::new(7), 0),
2362                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2363                        },
2364                    ],
2365                )
2366                .await
2367                .unwrap();
2368
2369            let (last_chunk, chunk_identifier_generator) =
2370                store.load_last_chunk(linked_chunk_id).await.unwrap();
2371
2372            assert_matches!(last_chunk, Some(last_chunk) => {
2373                assert_eq!(last_chunk.identifier, 7);
2374                assert_matches!(last_chunk.previous, Some(previous) => {
2375                    assert_eq!(previous, 42);
2376                });
2377                assert!(last_chunk.next.is_none());
2378                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2379                    assert_eq!(items.len(), 3);
2380                    check_test_event(&items[0], "fondue");
2381                    check_test_event(&items[1], "gruyère");
2382                    check_test_event(&items[2], "mont d'or");
2383                });
2384            });
2385            assert_eq!(chunk_identifier_generator.current(), 42);
2386        }
2387    }
2388
2389    #[async_test]
2390    async fn test_load_last_chunk_with_a_cycle() {
2391        let room_id = room_id!("!r0:matrix.org");
2392        let linked_chunk_id = LinkedChunkId::Room(room_id);
2393        let store = get_event_cache_store().await.expect("creating cache store failed");
2394
2395        store
2396            .handle_linked_chunk_updates(
2397                linked_chunk_id,
2398                vec![
2399                    Update::NewItemsChunk {
2400                        previous: None,
2401                        new: ChunkIdentifier::new(0),
2402                        next: None,
2403                    },
2404                    Update::NewItemsChunk {
2405                        // Because `previous` connects to chunk #0, it will create a cycle.
2406                        // Chunk #0 will have a `next` set to chunk #1! Consequently, the last chunk
2407                        // **does not exist**. We have to detect this cycle.
2408                        previous: Some(ChunkIdentifier::new(0)),
2409                        new: ChunkIdentifier::new(1),
2410                        next: Some(ChunkIdentifier::new(0)),
2411                    },
2412                ],
2413            )
2414            .await
2415            .unwrap();
2416
2417        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2418    }
2419
2420    #[async_test]
2421    async fn test_load_previous_chunk() {
2422        let room_id = room_id!("!r0:matrix.org");
2423        let linked_chunk_id = LinkedChunkId::Room(room_id);
2424        let event = |msg: &str| make_test_event(room_id, msg);
2425        let store = get_event_cache_store().await.expect("creating cache store failed");
2426
2427        // Case #1: no chunk at all, equivalent to having a nonexistent
2428        // `before_chunk_identifier`.
2429        {
2430            let previous_chunk = store
2431                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2432                .await
2433                .unwrap();
2434
2435            assert!(previous_chunk.is_none());
2436        }
2437
2438        // Case #2: there is only one chunk; we request the chunk before it,
2439        // which doesn't exist.
2440        {
2441            store
2442                .handle_linked_chunk_updates(
2443                    linked_chunk_id,
2444                    vec![Update::NewItemsChunk {
2445                        previous: None,
2446                        new: ChunkIdentifier::new(42),
2447                        next: None,
2448                    }],
2449                )
2450                .await
2451                .unwrap();
2452
2453            let previous_chunk =
2454                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2455
2456            assert!(previous_chunk.is_none());
2457        }
2458
2459        // Case #3: there are two chunks.
2460        {
2461            store
2462                .handle_linked_chunk_updates(
2463                    linked_chunk_id,
2464                    vec![
2465                        // new chunk before the one that exists.
2466                        Update::NewItemsChunk {
2467                            previous: None,
2468                            new: ChunkIdentifier::new(7),
2469                            next: Some(ChunkIdentifier::new(42)),
2470                        },
2471                        Update::PushItems {
2472                            at: Position::new(ChunkIdentifier::new(7), 0),
2473                            items: vec![event("brigand du jorat"), event("morbier")],
2474                        },
2475                    ],
2476                )
2477                .await
2478                .unwrap();
2479
2480            let previous_chunk =
2481                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2482
2483            assert_matches!(previous_chunk, Some(previous_chunk) => {
2484                assert_eq!(previous_chunk.identifier, 7);
2485                assert!(previous_chunk.previous.is_none());
2486                assert_matches!(previous_chunk.next, Some(next) => {
2487                    assert_eq!(next, 42);
2488                });
2489                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2490                    assert_eq!(items.len(), 2);
2491                    check_test_event(&items[0], "brigand du jorat");
2492                    check_test_event(&items[1], "morbier");
2493                });
2494            });
2495        }
2496    }
2497}
2498
2499#[cfg(test)]
2500mod encrypted_tests {
2501    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2502
2503    use matrix_sdk_base::{
2504        event_cache::store::{EventCacheStore, EventCacheStoreError},
2505        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2506    };
2507    use matrix_sdk_test::{async_test, event_factory::EventFactory};
2508    use once_cell::sync::Lazy;
2509    use ruma::{
2510        event_id,
2511        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2512        room_id, user_id,
2513    };
2514    use tempfile::{tempdir, TempDir};
2515
2516    use super::SqliteEventCacheStore;
2517
2518    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2519    static NUM: AtomicU32 = AtomicU32::new(0);
2520
2521    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2522        let name = NUM.fetch_add(1, SeqCst).to_string();
2523        let tmpdir_path = TMP_DIR.path().join(name);
2524
2525        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2526
2527        Ok(SqliteEventCacheStore::open(
2528            tmpdir_path.to_str().unwrap(),
2529            Some("default_test_password"),
2530        )
2531        .await
2532        .unwrap())
2533    }
2534
2535    event_cache_store_integration_tests!();
2536    event_cache_store_integration_tests_time!();
2537
2538    #[async_test]
2539    async fn test_no_sqlite_injection_in_find_event_relations() {
2540        let room_id = room_id!("!test:localhost");
2541        let another_room_id = room_id!("!r1:matrix.org");
2542        let sender = user_id!("@alice:localhost");
2543
2544        let store = get_event_cache_store()
2545            .await
2546            .expect("We should be able to create a new, empty, event cache store");
2547
2548        let f = EventFactory::new().room(room_id).sender(sender);
2549
2550        // Create an event for the first room.
2551        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2552        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2553
2554        // Create a related event.
2555        let edit_id = event_id!("$find_me:matrix.org");
2556        let edit = f
2557            .text_msg("Find me")
2558            .event_id(edit_id)
2559            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2560            .into_event();
2561
2562        // Create an event for the second room.
2563        let f = f.room(another_room_id);
2564
2565        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2566        let another_event =
2567            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2568
2569        // Save the events in the DB.
2570        store.save_event(room_id, event).await.unwrap();
2571        store.save_event(room_id, edit).await.unwrap();
2572        store.save_event(another_room_id, another_event).await.unwrap();
2573
2574        // Craft a `RelationType` that attempts to inject SQL. If the filter were
2575        // spliced into the query unescaped, the `OR 1=1` would make the WHERE
2576        // clause match every row, ignoring the room ID and event ID predicates.
2577        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
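        // Because the store binds each filter as a separate SQL parameter rather
        // than splicing it into the query string, the crafted value can only be
        // compared literally against the `rel_type` column, never executed as SQL.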
2578
2579        // Attempt to find events in the first room.
2580        let results = store
2581            .find_event_relations(room_id, event_id, filter.as_deref())
2582            .await
2583            .expect("We should be able to attempt to find event relations");
2584
2585        // Ensure that we only got the single related event the first room contains.
2586        similar_asserts::assert_eq!(
2587            results.len(),
2588            1,
2589            "We should only have loaded events for the first room {results:#?}"
2590        );
2591
2592        // The event needs to be the edit event, otherwise something is wrong.
2593        let (found_event, _) = &results[0];
2594        assert_eq!(
2595            found_event.event_id().as_deref(),
2596            Some(edit_id),
2597            "The single event we found should be the edit event"
2598        );
2599    }
2600}