Skip to main content

matrix_sdk_sqlite/
event_cache_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`EventCacheStore`].
16
17use std::{
18    collections::HashMap,
19    fmt,
20    iter::once,
21    path::{Path, PathBuf},
22    sync::Arc,
23};
24
25use async_trait::async_trait;
26use deadpool::managed::PoolConfig;
27use matrix_sdk_base::{
28    cross_process_lock::CrossProcessLockGeneration,
29    deserialized_responses::TimelineEvent,
30    event_cache::{
31        Event, Gap,
32        store::{EventCacheStore, extract_event_relation},
33    },
34    linked_chunk::{
35        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
36        Position, RawChunk, Update,
37    },
38    timer,
39};
40use matrix_sdk_store_encryption::StoreCipher;
41use ruma::{
42    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
43};
44use rusqlite::{
45    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
46};
47use tokio::{
48    fs,
49    sync::{Mutex, OwnedMutexGuard},
50};
51use tracing::{debug, error, instrument, trace};
52
53use crate::{
54    OpenStoreError, RuntimeConfig, Secret, SqliteStoreConfig,
55    connection::{self, Connection as SqliteAsyncConn, Pool as SqlitePool, SqliteConnections},
56    error::{Error, Result},
57    utils::{
58        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
59        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
60    },
61};
62
63mod keys {
64    // Tables
65    pub const LINKED_CHUNKS: &str = "linked_chunks";
66    pub const EVENTS: &str = "events";
67}
68
69/// The database name.
70const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
71
72/// The string used to identify a chunk of type events, in the `type` field in
73/// the database.
74const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
75/// The string used to identify a chunk of type gap, in the `type` field in the
76/// database.
77const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
78
79/// An SQLite-based event cache store.
80#[derive(Clone)]
81pub struct SqliteEventCacheStore {
82    store_cipher: Option<Arc<StoreCipher>>,
83
84    /// `Some` when active, `None` when closed.
85    connections: Arc<Mutex<Option<SqliteConnections>>>,
86
87    /// Retained so we can rebuild the pool on reopen.
88    db_path: PathBuf,
89
90    /// Retained so we can rebuild the pool on reopen.
91    pool_config: PoolConfig,
92
93    /// Retained so we can re-apply runtime config on reopen.
94    runtime_config: RuntimeConfig,
95}
96
97#[cfg(not(tarpaulin_include))]
98impl fmt::Debug for SqliteEventCacheStore {
99    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
101    }
102}
103
104impl EncryptableStore for SqliteEventCacheStore {
105    fn get_cypher(&self) -> Option<&StoreCipher> {
106        self.store_cipher.as_deref()
107    }
108}
109
110impl SqliteEventCacheStore {
111    /// Open the SQLite-based event cache store at the given path using the
112    /// given passphrase to encrypt private data.
113    pub async fn open(
114        path: impl AsRef<Path>,
115        passphrase: Option<&str>,
116    ) -> Result<Self, OpenStoreError> {
117        Self::open_with_config(&SqliteStoreConfig::new(path).passphrase(passphrase)).await
118    }
119
120    /// Open the SQLite-based event cache store at the given path using the
121    /// given key to encrypt private data.
122    pub async fn open_with_key(
123        path: impl AsRef<Path>,
124        key: Option<&[u8; 32]>,
125    ) -> Result<Self, OpenStoreError> {
126        Self::open_with_config(&SqliteStoreConfig::new(path).key(key)).await
127    }
128
129    /// Open the SQLite-based event cache store with the config open config.
130    #[instrument(skip(config), fields(path = ?config.path))]
131    pub async fn open_with_config(config: &SqliteStoreConfig) -> Result<Self, OpenStoreError> {
132        debug!(?config);
133
134        let _timer = timer!("open_with_config");
135
136        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
137
138        let db_path = config.path.join(DATABASE_NAME);
139        let pool_config = config.pool_config();
140        let runtime_config = config.runtime_config();
141
142        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
143
144        let this =
145            Self::open_with_pool(pool, db_path, pool_config, runtime_config, config.secret.clone())
146                .await?;
147
148        // Apply runtime config on the write connection.
149        this.write().await?.apply_runtime_config(runtime_config).await?;
150
151        Ok(this)
152    }
153
154    /// Open an SQLite-based event cache store using the given SQLite database
155    /// pool. The given secret will be used to encrypt private data.
156    async fn open_with_pool(
157        pool: SqlitePool,
158        db_path: PathBuf,
159        pool_config: PoolConfig,
160        runtime_config: RuntimeConfig,
161        secret: Option<Secret>,
162    ) -> Result<Self, OpenStoreError> {
163        let conn = pool.get().await?;
164
165        let version = conn.db_version().await?;
166
167        run_migrations(&conn, version).await?;
168
169        conn.wal_checkpoint().await;
170
171        let store_cipher = match secret {
172            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
173            None => None,
174        };
175
176        let connections = SqliteConnections {
177            pool,
178            // Use `conn` as our selected write connection.
179            write_connection: Arc::new(Mutex::new(conn)),
180        };
181
182        Ok(Self {
183            store_cipher,
184            connections: Arc::new(Mutex::new(Some(connections))),
185            db_path,
186            pool_config,
187            runtime_config,
188        })
189    }
190
191    /// Acquire a connection for executing read operations.
192    #[instrument(skip_all)]
193    async fn read(&self) -> Result<SqliteAsyncConn> {
194        let pool = {
195            let guard = self.connections.lock().await;
196            let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
197            conns.pool.clone()
198        };
199
200        let connection = pool.get().await?;
201
202        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
203        // support must be enabled on a per-connection basis. Execute it every
204        // time we try to get a connection, since we can't guarantee a previous
205        // connection did enable it before.
206        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
207
208        Ok(connection)
209    }
210
211    /// Acquire a connection for executing write operations.
212    #[instrument(skip_all)]
213    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
214        let write_connection = {
215            let guard = self.connections.lock().await;
216            let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
217            conns.write_connection.clone()
218        };
219
220        let connection = write_connection.lock_owned().await;
221
222        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
223        // support must be enabled on a per-connection basis. Execute it every
224        // time we try to get a connection, since we can't guarantee a previous
225        // connection did enable it before.
226        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
227
228        Ok(connection)
229    }
230
231    fn map_row_to_chunk(
232        row: &rusqlite::Row<'_>,
233    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
234        Ok((
235            row.get::<_, u64>(0)?,
236            row.get::<_, Option<u64>>(1)?,
237            row.get::<_, Option<u64>>(2)?,
238            row.get::<_, String>(3)?,
239        ))
240    }
241
242    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
243        let serialized = serde_json::to_vec(event)?;
244
245        // Extract the relationship info here.
246        let raw_event = event.raw();
247        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
248
249        // The content may be encrypted.
250        let content = self.encode_value(serialized)?;
251
252        Ok(EncodedEvent {
253            content,
254            rel_type,
255            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
256        })
257    }
258
259    pub async fn vacuum(&self) -> Result<()> {
260        let write_connection = {
261            let guard = self.connections.lock().await;
262            let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
263            conns.write_connection.clone()
264        };
265        write_connection.lock().await.vacuum().await
266    }
267
268    async fn get_db_size(&self) -> Result<Option<usize>> {
269        let pool = {
270            let guard = self.connections.lock().await;
271            let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
272            conns.pool.clone()
273        };
274        Ok(Some(pool.get().await?.get_db_size().await?))
275    }
276
277    pub async fn close(&self) -> Result<()> {
278        connection::close_connections(&self.connections, "Event cache store").await;
279        Ok(())
280    }
281
282    pub async fn reopen(&self) -> Result<()> {
283        connection::reopen_connections(
284            &self.connections,
285            self.db_path.clone(),
286            self.pool_config,
287            self.runtime_config,
288        )
289        .await?;
290        Ok(())
291    }
292
293    /// Returns the pool size status, for testing purposes.
294    #[cfg(test)]
295    async fn pool_max_size(&self) -> Option<usize> {
296        let guard = self.connections.lock().await;
297        guard.as_ref().map(|conns| conns.pool.status().max_size)
298    }
299}
300
301struct EncodedEvent {
302    content: Vec<u8>,
303    rel_type: Option<String>,
304    relates_to: Option<String>,
305}
306
307trait TransactionExtForLinkedChunks {
308    fn rebuild_chunk(
309        &self,
310        store: &SqliteEventCacheStore,
311        linked_chunk_id: &Key,
312        previous: Option<u64>,
313        index: u64,
314        next: Option<u64>,
315        chunk_type: &str,
316    ) -> Result<RawChunk<Event, Gap>>;
317
318    fn load_gap_content(
319        &self,
320        store: &SqliteEventCacheStore,
321        linked_chunk_id: &Key,
322        chunk_id: ChunkIdentifier,
323    ) -> Result<Gap>;
324
325    fn load_events_content(
326        &self,
327        store: &SqliteEventCacheStore,
328        linked_chunk_id: &Key,
329        chunk_id: ChunkIdentifier,
330    ) -> Result<Vec<Event>>;
331}
332
333impl TransactionExtForLinkedChunks for Transaction<'_> {
334    fn rebuild_chunk(
335        &self,
336        store: &SqliteEventCacheStore,
337        linked_chunk_id: &Key,
338        previous: Option<u64>,
339        id: u64,
340        next: Option<u64>,
341        chunk_type: &str,
342    ) -> Result<RawChunk<Event, Gap>> {
343        let previous = previous.map(ChunkIdentifier::new);
344        let next = next.map(ChunkIdentifier::new);
345        let id = ChunkIdentifier::new(id);
346
347        match chunk_type {
348            CHUNK_TYPE_GAP_TYPE_STRING => {
349                // It's a gap!
350                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
351                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
352            }
353
354            CHUNK_TYPE_EVENT_TYPE_STRING => {
355                // It's events!
356                let events = self.load_events_content(store, linked_chunk_id, id)?;
357                Ok(RawChunk {
358                    content: ChunkContent::Items(events),
359                    previous,
360                    identifier: id,
361                    next,
362                })
363            }
364
365            other => {
366                // It's an error!
367                Err(Error::InvalidData {
368                    details: format!("a linked chunk has an unknown type {other}"),
369                })
370            }
371        }
372    }
373
374    fn load_gap_content(
375        &self,
376        store: &SqliteEventCacheStore,
377        linked_chunk_id: &Key,
378        chunk_id: ChunkIdentifier,
379    ) -> Result<Gap> {
380        // There's at most one row for it in the database, so a call to `query_row` is
381        // sufficient.
382        let encoded_prev_token: Vec<u8> = self.query_row(
383            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
384            (chunk_id.index(), &linked_chunk_id),
385            |row| row.get(0),
386        )?;
387        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
388        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
389        Ok(Gap { token: prev_token })
390    }
391
392    fn load_events_content(
393        &self,
394        store: &SqliteEventCacheStore,
395        linked_chunk_id: &Key,
396        chunk_id: ChunkIdentifier,
397    ) -> Result<Vec<Event>> {
398        // Retrieve all the events from the database.
399        let mut events = Vec::new();
400
401        for event_data in self
402            .prepare(
403                r#"
404                    SELECT events.content
405                    FROM event_chunks ec, events
406                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
407                    ORDER BY ec.position ASC
408                "#,
409            )?
410            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
411        {
412            let encoded_content = event_data?;
413            let serialized_content = store.decode_value(&encoded_content)?;
414            let event = serde_json::from_slice(&serialized_content)?;
415
416            events.push(event);
417        }
418
419        Ok(events)
420    }
421}
422
423/// Run migrations for the given version of the database.
424async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
425    // Always enable foreign keys for the current connection.
426    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
427
428    if version < 1 {
429        debug!("Creating database");
430        // First turn on WAL mode, this can't be done in the transaction, it fails with
431        // the error message: "cannot change into wal mode from within a transaction".
432        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
433        conn.with_transaction(|txn| {
434            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
435            txn.set_db_version(1)
436        })
437        .await?;
438    }
439
440    if version < 2 {
441        debug!("Upgrading database to version 2");
442        conn.with_transaction(|txn| {
443            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
444            txn.set_db_version(2)
445        })
446        .await?;
447    }
448
449    if version < 3 {
450        debug!("Upgrading database to version 3");
451        conn.with_transaction(|txn| {
452            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
453            txn.set_db_version(3)
454        })
455        .await?;
456    }
457
458    if version < 4 {
459        debug!("Upgrading database to version 4");
460        conn.with_transaction(|txn| {
461            txn.execute_batch(include_str!(
462                "../migrations/event_cache_store/004_ignore_policy.sql"
463            ))?;
464            txn.set_db_version(4)
465        })
466        .await?;
467    }
468
469    if version < 5 {
470        debug!("Upgrading database to version 5");
471        conn.with_transaction(|txn| {
472            txn.execute_batch(include_str!(
473                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
474            ))?;
475            txn.set_db_version(5)
476        })
477        .await?;
478    }
479
480    if version < 6 {
481        debug!("Upgrading database to version 6");
482        conn.with_transaction(|txn| {
483            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
484            txn.set_db_version(6)
485        })
486        .await?;
487    }
488
489    if version < 7 {
490        debug!("Upgrading database to version 7");
491        conn.with_transaction(|txn| {
492            txn.execute_batch(include_str!(
493                "../migrations/event_cache_store/007_event_chunks.sql"
494            ))?;
495            txn.set_db_version(7)
496        })
497        .await?;
498    }
499
500    if version < 8 {
501        debug!("Upgrading database to version 8");
502        conn.with_transaction(|txn| {
503            txn.execute_batch(include_str!(
504                "../migrations/event_cache_store/008_linked_chunk_id.sql"
505            ))?;
506            txn.set_db_version(8)
507        })
508        .await?;
509    }
510
511    if version < 9 {
512        debug!("Upgrading database to version 9");
513        conn.with_transaction(|txn| {
514            txn.execute_batch(include_str!(
515                "../migrations/event_cache_store/009_related_event_index.sql"
516            ))?;
517            txn.set_db_version(9)
518        })
519        .await?;
520    }
521
522    if version < 10 {
523        debug!("Upgrading database to version 10");
524        conn.with_transaction(|txn| {
525            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
526            txn.set_db_version(10)
527        })
528        .await?;
529
530        if version >= 1 {
531            // Defragment the DB and optimize its size on the filesystem now that we removed
532            // the media cache.
533            conn.vacuum().await?;
534        }
535    }
536
537    if version < 11 {
538        debug!("Upgrading database to version 11");
539        conn.with_transaction(|txn| {
540            txn.execute_batch(include_str!(
541                "../migrations/event_cache_store/011_empty_event_cache.sql"
542            ))?;
543            txn.set_db_version(11)
544        })
545        .await?;
546    }
547
548    if version < 12 {
549        debug!("Upgrading database to version 12");
550        conn.with_transaction(|txn| {
551            txn.execute_batch(include_str!(
552                "../migrations/event_cache_store/012_store_event_type.sql"
553            ))?;
554            txn.set_db_version(12)
555        })
556        .await?;
557    }
558
559    if version < 13 {
560        debug!("Upgrading database to version 13");
561        conn.with_transaction(|txn| {
562            txn.execute_batch(include_str!(
563                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
564            ))?;
565            txn.set_db_version(13)
566        })
567        .await?;
568    }
569
570    if version < 14 {
571        debug!("Upgrading database to version 14");
572        conn.with_transaction(|txn| {
573            txn.execute_batch(include_str!(
574                "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
575            ))?;
576            txn.set_db_version(14)
577        })
578        .await?;
579    }
580
581    Ok(())
582}
583
584#[async_trait]
585impl EventCacheStore for SqliteEventCacheStore {
586    type Error = Error;
587
588    #[instrument(skip(self))]
589    async fn try_take_leased_lock(
590        &self,
591        lease_duration_ms: u32,
592        key: &str,
593        holder: &str,
594    ) -> Result<Option<CrossProcessLockGeneration>> {
595        let key = key.to_owned();
596        let holder = holder.to_owned();
597
598        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
599        let expiration = now + lease_duration_ms as u64;
600
601        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
602        let generation = self
603            .write()
604            .await?
605            .with_transaction(move |txn| {
606                txn.query_row(
607                    "INSERT INTO lease_locks (key, holder, expiration)
608                    VALUES (?1, ?2, ?3)
609                    ON CONFLICT (key)
610                    DO
611                        UPDATE SET
612                            holder = excluded.holder,
613                            expiration = excluded.expiration,
614                            generation =
615                                CASE holder
616                                    WHEN excluded.holder THEN generation
617                                    ELSE generation + 1
618                                END
619                        WHERE
620                            holder = excluded.holder
621                            OR expiration < ?4
622                    RETURNING generation
623                    ",
624                    (key, holder, expiration, now),
625                    |row| row.get(0),
626                )
627                .optional()
628            })
629            .await?;
630
631        Ok(generation)
632    }
633
634    #[instrument(skip(self, updates))]
635    async fn handle_linked_chunk_updates(
636        &self,
637        linked_chunk_id: LinkedChunkId<'_>,
638        updates: Vec<Update<Event, Gap>>,
639    ) -> Result<(), Self::Error> {
640        let _timer = timer!("method");
641
642        // Use a single transaction throughout this function, so that either all updates
643        // work, or none is taken into account.
644        let hashed_linked_chunk_id =
645            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
646        let linked_chunk_id = linked_chunk_id.to_owned();
647        let this = self.clone();
648
649        with_immediate_transaction(self, move |txn| {
650            for up in updates {
651                match up {
652                    Update::NewItemsChunk { previous, new, next } => {
653                        let previous = previous.as_ref().map(ChunkIdentifier::index);
654                        let new = new.index();
655                        let next = next.as_ref().map(ChunkIdentifier::index);
656
657                        trace!(
658                            %linked_chunk_id,
659                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
660                        );
661
662                        insert_chunk(
663                            txn,
664                            &hashed_linked_chunk_id,
665                            previous,
666                            new,
667                            next,
668                            CHUNK_TYPE_EVENT_TYPE_STRING,
669                        )?;
670                    }
671
672                    Update::NewGapChunk { previous, new, next, gap } => {
673                        let serialized = serde_json::to_vec(&gap.token)?;
674                        let prev_token = this.encode_value(serialized)?;
675
676                        let previous = previous.as_ref().map(ChunkIdentifier::index);
677                        let new = new.index();
678                        let next = next.as_ref().map(ChunkIdentifier::index);
679
680                        trace!(
681                            %linked_chunk_id,
682                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
683                        );
684
685                        // Insert the chunk as a gap.
686                        insert_chunk(
687                            txn,
688                            &hashed_linked_chunk_id,
689                            previous,
690                            new,
691                            next,
692                            CHUNK_TYPE_GAP_TYPE_STRING,
693                        )?;
694
695                        // Insert the gap's value.
696                        txn.execute(
697                            r#"
698                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
699                            VALUES (?, ?, ?)
700                        "#,
701                            (new, &hashed_linked_chunk_id, prev_token),
702                        )?;
703                    }
704
705                    Update::RemoveChunk(chunk_identifier) => {
706                        let chunk_id = chunk_identifier.index();
707
708                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
709
710                        // Find chunk to delete.
711                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
712                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
713                            (chunk_id, &hashed_linked_chunk_id),
714                            |row| Ok((row.get(0)?, row.get(1)?))
715                        )?;
716
717                        // Replace its previous' next to its own next.
718                        if let Some(previous) = previous {
719                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
720                        }
721
722                        // Replace its next' previous to its own previous.
723                        if let Some(next) = next {
724                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
725                        }
726
727                        // Now delete it, and let cascading delete corresponding entries in the
728                        // other data tables.
729                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
730                    }
731
732                    Update::PushItems { at, items } => {
733                        if items.is_empty() {
734                            // Should never happens, but better be safe.
735                            continue;
736                        }
737
738                        let chunk_id = at.chunk_identifier().index();
739
740                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
741
742                        let mut chunk_statement = txn.prepare(
743                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
744                        )?;
745
746                        // Note: we use `OR REPLACE` here, because the event might have been
747                        // already inserted in the database. This is the case when an event is
748                        // deduplicated and moved to another position; or because it was inserted
749                        // outside the context of a linked chunk (e.g. pinned event).
750                        let mut content_statement = txn.prepare(
751                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
752                        )?;
753
754                        let invalid_event = |event: TimelineEvent| {
755                            let Some(event_id) = event.event_id() else {
756                                error!(%linked_chunk_id, "Trying to push an event with no ID");
757                                return None;
758                            };
759
760                            let Some(event_type) = event.kind.event_type() else {
761                                error!(%event_id, "Trying to save an event with no event type");
762                                return None;
763                            };
764
765                            Some((event_id.to_string(), event_type, event))
766                        };
767
768                        let room_id = linked_chunk_id.room_id();
769                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
770
771                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
772                            // Insert the location information into the database.
773                            let index = at.index() + i;
774                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
775
776                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
777                            let event_type = this.encode_key(keys::EVENTS, event_type);
778
779                            // Now, insert the event content into the database.
780                            let encoded_event = this.encode_event(&event)?;
781                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
782                        }
783                    }
784
785                    Update::ReplaceItem { at, item: event } => {
786                        let chunk_id = at.chunk_identifier().index();
787
788                        let index = at.index();
789
790                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
791
792                        // The event id should be the same, but just in case it changed…
793                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
794                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
795                            continue;
796                        };
797
798                        let Some(event_type) = event.kind.event_type() else {
799                            error!(%event_id, "Trying to save an event with no event type");
800                            continue;
801                        };
802
803                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
804                        let event_type = this.encode_key(keys::EVENTS, event_type);
805
806                        // Replace the event's content. Really we'd like to update, but in case the
807                        // event id changed, we are a bit lenient here and will allow an insertion
808                        // of the new event.
809                        let encoded_event = this.encode_event(&event)?;
810                        let room_id = linked_chunk_id.room_id();
811                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
812                        txn.execute(
813                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
814                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
815
816                        // Replace the event id in the linked chunk, in case it changed.
817                        txn.execute(
818                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
819                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
820                        )?;
821                    }
822
823                    Update::RemoveItem { at } => {
824                        let chunk_id = at.chunk_identifier().index();
825                        let index = at.index();
826
827                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
828
829                        // Remove the entry in the chunk table.
830                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
831
832                        // Decrement the index of each item after the one we are
833                        // going to remove.
834                        //
835                        // Imagine we have the following events:
836                        //
837                        // | event_id | linked_chunk_id | chunk_id | position |
838                        // |----------|-----------------|----------|----------|
839                        // | $ev0     | !r0             | 42       | 0        |
840                        // | $ev1     | !r0             | 42       | 1        |
841                        // | $ev2     | !r0             | 42       | 2        |
842                        // | $ev3     | !r0             | 42       | 3        |
843                        // | $ev4     | !r0             | 42       | 4        |
844                        //
845                        // `$ev2` has been removed, then we end up in this
846                        // state:
847                        //
848                        // | event_id | linked_chunk_id    | chunk_id | position |
849                        // |----------|--------------------|----------|----------|
850                        // | $ev0     | !r0                | 42       | 0        |
851                        // | $ev1     | !r0                | 42       | 1        |
852                        // |          |                    |          |          | <- no more `$ev2`
853                        // | $ev3     | !r0                | 42       | 3        |
854                        // | $ev4     | !r0                | 42       | 4        |
855                        //
856                        // We need to shift the `position` of `$ev3` and `$ev4`
857                        // to `position - 1`, like so:
858                        //
859                        // | event_id | linked_chunk_id | chunk_id | position |
860                        // |----------|-----------------|----------|----------|
861                        // | $ev0     | !r0             | 42       | 0        |
862                        // | $ev1     | !r0             | 42       | 1        |
863                        // | $ev3     | !r0             | 42       | 2        |
864                        // | $ev4     | !r0             | 42       | 3        |
865                        //
866                        // Usually, it boils down to run the following query:
867                        //
868                        // ```sql
869                        // UPDATE event_chunks
870                        // SET position = position - 1
871                        // WHERE position > 2 AND …
872                        // ```
873                        //
874                        // Okay. But `UPDATE` runs on rows in no particular
875                        // order. It means that it can update `$ev4` before
876                        // `$ev3` for example. What happens in this particular
877                        // case? The `position` of `$ev4` becomes `3`, however
878                        // `$ev3` already has `position = 3`. Because there
879                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
880                        // position)`, it will result in a constraint violation.
881                        //
882                        // There is **no way** to control the execution order of
883                        // `UPDATE` in SQLite. To persuade yourself, try:
884                        //
885                        // ```sql
886                        // UPDATE event_chunks
887                        // SET position = position - 1
888                        // FROM (
889                        //     SELECT event_id
890                        //     FROM event_chunks
891                        //     WHERE position > 2 AND …
892                        //     ORDER BY position ASC
893                        // ) as ordered
894                        // WHERE event_chunks.event_id = ordered.event_id
895                        // ```
896                        //
897                        // It will fail the same way.
898                        //
899                        // Thus, we have 2 solutions:
900                        //
901                        // 1. Remove the `UNIQUE` constraint,
902                        // 2. Be creative.
903                        //
904                        // The `UNIQUE` constraint is a safe belt. Normally, we
905                        // have `event_cache::Deduplicator` that is responsible
906                        // to ensure there is no duplicated event. However,
907                        // relying on this is “fragile” in the sense it can
908                        // contain bugs. Relying on the `UNIQUE` constraint from
909                        // SQLite is more robust. It's “braces and belt” as we
910                        // say here.
911                        //
912                        // So. We need to be creative.
913                        //
914                        // Many solutions exist. Amongst the most popular, we
915                        // see _dropping and re-creating the index_, which is
916                        // no-go for us, it's too expensive. I (@hywan) have
917                        // adopted the following one:
918                        //
919                        // - Do `position = position - 1` but in the negative
920                        //   space, so `position = -(position - 1)`. A position
921                        //   cannot be negative; we are sure it is unique!
922                        // - Once all candidate rows are updated, do `position =
923                        //   -position` to move back to the positive space.
924                        //
925                        // 'told you it's gonna be creative.
926                        //
927                        // This solution is a hack, **but** it is a small
928                        // number of operations, and we can keep the `UNIQUE`
929                        // constraint in place.
930                        txn.execute(
931                            r#"
932                                UPDATE event_chunks
933                                SET position = -(position - 1)
934                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
935                            "#,
936                            (&hashed_linked_chunk_id, chunk_id, index)
937                        )?;
938                        txn.execute(
939                            r#"
940                                UPDATE event_chunks
941                                SET position = -position
942                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
943                            "#,
944                            (&hashed_linked_chunk_id, chunk_id)
945                        )?;
946                    }
947
948                    Update::DetachLastItems { at } => {
949                        let chunk_id = at.chunk_identifier().index();
950                        let index = at.index();
951
952                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
953
954                        // Remove these entries.
955                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
956                    }
957
958                    Update::Clear => {
959                        trace!(%linked_chunk_id, "clearing items");
960
961                        // Remove chunks, and let cascading do its job.
962                        txn.execute(
963                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
964                            (&hashed_linked_chunk_id,),
965                        )?;
966                    }
967
968                    Update::StartReattachItems | Update::EndReattachItems => {
969                        // Nothing.
970                    }
971                }
972            }
973
974            Ok(())
975        })
976        .await?;
977
978        Ok(())
979    }
980
981    #[instrument(skip(self))]
982    async fn load_all_chunks(
983        &self,
984        linked_chunk_id: LinkedChunkId<'_>,
985    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
986        let _timer = timer!("method");
987
988        let hashed_linked_chunk_id =
989            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
990
991        let this = self.clone();
992
993        let result = self
994            .read()
995            .await?
996            .with_transaction(move |txn| -> Result<_> {
997                let mut items = Vec::new();
998
999                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
1000                for data in txn
1001                    .prepare(
1002                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
1003                    )?
1004                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
1005                {
1006                    let (id, previous, next, chunk_type) = data?;
1007                    let new = txn.rebuild_chunk(
1008                        &this,
1009                        &hashed_linked_chunk_id,
1010                        previous,
1011                        id,
1012                        next,
1013                        chunk_type.as_str(),
1014                    )?;
1015                    items.push(new);
1016                }
1017
1018                Ok(items)
1019            })
1020            .await?;
1021
1022        Ok(result)
1023    }
1024
1025    #[instrument(skip(self))]
1026    async fn load_all_chunks_metadata(
1027        &self,
1028        linked_chunk_id: LinkedChunkId<'_>,
1029    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
1030        let _timer = timer!("method");
1031
1032        let hashed_linked_chunk_id =
1033            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1034
1035        self.read()
1036            .await?
1037            .with_transaction(move |txn| -> Result<_> {
1038                // We want to collect the metadata about each chunk (id, next, previous), and
1039                // for event chunks, the number of events in it. For gaps, the
1040                // number of events is 0, by convention.
1041                //
1042                // We've tried different strategies over time:
1043                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
1044                //   caused a full table traversal for each chunk, including for gaps which
1045                //   don't have any events. This happened in
1046                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
1047                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
1048                //   additional `SELECT` query. It was an immense improvement, but still caused
1049                //   one select query per event chunk. This happened in
1050                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
1051                //
1052                // The current solution is to run two queries:
1053                // - one to get each chunk and its number of events, by doing a single `SELECT`
1054                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
1055                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
1056                //   hashmap.
1057                // - one to get each chunk's metadata (id, previous, next, type) from the
1058                //   database with a `SELECT`, and then use the hashmap to get the number of
1059                //   events.
1060                //
1061                // This strategy minimizes the number of queries to the database, and keeps them
1062                // super simple, while doing a bit more processing here, which is much faster.
1063
1064                let num_events_by_chunk_ids = txn
1065                    .prepare(
1066                        r#"
1067                            SELECT ec.chunk_id, COUNT(ec.event_id)
1068                            FROM event_chunks as ec
1069                            WHERE ec.linked_chunk_id = ?
1070                            GROUP BY ec.chunk_id
1071                        "#,
1072                    )?
1073                    .query_map((&hashed_linked_chunk_id,), |row| {
1074                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
1075                    })?
1076                    .collect::<Result<HashMap<_, _>, _>>()?;
1077
1078                txn.prepare(
1079                    r#"
1080                        SELECT
1081                            lc.id,
1082                            lc.previous,
1083                            lc.next,
1084                            lc.type
1085                        FROM linked_chunks as lc
1086                        WHERE lc.linked_chunk_id = ?
1087                        ORDER BY lc.id"#,
1088                )?
1089                .query_map((&hashed_linked_chunk_id,), |row| {
1090                    Ok((
1091                        row.get::<_, u64>(0)?,
1092                        row.get::<_, Option<u64>>(1)?,
1093                        row.get::<_, Option<u64>>(2)?,
1094                        row.get::<_, String>(3)?,
1095                    ))
1096                })?
1097                .map(|data| -> Result<_> {
1098                    let (id, previous, next, chunk_type) = data?;
1099
1100                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
1101                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
1102                    // benchmarking shows that this is slightly slower than matching the chunk
1103                    // type (around 1%, so in the realm of noise), so we keep the explicit
1104                    // check instead.
1105                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
1106                        0
1107                    } else {
1108                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
1109                    };
1110
1111                    Ok(ChunkMetadata {
1112                        identifier: ChunkIdentifier::new(id),
1113                        previous: previous.map(ChunkIdentifier::new),
1114                        next: next.map(ChunkIdentifier::new),
1115                        num_items,
1116                    })
1117                })
1118                .collect::<Result<Vec<_>, _>>()
1119            })
1120            .await
1121    }
1122
1123    #[instrument(skip(self))]
1124    async fn load_last_chunk(
1125        &self,
1126        linked_chunk_id: LinkedChunkId<'_>,
1127    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1128        let _timer = timer!("method");
1129
1130        let hashed_linked_chunk_id =
1131            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1132
1133        let this = self.clone();
1134
1135        self
1136            .read()
1137            .await?
1138            .with_transaction(move |txn| -> Result<_> {
1139                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
1140                let (observed_max_identifier, number_of_chunks) = txn
1141                    .prepare(
1142                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1143                    )?
1144                    .query_row(
1145                        (&hashed_linked_chunk_id,),
1146                        |row| {
1147                            Ok((
1148                                // Read the `MAX(id)` as an `Option<u64>` instead
1149                                // of `u64` in case the `SELECT` returns nothing.
1150                                // Indeed, if it returns no line, the `MAX(id)` is
1151                                // set to `Null`.
1152                                row.get::<_, Option<u64>>(0)?,
1153                                row.get::<_, u64>(1)?,
1154                            ))
1155                        }
1156                    )?;
1157
1158                let chunk_identifier_generator = match observed_max_identifier {
1159                    Some(max_observed_identifier) => {
1160                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1161                            ChunkIdentifier::new(max_observed_identifier)
1162                        )
1163                    },
1164                    None => ChunkIdentifierGenerator::new_from_scratch(),
1165                };
1166
1167                // Find the last chunk.
1168                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1169                    .prepare(
1170                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1171                    )?
1172                    .query_row(
1173                        (&hashed_linked_chunk_id,),
1174                        |row| {
1175                            Ok((
1176                                row.get::<_, u64>(0)?,
1177                                row.get::<_, Option<u64>>(1)?,
1178                                row.get::<_, String>(2)?,
1179                            ))
1180                        }
1181                    )
1182                    .optional()?
1183                else {
1184                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
1185                    // good.
1186                    if number_of_chunks == 0 {
1187                        return Ok((None, chunk_identifier_generator));
1188                    }
1189                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
1190                    // linked chunk is malformed.
1191                    //
1192                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
1193                    else {
1194                        return Err(Error::InvalidData {
1195                            details:
1196                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1197                                    .to_owned()
1198                            }
1199                        )
1200                    }
1201                };
1202
1203                // Build the chunk.
1204                let last_chunk = txn.rebuild_chunk(
1205                    &this,
1206                    &hashed_linked_chunk_id,
1207                    previous_chunk,
1208                    chunk_identifier,
1209                    None,
1210                    &chunk_type
1211                )?;
1212
1213                Ok((Some(last_chunk), chunk_identifier_generator))
1214            })
1215            .await
1216    }
1217
1218    #[instrument(skip(self))]
1219    async fn load_previous_chunk(
1220        &self,
1221        linked_chunk_id: LinkedChunkId<'_>,
1222        before_chunk_identifier: ChunkIdentifier,
1223    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1224        let _timer = timer!("method");
1225
1226        let hashed_linked_chunk_id =
1227            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1228
1229        let this = self.clone();
1230
1231        self
1232            .read()
1233            .await?
1234            .with_transaction(move |txn| -> Result<_> {
1235                // Find the chunk before the chunk identified by `before_chunk_identifier`.
1236                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1237                    .prepare(
1238                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1239                    )?
1240                    .query_row(
1241                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1242                        |row| {
1243                            Ok((
1244                                row.get::<_, u64>(0)?,
1245                                row.get::<_, Option<u64>>(1)?,
1246                                row.get::<_, Option<u64>>(2)?,
1247                                row.get::<_, String>(3)?,
1248                            ))
1249                        }
1250                    )
1251                    .optional()?
1252                else {
1253                    // Chunk is not found.
1254                    return Ok(None);
1255                };
1256
1257                // Build the chunk.
1258                let last_chunk = txn.rebuild_chunk(
1259                    &this,
1260                    &hashed_linked_chunk_id,
1261                    previous_chunk,
1262                    chunk_identifier,
1263                    next_chunk,
1264                    &chunk_type
1265                )?;
1266
1267                Ok(Some(last_chunk))
1268            })
1269            .await
1270    }
1271
1272    #[instrument(skip(self))]
1273    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1274        let _timer = timer!("method");
1275
1276        self.write()
1277            .await?
1278            .with_transaction(move |txn| {
1279                // Remove all the chunks, and let cascading do its job.
1280                txn.execute("DELETE FROM linked_chunks", ())?;
1281                // Also clear all the events' contents.
1282                txn.execute("DELETE FROM events", ())
1283            })
1284            .await?;
1285
1286        Ok(())
1287    }
1288
1289    #[instrument(skip(self, events))]
1290    async fn filter_duplicated_events(
1291        &self,
1292        linked_chunk_id: LinkedChunkId<'_>,
1293        events: Vec<OwnedEventId>,
1294    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1295        let _timer = timer!("method");
1296
1297        // If there's no events for which we want to check duplicates, we can return
1298        // early. It's not only an optimization to do so: it's required, otherwise the
1299        // `repeat_vars` call below will panic.
1300        if events.is_empty() {
1301            return Ok(Vec::new());
1302        }
1303
1304        // Select all events that exist in the store, i.e. the duplicates.
1305        let hashed_linked_chunk_id =
1306            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1307        let linked_chunk_id = linked_chunk_id.to_owned();
1308
1309        self.read()
1310            .await?
1311            .with_transaction(move |txn| -> Result<_> {
1312                txn.chunk_large_query_over(events, None, move |txn, events| {
1313                    let query = format!(
1314                        r#"
1315                            SELECT event_id, chunk_id, position
1316                            FROM event_chunks
1317                            WHERE linked_chunk_id = ? AND event_id IN ({})
1318                            ORDER BY chunk_id ASC, position ASC
1319                        "#,
1320                        repeat_vars(events.len()),
1321                    );
1322
1323                    let parameters = params_from_iter(
1324                        // parameter for `linked_chunk_id = ?`
1325                        once(
1326                            hashed_linked_chunk_id
1327                                .to_sql()
1328                                // SAFETY: it cannot fail since `Key::to_sql` never fails
1329                                .unwrap(),
1330                        )
1331                        // parameters for `event_id IN (…)`
1332                        .chain(events.iter().map(|event| {
1333                            event
1334                                .as_str()
1335                                .to_sql()
1336                                // SAFETY: it cannot fail since `str::to_sql` never fails
1337                                .unwrap()
1338                        })),
1339                    );
1340
1341                    let mut duplicated_events = Vec::new();
1342
1343                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1344                        Ok((
1345                            row.get::<_, String>(0)?,
1346                            row.get::<_, u64>(1)?,
1347                            row.get::<_, usize>(2)?,
1348                        ))
1349                    })? {
1350                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1351
1352                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1353                            // Normally unreachable, but the event ID has been stored even if it is
1354                            // malformed, let's skip it.
1355                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1356                            continue;
1357                        };
1358
1359                        duplicated_events.push((
1360                            duplicated_event,
1361                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
1362                        ));
1363                    }
1364
1365                    Ok(duplicated_events)
1366                })
1367            })
1368            .await
1369    }
1370
1371    #[instrument(skip(self, event_id))]
1372    async fn find_event(
1373        &self,
1374        room_id: &RoomId,
1375        event_id: &EventId,
1376    ) -> Result<Option<Event>, Self::Error> {
1377        let _timer = timer!("method");
1378
1379        let event_id = event_id.to_owned();
1380        let this = self.clone();
1381
1382        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1383
1384        self.read()
1385            .await?
1386            .with_transaction(move |txn| -> Result<_> {
1387                let Some(event) = txn
1388                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1389                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1390                    .optional()?
1391                else {
1392                    // Event is not found.
1393                    return Ok(None);
1394                };
1395
1396                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1397
1398                Ok(Some(event))
1399            })
1400            .await
1401    }
1402
1403    #[instrument(skip(self, event_id, filters))]
1404    async fn find_event_relations(
1405        &self,
1406        room_id: &RoomId,
1407        event_id: &EventId,
1408        filters: Option<&[RelationType]>,
1409    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1410        let _timer = timer!("method");
1411
1412        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1413
1414        let hashed_linked_chunk_id =
1415            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1416
1417        let event_id = event_id.to_owned();
1418        let filters = filters.map(ToOwned::to_owned);
1419        let store = self.clone();
1420
1421        self.read()
1422            .await?
1423            .with_transaction(move |txn| -> Result<_> {
1424                find_event_relations_transaction(
1425                    store,
1426                    hashed_room_id,
1427                    hashed_linked_chunk_id,
1428                    event_id,
1429                    filters,
1430                    txn,
1431                )
1432            })
1433            .await
1434    }
1435
1436    #[instrument(skip(self))]
1437    async fn get_room_events(
1438        &self,
1439        room_id: &RoomId,
1440        event_type: Option<&str>,
1441        session_id: Option<&str>,
1442    ) -> Result<Vec<Event>, Self::Error> {
1443        let _timer = timer!("method");
1444
1445        let this = self.clone();
1446
1447        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1448        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1449        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1450
1451        self.read()
1452            .await?
1453            .with_transaction(move |txn| -> Result<_> {
1454                // I'm not sure why clippy claims that the clones aren't required. The compiler
1455                // tells us that the lifetimes aren't long enough if we remove them. Doesn't matter
1456                // much so let's silence things.
1457                #[allow(clippy::redundant_clone)]
1458                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1459                    (None, None) => {
1460                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1461                    }
1462                    (None, Some(session_id)) => (
1463                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1464                        params![hashed_room_id, session_id.to_owned()],
1465                    ),
1466                    (Some(event_type), None) => (
1467                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1468                        params![hashed_room_id, event_type.to_owned()]
1469                    ),
1470                    (Some(event_type), Some(session_id)) => (
1471                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1472                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1473                    ),
1474                };
1475
1476                let mut statement = txn.prepare(query)?;
1477                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1478
1479                let mut events = Vec::new();
1480                for ev in maybe_events {
1481                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1482                    events.push(event);
1483                }
1484
1485                Ok(events)
1486            })
1487            .await
1488    }
1489
1490    #[instrument(skip(self, event))]
1491    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1492        let _timer = timer!("method");
1493
1494        let Some(event_id) = event.event_id() else {
1495            error!("Trying to save an event with no ID");
1496            return Ok(());
1497        };
1498
1499        let Some(event_type) = event.kind.event_type() else {
1500            error!(%event_id, "Trying to save an event with no event type");
1501            return Ok(());
1502        };
1503
1504        let event_type = self.encode_key(keys::EVENTS, event_type);
1505        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1506
1507        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1508        let event_id = event_id.to_string();
1509        let encoded_event = self.encode_event(&event)?;
1510
1511        self.write()
1512            .await?
1513            .with_transaction(move |txn| -> Result<_> {
1514                txn.execute(
1515                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1516                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1517
1518                Ok(())
1519            })
1520            .await
1521    }
1522
1523    async fn close(&self) -> Result<(), Self::Error> {
1524        SqliteEventCacheStore::close(self).await
1525    }
1526
1527    async fn reopen(&self) -> Result<(), Self::Error> {
1528        SqliteEventCacheStore::reopen(self).await
1529    }
1530
1531    async fn optimize(&self) -> Result<(), Self::Error> {
1532        Ok(self.vacuum().await?)
1533    }
1534
1535    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1536        self.get_db_size().await
1537    }
1538}
1539
1540fn find_event_relations_transaction(
1541    store: SqliteEventCacheStore,
1542    hashed_room_id: Key,
1543    hashed_linked_chunk_id: Key,
1544    event_id: OwnedEventId,
1545    filters: Option<Vec<RelationType>>,
1546    txn: &Transaction<'_>,
1547) -> Result<Vec<(Event, Option<Position>)>> {
1548    let get_rows = |row: &rusqlite::Row<'_>| {
1549        Ok((
1550            row.get::<_, Vec<u8>>(0)?,
1551            row.get::<_, Option<u64>>(1)?,
1552            row.get::<_, Option<usize>>(2)?,
1553        ))
1554    };
1555
1556    // Collect related events.
1557    let collect_results = |transaction| {
1558        let mut related = Vec::new();
1559
1560        for result in transaction {
1561            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1562
1563            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1564
1565            // Only build the position if both the chunk_id and position were present; in
1566            // theory, they should either be present at the same time, or not at all.
1567            let pos = chunk_id
1568                .zip(index)
1569                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1570
1571            related.push((event, pos));
1572        }
1573
1574        Ok(related)
1575    };
1576
1577    if let Some(filters) = filters {
1578        let question_marks = repeat_vars(filters.len());
1579        let query = format!(
1580            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1581            FROM events
1582            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1583            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1584        );
1585
1586        // First the filters need to be stringified; because `.to_sql()` will borrow
1587        // from them, they also need to be stringified onto the stack, so as to
1588        // get a stable address (to avoid returning a temporary reference in the
1589        // map closure below).
1590        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1591        let filters_params: Vec<_> = filter_strings
1592            .iter()
1593            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1594            .collect();
1595
1596        let parameters = params_from_iter(
1597            [
1598                hashed_linked_chunk_id.to_sql().expect(
1599                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1600                ),
1601                event_id
1602                    .as_str()
1603                    .to_sql()
1604                    .expect("We should be able to convert an event ID to a SQLite value"),
1605                hashed_room_id
1606                    .to_sql()
1607                    .expect("We should be able to convert a room ID to a SQLite value"),
1608            ]
1609            .into_iter()
1610            .chain(filters_params),
1611        );
1612
1613        let mut transaction = txn.prepare(&query)?;
1614        let transaction = transaction.query_map(parameters, get_rows)?;
1615
1616        collect_results(transaction)
1617    } else {
1618        let query =
1619            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1620            FROM events
1621            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1622            WHERE relates_to = ? AND room_id = ?";
1623        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1624
1625        let mut transaction = txn.prepare(query)?;
1626        let transaction = transaction.query_map(parameters, get_rows)?;
1627
1628        collect_results(transaction)
1629    }
1630}
1631
1632/// Like `deadpool::managed::Object::with_transaction`, but starts the
1633/// transaction in immediate (write) mode from the beginning, precluding errors
1634/// of the kind SQLITE_BUSY from happening, for transactions that may involve
1635/// both reads and writes, and start with a write.
1636async fn with_immediate_transaction<
1637    T: Send + 'static,
1638    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1639>(
1640    this: &SqliteEventCacheStore,
1641    f: F,
1642) -> Result<T, Error> {
1643    this.write()
1644        .await?
1645        .interact(move |conn| -> Result<T, Error> {
1646            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1647            // to avoid read transactions upgrading to write mode and causing
1648            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1649            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1650
1651            let code = || -> Result<T, Error> {
1652                let txn = conn.transaction()?;
1653                let res = f(&txn)?;
1654                txn.commit()?;
1655                Ok(res)
1656            };
1657
1658            let res = code();
1659
1660            // Reset the transaction behavior to use Deferred, after this transaction has
1661            // been run, whether it was successful or not.
1662            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1663
1664            res
1665        })
1666        .await
1667        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1668        .unwrap()
1669}
1670
1671fn insert_chunk(
1672    txn: &Transaction<'_>,
1673    linked_chunk_id: &Key,
1674    previous: Option<u64>,
1675    new: u64,
1676    next: Option<u64>,
1677    type_str: &str,
1678) -> rusqlite::Result<()> {
1679    // First, insert the new chunk.
1680    txn.execute(
1681        r#"
1682            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1683            VALUES (?, ?, ?, ?, ?)
1684        "#,
1685        (new, linked_chunk_id, previous, next, type_str),
1686    )?;
1687
1688    // If this chunk has a previous one, update its `next` field.
1689    if let Some(previous) = previous {
1690        let updated = txn.execute(
1691            r#"
1692                UPDATE linked_chunks
1693                SET next = ?
1694                WHERE id = ? AND linked_chunk_id = ?
1695            "#,
1696            (new, previous, linked_chunk_id),
1697        )?;
1698        if updated < 1 {
1699            return Err(rusqlite::Error::QueryReturnedNoRows);
1700        }
1701        if updated > 1 {
1702            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1703        }
1704    }
1705
1706    // If this chunk has a next one, update its `previous` field.
1707    if let Some(next) = next {
1708        let updated = txn.execute(
1709            r#"
1710                UPDATE linked_chunks
1711                SET previous = ?
1712                WHERE id = ? AND linked_chunk_id = ?
1713            "#,
1714            (new, next, linked_chunk_id),
1715        )?;
1716        if updated < 1 {
1717            return Err(rusqlite::Error::QueryReturnedNoRows);
1718        }
1719        if updated > 1 {
1720            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1721        }
1722    }
1723
1724    Ok(())
1725}
1726
1727#[cfg(test)]
1728mod tests {
1729    use std::{
1730        path::PathBuf,
1731        sync::{
1732            LazyLock,
1733            atomic::{AtomicU32, Ordering::SeqCst},
1734        },
1735    };
1736
1737    use assert_matches::assert_matches;
1738    use matrix_sdk_base::{
1739        event_cache::store::{
1740            EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
1741            integration_tests::EventCacheStoreIntegrationTests,
1742        },
1743        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1744        linked_chunk::{ChunkIdentifier, LinkedChunkId, Update},
1745    };
1746    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
1747    use tempfile::{TempDir, tempdir};
1748
1749    use super::SqliteEventCacheStore;
1750    use crate::{
1751        SqliteStoreConfig,
1752        event_cache_store::keys,
1753        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1754    };
1755
1756    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1757    static NUM: AtomicU32 = AtomicU32::new(0);
1758
1759    fn new_event_cache_store_workspace() -> PathBuf {
1760        let name = NUM.fetch_add(1, SeqCst).to_string();
1761        TMP_DIR.path().join(name)
1762    }
1763
1764    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1765        let tmpdir_path = new_event_cache_store_workspace();
1766
1767        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1768
1769        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1770    }
1771
1772    event_cache_store_integration_tests!();
1773    event_cache_store_integration_tests_time!();
1774
1775    #[async_test]
1776    async fn test_pool_size() {
1777        let tmpdir_path = new_event_cache_store_workspace();
1778        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1779
1780        let store = SqliteEventCacheStore::open_with_config(&store_open_config).await.unwrap();
1781
1782        assert_eq!(store.pool_max_size().await, Some(42));
1783    }
1784
1785    #[async_test]
1786    async fn test_linked_chunk_remove_chunk() {
1787        let store = get_event_cache_store().await.expect("creating cache store failed");
1788
1789        // Run corresponding integration test
1790        store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;
1791
1792        // Check that cascading worked. Yes, SQLite, I doubt you.
1793        let gaps = store
1794            .read()
1795            .await
1796            .unwrap()
1797            .with_transaction(|txn| -> rusqlite::Result<_> {
1798                let mut gaps = Vec::new();
1799                for data in txn
1800                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1801                    .query_map((), |row| row.get::<_, u64>(0))?
1802                {
1803                    gaps.push(data?);
1804                }
1805                Ok(gaps)
1806            })
1807            .await
1808            .unwrap();
1809
1810        // Check that the gaps match those set up in the corresponding integration test
1811        // above
1812        assert_eq!(gaps, vec![42, 44]);
1813    }
1814
1815    #[async_test]
1816    async fn test_linked_chunk_remove_item() {
1817        let store = get_event_cache_store().await.expect("creating cache store failed");
1818
1819        // Run corresponding integration test
1820        store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;
1821
1822        let room_id = *DEFAULT_TEST_ROOM_ID;
1823        let linked_chunk_id = LinkedChunkId::Room(room_id);
1824
1825        // Make sure the position have been updated for the remaining events.
1826        let num_rows: u64 = store
1827            .read()
1828            .await
1829            .unwrap()
1830            .with_transaction(move |txn| {
1831                txn.query_row(
1832                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1833                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1834                    |row| row.get(0),
1835                )
1836            })
1837            .await
1838            .unwrap();
1839        assert_eq!(num_rows, 3);
1840    }
1841
1842    #[async_test]
1843    async fn test_linked_chunk_clear() {
1844        let store = get_event_cache_store().await.expect("creating cache store failed");
1845
1846        // Run corresponding integration test
1847        store.clone().into_event_cache_store().test_linked_chunk_clear().await;
1848
1849        // Check that cascading worked. Yes, SQLite, I doubt you.
1850        store
1851            .read()
1852            .await
1853            .unwrap()
1854            .with_transaction(|txn| -> rusqlite::Result<_> {
1855                let num_gaps = txn
1856                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
1857                    .query_row((), |row| row.get::<_, u64>(0))?;
1858                assert_eq!(num_gaps, 0);
1859
1860                let num_events = txn
1861                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
1862                    .query_row((), |row| row.get::<_, u64>(0))?;
1863                assert_eq!(num_events, 0);
1864
1865                Ok(())
1866            })
1867            .await
1868            .unwrap();
1869    }
1870
1871    #[async_test]
1872    async fn test_linked_chunk_update_is_a_transaction() {
1873        let store = get_event_cache_store().await.expect("creating cache store failed");
1874
1875        let room_id = *DEFAULT_TEST_ROOM_ID;
1876        let linked_chunk_id = LinkedChunkId::Room(room_id);
1877
1878        // Trigger a violation of the unique constraint on the (room id, chunk id)
1879        // couple.
1880        let err = store
1881            .handle_linked_chunk_updates(
1882                linked_chunk_id,
1883                vec![
1884                    Update::NewItemsChunk {
1885                        previous: None,
1886                        new: ChunkIdentifier::new(42),
1887                        next: None,
1888                    },
1889                    Update::NewItemsChunk {
1890                        previous: None,
1891                        new: ChunkIdentifier::new(42),
1892                        next: None,
1893                    },
1894                ],
1895            )
1896            .await
1897            .unwrap_err();
1898
1899        // The operation fails with a constraint violation error.
1900        assert_matches!(err, crate::error::Error::Sqlite(err) => {
1901            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1902        });
1903
1904        // If the updates have been handled transactionally, then no new chunks should
1905        // have been added; failure of the second update leads to the first one being
1906        // rolled back.
1907        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1908        assert!(chunks.is_empty());
1909    }
1910}
1911
1912#[cfg(test)]
1913mod encrypted_tests {
1914    use std::sync::{
1915        LazyLock,
1916        atomic::{AtomicU32, Ordering::SeqCst},
1917    };
1918
1919    use matrix_sdk_base::{
1920        event_cache::store::{EventCacheStore, EventCacheStoreError},
1921        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1922    };
1923    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1924    use ruma::{
1925        event_id,
1926        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1927        room_id, user_id,
1928    };
1929    use tempfile::{TempDir, tempdir};
1930
1931    use super::SqliteEventCacheStore;
1932
1933    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1934    static NUM: AtomicU32 = AtomicU32::new(0);
1935
1936    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1937        let name = NUM.fetch_add(1, SeqCst).to_string();
1938        let tmpdir_path = TMP_DIR.path().join(name);
1939
1940        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1941
1942        Ok(SqliteEventCacheStore::open(
1943            tmpdir_path.to_str().unwrap(),
1944            Some("default_test_password"),
1945        )
1946        .await
1947        .unwrap())
1948    }
1949
1950    event_cache_store_integration_tests!();
1951    event_cache_store_integration_tests_time!();
1952
1953    #[async_test]
1954    async fn test_no_sqlite_injection_in_find_event_relations() {
1955        let room_id = room_id!("!test:localhost");
1956        let another_room_id = room_id!("!r1:matrix.org");
1957        let sender = user_id!("@alice:localhost");
1958
1959        let store = get_event_cache_store()
1960            .await
1961            .expect("We should be able to create a new, empty, event cache store");
1962
1963        let f = EventFactory::new().room(room_id).sender(sender);
1964
1965        // Create an event for the first room.
1966        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
1967        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
1968
1969        // Create a related event.
1970        let edit_id = event_id!("$find_me:matrix.org");
1971        let edit = f
1972            .text_msg("Find me")
1973            .event_id(edit_id)
1974            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
1975            .into_event();
1976
1977        // Create an event for the second room.
1978        let f = f.room(another_room_id);
1979
1980        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
1981        let another_event =
1982            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
1983
1984        // Save the events in the DB.
1985        store.save_event(room_id, event).await.unwrap();
1986        store.save_event(room_id, edit).await.unwrap();
1987        store.save_event(another_room_id, another_event).await.unwrap();
1988
1989        // Craft a `RelationType` that will inject some SQL to be executed. The
1990        // `OR 1=1` ensures that all the previous parameters, the room
1991        // ID and event ID are ignored.
1992        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
1993
1994        // Attempt to find events in the first room.
1995        let results = store
1996            .find_event_relations(room_id, event_id, filter.as_deref())
1997            .await
1998            .expect("We should be able to attempt to find event relations");
1999
2000        // Ensure that we only got the single related event the first room contains.
2001        similar_asserts::assert_eq!(
2002            results.len(),
2003            1,
2004            "We should only have loaded events for the first room {results:#?}"
2005        );
2006
2007        // The event needs to be the edit event, otherwise something is wrong.
2008        let (found_event, _) = &results[0];
2009        assert_eq!(
2010            found_event.event_id().as_deref(),
2011            Some(edit_id),
2012            "The single event we found should be the edit event"
2013        );
2014    }
2015}
2016
2017#[cfg(test)]
2018mod close_reopen_tests {
2019    use std::sync::{
2020        LazyLock,
2021        atomic::{AtomicU32, Ordering::SeqCst},
2022    };
2023
2024    use matrix_sdk_base::{event_cache::store::EventCacheStore, linked_chunk::LinkedChunkId};
2025    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
2026    use tempfile::{TempDir, tempdir};
2027
2028    use super::SqliteEventCacheStore;
2029
2030    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2031    static NUM: AtomicU32 = AtomicU32::new(0);
2032
2033    async fn new_store() -> SqliteEventCacheStore {
2034        let name = NUM.fetch_add(1, SeqCst).to_string();
2035        let tmpdir_path = TMP_DIR.path().join(name);
2036        SqliteEventCacheStore::open(tmpdir_path, None).await.unwrap()
2037    }
2038
2039    #[async_test]
2040    async fn test_close_completes_without_timeout() {
2041        let store = new_store().await;
2042
2043        // Close should complete quickly without hitting the 5s timeout.
2044        let start = std::time::Instant::now();
2045        store.close().await.unwrap();
2046        let elapsed = start.elapsed();
2047
2048        assert!(
2049            elapsed < std::time::Duration::from_secs(2),
2050            "close() took {elapsed:?}, expected < 2s (no timeout)"
2051        );
2052
2053        // Connections should be None after close.
2054        let guard = store.connections.lock().await;
2055        assert!(guard.is_none(), "connections should be None after close");
2056    }
2057
2058    #[async_test]
2059    async fn test_reopen_restores_connections() {
2060        let store = new_store().await;
2061
2062        store.close().await.unwrap();
2063
2064        {
2065            let guard = store.connections.lock().await;
2066            assert!(guard.is_none());
2067        }
2068
2069        store.reopen().await.unwrap();
2070
2071        {
2072            let guard = store.connections.lock().await;
2073            assert!(guard.is_some(), "connections should be Some after reopen");
2074        }
2075    }
2076
2077    #[async_test]
2078    async fn test_close_is_idempotent() {
2079        let store = new_store().await;
2080
2081        store.close().await.unwrap();
2082        // Second close should be a no-op.
2083        store.close().await.unwrap();
2084
2085        let guard = store.connections.lock().await;
2086        assert!(guard.is_none());
2087    }
2088
2089    #[async_test]
2090    async fn test_reopen_is_idempotent() {
2091        let store = new_store().await;
2092
2093        // Reopen on an active store should be a no-op.
2094        store.reopen().await.unwrap();
2095
2096        let guard = store.connections.lock().await;
2097        assert!(guard.is_some());
2098    }
2099
2100    #[async_test]
2101    async fn test_read_fails_when_closed() {
2102        let store = new_store().await;
2103        store.close().await.unwrap();
2104
2105        let err = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2106        assert!(err.is_err(), "read should fail when closed");
2107
2108        let err_msg = err.unwrap_err().to_string();
2109        assert!(err_msg.contains("closed"), "error should mention 'closed', got: {err_msg}");
2110    }
2111
2112    #[async_test]
2113    async fn test_write_fails_when_closed() {
2114        let store = new_store().await;
2115        store.close().await.unwrap();
2116
2117        let err = store.try_take_leased_lock(1000, "test_lock", "holder").await;
2118        assert!(err.is_err(), "write should fail when closed");
2119
2120        let err_msg = err.unwrap_err().to_string();
2121        assert!(err_msg.contains("closed"), "error should mention 'closed', got: {err_msg}");
2122    }
2123
2124    #[async_test]
2125    async fn test_data_persists_across_close_reopen() {
2126        let store = new_store().await;
2127
2128        // Take a lease lock — this is persisted in the database.
2129        let result = store.try_take_leased_lock(60_000, "test_lock", "holder").await.unwrap();
2130        assert!(result.is_some(), "should have acquired the lock");
2131
2132        // Close and reopen.
2133        store.close().await.unwrap();
2134        store.reopen().await.unwrap();
2135
2136        // The lock should still be held by the original holder after reopen.
2137        let result = store.try_take_leased_lock(60_000, "test_lock", "other_holder").await.unwrap();
2138        assert!(result.is_none(), "lock should still be held by the original holder after reopen");
2139    }
2140
2141    #[async_test]
2142    async fn test_multiple_close_reopen_cycles() {
2143        let store = new_store().await;
2144
2145        for _ in 0..5 {
2146            store.close().await.unwrap();
2147            store.reopen().await.unwrap();
2148
2149            // After each cycle, the store should be fully operational.
2150            let result = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2151            assert!(result.is_ok(), "store should work after close/reopen cycle");
2152        }
2153    }
2154
2155    #[async_test]
2156    async fn test_pool_is_fully_drained_after_close() {
2157        let store = new_store().await;
2158
2159        // Do a few reads to exercise the pool.
2160        let _ = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2161        let _ = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2162
2163        store.close().await.unwrap();
2164
2165        // After close, the connections field should be None (pool and write
2166        // connection have been fully torn down).
2167        let guard = store.connections.lock().await;
2168        assert!(guard.is_none(), "all connections should be released after close");
2169    }
2170
2171    #[async_test]
2172    async fn test_operations_work_immediately_after_reopen() {
2173        let store = new_store().await;
2174
2175        store.close().await.unwrap();
2176        store.reopen().await.unwrap();
2177
2178        // Read should work immediately after reopen.
2179        let result = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2180        assert!(result.is_ok(), "read should succeed immediately after reopen");
2181
2182        // Write should work immediately after reopen.
2183        let result = store.try_take_leased_lock(1000, "test_lock", "holder").await;
2184        assert!(result.is_ok(), "write should succeed immediately after reopen");
2185    }
2186
2187    #[async_test]
2188    async fn test_close_waits_for_held_read_connection_to_drain() {
2189        let store = new_store().await;
2190
2191        // Acquire a read connection and hold it, simulating an in-flight read.
2192        let held_conn = store.read().await.unwrap();
2193
2194        // Spawn close in a background task — it will close the pool and then
2195        // poll-wait for pool.status().size == 0 in the drain loop.
2196        let store_clone = store.clone();
2197        let close_handle = tokio::spawn(async move {
2198            store_clone.close().await.unwrap();
2199        });
2200
2201        // Give close() a moment to close the pool and enter the drain loop.
2202        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2203
2204        // The close task should still be running because we hold a connection.
2205        assert!(!close_handle.is_finished(), "close should be waiting for the held connection");
2206
2207        // Release the held connection — this lets pool.status().size drop to 0.
2208        drop(held_conn);
2209
2210        // Now close should complete promptly (well within the 5s timeout).
2211        let timeout = tokio::time::timeout(std::time::Duration::from_secs(3), close_handle).await;
2212        assert!(timeout.is_ok(), "close should complete after the held connection is released");
2213        timeout.unwrap().unwrap();
2214
2215        // Verify the store is fully closed.
2216        let guard = store.connections.lock().await;
2217        assert!(guard.is_none(), "connections should be None after close");
2218    }
2219}