Skip to main content

matrix_sdk_sqlite/
event_cache_store.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An SQLite-based backend for the [`EventCacheStore`].
16
17use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21    cross_process_lock::CrossProcessLockGeneration,
22    deserialized_responses::TimelineEvent,
23    event_cache::{
24        Event, Gap,
25        store::{EventCacheStore, extract_event_relation},
26    },
27    linked_chunk::{
28        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
29        Position, RawChunk, Update,
30    },
31    timer,
32};
33use matrix_sdk_store_encryption::StoreCipher;
34use ruma::{
35    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
36};
37use rusqlite::{
38    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
39};
40use tokio::{
41    fs,
42    sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, error, instrument, trace};
45
46use crate::{
47    OpenStoreError, Secret, SqliteStoreConfig,
48    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49    error::{Error, Result},
50    utils::{
51        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
53    },
54};
55
mod keys {
    // Table names. Within this file they are passed as the first argument of
    // `encode_key`.

    /// Name of the table holding the events.
    pub const EVENTS: &str = "events";

    /// Name of the table holding the linked chunks.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
}
61
/// The file name of the SQLite database. It is handed to
/// `SqliteStoreConfig::build_pool_of_connections` when the store is opened.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// The string used to identify a chunk of type events, in the `type` field in
/// the database.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// The string used to identify a chunk of type gap, in the `type` field in the
/// database.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
71
/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// The cipher created (or loaded) from the secret given when the store
    /// was opened; `None` when no secret was provided. It is exposed through
    /// `EncryptableStore::get_cypher`.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We make the difference between connections for read operations, and for
    /// write operations. We keep a single connection apart from write
    /// operations. All other connections are used for read operations. The
    /// lock is used to ensure there is one owner at a time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
86
87#[cfg(not(tarpaulin_include))]
88impl fmt::Debug for SqliteEventCacheStore {
89    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
90        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
91    }
92}
93
94impl EncryptableStore for SqliteEventCacheStore {
95    fn get_cypher(&self) -> Option<&StoreCipher> {
96        self.store_cipher.as_deref()
97    }
98}
99
100impl SqliteEventCacheStore {
101    /// Open the SQLite-based event cache store at the given path using the
102    /// given passphrase to encrypt private data.
103    pub async fn open(
104        path: impl AsRef<Path>,
105        passphrase: Option<&str>,
106    ) -> Result<Self, OpenStoreError> {
107        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
108    }
109
110    /// Open the SQLite-based event cache store at the given path using the
111    /// given key to encrypt private data.
112    pub async fn open_with_key(
113        path: impl AsRef<Path>,
114        key: Option<&[u8; 32]>,
115    ) -> Result<Self, OpenStoreError> {
116        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
117    }
118
119    /// Open the SQLite-based event cache store with the config open config.
120    #[instrument(skip(config), fields(path = ?config.path))]
121    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
122        debug!(?config);
123
124        let _timer = timer!("open_with_config");
125
126        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
127
128        let pool = config.build_pool_of_connections(DATABASE_NAME)?;
129
130        let this = Self::open_with_pool(pool, config.secret).await?;
131        this.write().await?.apply_runtime_config(config.runtime_config).await?;
132
133        Ok(this)
134    }
135
136    /// Open an SQLite-based event cache store using the given SQLite database
137    /// pool. The given secret will be used to encrypt private data.
138    async fn open_with_pool(
139        pool: SqlitePool,
140        secret: Option<Secret>,
141    ) -> Result<Self, OpenStoreError> {
142        let conn = pool.get().await?;
143
144        let version = conn.db_version().await?;
145
146        run_migrations(&conn, version).await?;
147
148        conn.wal_checkpoint().await;
149
150        let store_cipher = match secret {
151            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
152            None => None,
153        };
154
155        Ok(Self {
156            store_cipher,
157            pool,
158            // Use `conn` as our selected write connections.
159            write_connection: Arc::new(Mutex::new(conn)),
160        })
161    }
162
163    /// Acquire a connection for executing read operations.
164    #[instrument(skip_all)]
165    async fn read(&self) -> Result<SqliteAsyncConn> {
166        let connection = self.pool.get().await?;
167
168        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
169        // support must be enabled on a per-connection basis. Execute it every
170        // time we try to get a connection, since we can't guarantee a previous
171        // connection did enable it before.
172        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
173
174        Ok(connection)
175    }
176
177    /// Acquire a connection for executing write operations.
178    #[instrument(skip_all)]
179    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
180        let connection = self.write_connection.clone().lock_owned().await;
181
182        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
183        // support must be enabled on a per-connection basis. Execute it every
184        // time we try to get a connection, since we can't guarantee a previous
185        // connection did enable it before.
186        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
187
188        Ok(connection)
189    }
190
191    fn map_row_to_chunk(
192        row: &rusqlite::Row<'_>,
193    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
194        Ok((
195            row.get::<_, u64>(0)?,
196            row.get::<_, Option<u64>>(1)?,
197            row.get::<_, Option<u64>>(2)?,
198            row.get::<_, String>(3)?,
199        ))
200    }
201
202    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
203        let serialized = serde_json::to_vec(event)?;
204
205        // Extract the relationship info here.
206        let raw_event = event.raw();
207        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
208
209        // The content may be encrypted.
210        let content = self.encode_value(serialized)?;
211
212        Ok(EncodedEvent {
213            content,
214            rel_type,
215            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
216        })
217    }
218
219    pub async fn vacuum(&self) -> Result<()> {
220        self.write_connection.lock().await.vacuum().await
221    }
222
223    async fn get_db_size(&self) -> Result<Option<usize>> {
224        Ok(Some(self.pool.get().await?.get_db_size().await?))
225    }
226}
227
/// The values stored in the database for one event, as built by
/// `SqliteEventCacheStore::encode_event`.
struct EncodedEvent {
    /// The JSON-serialized event, passed through `encode_value` (it may be
    /// encrypted).
    content: Vec<u8>,
    /// The relation type extracted from the raw event, if any.
    rel_type: Option<String>,
    /// The string form of the `relates_to` part of the extracted relation, if
    /// any (presumably the ID of the related event — confirm against
    /// `extract_event_relation`).
    relates_to: Option<String>,
}
233
/// Helpers to load linked chunks and their content from within a database
/// transaction.
trait TransactionExtForLinkedChunks {
    /// Build the [`RawChunk`] identified by `index`, linked to the `previous`
    /// and `next` chunks, loading its content according to `chunk_type`.
    ///
    /// `store` is used to decode the stored values, and `linked_chunk_id` is
    /// the key of the linked chunk as stored in the database.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the [`Gap`] stored for the chunk `chunk_id` of the given linked
    /// chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load the events stored for the chunk `chunk_id` of the given linked
    /// chunk.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
259
260impl TransactionExtForLinkedChunks for Transaction<'_> {
261    fn rebuild_chunk(
262        &self,
263        store: &SqliteEventCacheStore,
264        linked_chunk_id: &Key,
265        previous: Option<u64>,
266        id: u64,
267        next: Option<u64>,
268        chunk_type: &str,
269    ) -> Result<RawChunk<Event, Gap>> {
270        let previous = previous.map(ChunkIdentifier::new);
271        let next = next.map(ChunkIdentifier::new);
272        let id = ChunkIdentifier::new(id);
273
274        match chunk_type {
275            CHUNK_TYPE_GAP_TYPE_STRING => {
276                // It's a gap!
277                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
278                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
279            }
280
281            CHUNK_TYPE_EVENT_TYPE_STRING => {
282                // It's events!
283                let events = self.load_events_content(store, linked_chunk_id, id)?;
284                Ok(RawChunk {
285                    content: ChunkContent::Items(events),
286                    previous,
287                    identifier: id,
288                    next,
289                })
290            }
291
292            other => {
293                // It's an error!
294                Err(Error::InvalidData {
295                    details: format!("a linked chunk has an unknown type {other}"),
296                })
297            }
298        }
299    }
300
301    fn load_gap_content(
302        &self,
303        store: &SqliteEventCacheStore,
304        linked_chunk_id: &Key,
305        chunk_id: ChunkIdentifier,
306    ) -> Result<Gap> {
307        // There's at most one row for it in the database, so a call to `query_row` is
308        // sufficient.
309        let encoded_prev_token: Vec<u8> = self.query_row(
310            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
311            (chunk_id.index(), &linked_chunk_id),
312            |row| row.get(0),
313        )?;
314        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
315        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
316        Ok(Gap { token: prev_token })
317    }
318
319    fn load_events_content(
320        &self,
321        store: &SqliteEventCacheStore,
322        linked_chunk_id: &Key,
323        chunk_id: ChunkIdentifier,
324    ) -> Result<Vec<Event>> {
325        // Retrieve all the events from the database.
326        let mut events = Vec::new();
327
328        for event_data in self
329            .prepare(
330                r#"
331                    SELECT events.content
332                    FROM event_chunks ec, events
333                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
334                    ORDER BY ec.position ASC
335                "#,
336            )?
337            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
338        {
339            let encoded_content = event_data?;
340            let serialized_content = store.decode_value(&encoded_content)?;
341            let event = serde_json::from_slice(&serialized_content)?;
342
343            events.push(event);
344        }
345
346        Ok(events)
347    }
348}
349
350/// Run migrations for the given version of the database.
351async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
352    // Always enable foreign keys for the current connection.
353    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
354
355    if version < 1 {
356        debug!("Creating database");
357        // First turn on WAL mode, this can't be done in the transaction, it fails with
358        // the error message: "cannot change into wal mode from within a transaction".
359        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
360        conn.with_transaction(|txn| {
361            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
362            txn.set_db_version(1)
363        })
364        .await?;
365    }
366
367    if version < 2 {
368        debug!("Upgrading database to version 2");
369        conn.with_transaction(|txn| {
370            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
371            txn.set_db_version(2)
372        })
373        .await?;
374    }
375
376    if version < 3 {
377        debug!("Upgrading database to version 3");
378        conn.with_transaction(|txn| {
379            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
380            txn.set_db_version(3)
381        })
382        .await?;
383    }
384
385    if version < 4 {
386        debug!("Upgrading database to version 4");
387        conn.with_transaction(|txn| {
388            txn.execute_batch(include_str!(
389                "../migrations/event_cache_store/004_ignore_policy.sql"
390            ))?;
391            txn.set_db_version(4)
392        })
393        .await?;
394    }
395
396    if version < 5 {
397        debug!("Upgrading database to version 5");
398        conn.with_transaction(|txn| {
399            txn.execute_batch(include_str!(
400                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
401            ))?;
402            txn.set_db_version(5)
403        })
404        .await?;
405    }
406
407    if version < 6 {
408        debug!("Upgrading database to version 6");
409        conn.with_transaction(|txn| {
410            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
411            txn.set_db_version(6)
412        })
413        .await?;
414    }
415
416    if version < 7 {
417        debug!("Upgrading database to version 7");
418        conn.with_transaction(|txn| {
419            txn.execute_batch(include_str!(
420                "../migrations/event_cache_store/007_event_chunks.sql"
421            ))?;
422            txn.set_db_version(7)
423        })
424        .await?;
425    }
426
427    if version < 8 {
428        debug!("Upgrading database to version 8");
429        conn.with_transaction(|txn| {
430            txn.execute_batch(include_str!(
431                "../migrations/event_cache_store/008_linked_chunk_id.sql"
432            ))?;
433            txn.set_db_version(8)
434        })
435        .await?;
436    }
437
438    if version < 9 {
439        debug!("Upgrading database to version 9");
440        conn.with_transaction(|txn| {
441            txn.execute_batch(include_str!(
442                "../migrations/event_cache_store/009_related_event_index.sql"
443            ))?;
444            txn.set_db_version(9)
445        })
446        .await?;
447    }
448
449    if version < 10 {
450        debug!("Upgrading database to version 10");
451        conn.with_transaction(|txn| {
452            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
453            txn.set_db_version(10)
454        })
455        .await?;
456
457        if version >= 1 {
458            // Defragment the DB and optimize its size on the filesystem now that we removed
459            // the media cache.
460            conn.vacuum().await?;
461        }
462    }
463
464    if version < 11 {
465        debug!("Upgrading database to version 11");
466        conn.with_transaction(|txn| {
467            txn.execute_batch(include_str!(
468                "../migrations/event_cache_store/011_empty_event_cache.sql"
469            ))?;
470            txn.set_db_version(11)
471        })
472        .await?;
473    }
474
475    if version < 12 {
476        debug!("Upgrading database to version 12");
477        conn.with_transaction(|txn| {
478            txn.execute_batch(include_str!(
479                "../migrations/event_cache_store/012_store_event_type.sql"
480            ))?;
481            txn.set_db_version(12)
482        })
483        .await?;
484    }
485
486    if version < 13 {
487        debug!("Upgrading database to version 13");
488        conn.with_transaction(|txn| {
489            txn.execute_batch(include_str!(
490                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
491            ))?;
492            txn.set_db_version(13)
493        })
494        .await?;
495    }
496
497    if version < 14 {
498        debug!("Upgrading database to version 14");
499        conn.with_transaction(|txn| {
500            txn.execute_batch(include_str!(
501                "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
502            ))?;
503            txn.set_db_version(14)
504        })
505        .await?;
506    }
507
508    Ok(())
509}
510
511#[async_trait]
512impl EventCacheStore for SqliteEventCacheStore {
513    type Error = Error;
514
515    #[instrument(skip(self))]
516    async fn try_take_leased_lock(
517        &self,
518        lease_duration_ms: u32,
519        key: &str,
520        holder: &str,
521    ) -> Result<Option<CrossProcessLockGeneration>> {
522        let key = key.to_owned();
523        let holder = holder.to_owned();
524
525        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
526        let expiration = now + lease_duration_ms as u64;
527
528        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
529        let generation = self
530            .write()
531            .await?
532            .with_transaction(move |txn| {
533                txn.query_row(
534                    "INSERT INTO lease_locks (key, holder, expiration)
535                    VALUES (?1, ?2, ?3)
536                    ON CONFLICT (key)
537                    DO
538                        UPDATE SET
539                            holder = excluded.holder,
540                            expiration = excluded.expiration,
541                            generation =
542                                CASE holder
543                                    WHEN excluded.holder THEN generation
544                                    ELSE generation + 1
545                                END
546                        WHERE
547                            holder = excluded.holder
548                            OR expiration < ?4
549                    RETURNING generation
550                    ",
551                    (key, holder, expiration, now),
552                    |row| row.get(0),
553                )
554                .optional()
555            })
556            .await?;
557
558        Ok(generation)
559    }
560
561    #[instrument(skip(self, updates))]
562    async fn handle_linked_chunk_updates(
563        &self,
564        linked_chunk_id: LinkedChunkId<'_>,
565        updates: Vec<Update<Event, Gap>>,
566    ) -> Result<(), Self::Error> {
567        let _timer = timer!("method");
568
569        // Use a single transaction throughout this function, so that either all updates
570        // work, or none is taken into account.
571        let hashed_linked_chunk_id =
572            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
573        let linked_chunk_id = linked_chunk_id.to_owned();
574        let this = self.clone();
575
576        with_immediate_transaction(self, move |txn| {
577            for up in updates {
578                match up {
579                    Update::NewItemsChunk { previous, new, next } => {
580                        let previous = previous.as_ref().map(ChunkIdentifier::index);
581                        let new = new.index();
582                        let next = next.as_ref().map(ChunkIdentifier::index);
583
584                        trace!(
585                            %linked_chunk_id,
586                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
587                        );
588
589                        insert_chunk(
590                            txn,
591                            &hashed_linked_chunk_id,
592                            previous,
593                            new,
594                            next,
595                            CHUNK_TYPE_EVENT_TYPE_STRING,
596                        )?;
597                    }
598
599                    Update::NewGapChunk { previous, new, next, gap } => {
600                        let serialized = serde_json::to_vec(&gap.token)?;
601                        let prev_token = this.encode_value(serialized)?;
602
603                        let previous = previous.as_ref().map(ChunkIdentifier::index);
604                        let new = new.index();
605                        let next = next.as_ref().map(ChunkIdentifier::index);
606
607                        trace!(
608                            %linked_chunk_id,
609                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
610                        );
611
612                        // Insert the chunk as a gap.
613                        insert_chunk(
614                            txn,
615                            &hashed_linked_chunk_id,
616                            previous,
617                            new,
618                            next,
619                            CHUNK_TYPE_GAP_TYPE_STRING,
620                        )?;
621
622                        // Insert the gap's value.
623                        txn.execute(
624                            r#"
625                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
626                            VALUES (?, ?, ?)
627                        "#,
628                            (new, &hashed_linked_chunk_id, prev_token),
629                        )?;
630                    }
631
632                    Update::RemoveChunk(chunk_identifier) => {
633                        let chunk_id = chunk_identifier.index();
634
635                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
636
637                        // Find chunk to delete.
638                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
639                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
640                            (chunk_id, &hashed_linked_chunk_id),
641                            |row| Ok((row.get(0)?, row.get(1)?))
642                        )?;
643
644                        // Replace its previous' next to its own next.
645                        if let Some(previous) = previous {
646                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
647                        }
648
649                        // Replace its next' previous to its own previous.
650                        if let Some(next) = next {
651                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
652                        }
653
654                        // Now delete it, and let cascading delete corresponding entries in the
655                        // other data tables.
656                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
657                    }
658
659                    Update::PushItems { at, items } => {
660                        if items.is_empty() {
661                            // Should never happens, but better be safe.
662                            continue;
663                        }
664
665                        let chunk_id = at.chunk_identifier().index();
666
667                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
668
669                        let mut chunk_statement = txn.prepare(
670                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
671                        )?;
672
673                        // Note: we use `OR REPLACE` here, because the event might have been
674                        // already inserted in the database. This is the case when an event is
675                        // deduplicated and moved to another position; or because it was inserted
676                        // outside the context of a linked chunk (e.g. pinned event).
677                        let mut content_statement = txn.prepare(
678                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
679                        )?;
680
681                        let invalid_event = |event: TimelineEvent| {
682                            let Some(event_id) = event.event_id() else {
683                                error!(%linked_chunk_id, "Trying to push an event with no ID");
684                                return None;
685                            };
686
687                            let Some(event_type) = event.kind.event_type() else {
688                                error!(%event_id, "Trying to save an event with no event type");
689                                return None;
690                            };
691
692                            Some((event_id.to_string(), event_type, event))
693                        };
694
695                        let room_id = linked_chunk_id.room_id();
696                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
697
698                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
699                            // Insert the location information into the database.
700                            let index = at.index() + i;
701                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
702
703                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
704                            let event_type = this.encode_key(keys::EVENTS, event_type);
705
706                            // Now, insert the event content into the database.
707                            let encoded_event = this.encode_event(&event)?;
708                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
709                        }
710                    }
711
712                    Update::ReplaceItem { at, item: event } => {
713                        let chunk_id = at.chunk_identifier().index();
714
715                        let index = at.index();
716
717                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
718
719                        // The event id should be the same, but just in case it changed…
720                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
721                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
722                            continue;
723                        };
724
725                        let Some(event_type) = event.kind.event_type() else {
726                            error!(%event_id, "Trying to save an event with no event type");
727                            continue;
728                        };
729
730                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
731                        let event_type = this.encode_key(keys::EVENTS, event_type);
732
733                        // Replace the event's content. Really we'd like to update, but in case the
734                        // event id changed, we are a bit lenient here and will allow an insertion
735                        // of the new event.
736                        let encoded_event = this.encode_event(&event)?;
737                        let room_id = linked_chunk_id.room_id();
738                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
739                        txn.execute(
740                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
741                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
742
743                        // Replace the event id in the linked chunk, in case it changed.
744                        txn.execute(
745                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
746                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
747                        )?;
748                    }
749
750                    Update::RemoveItem { at } => {
751                        let chunk_id = at.chunk_identifier().index();
752                        let index = at.index();
753
754                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
755
756                        // Remove the entry in the chunk table.
757                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
758
759                        // Decrement the index of each item after the one we are
760                        // going to remove.
761                        //
762                        // Imagine we have the following events:
763                        //
764                        // | event_id | linked_chunk_id | chunk_id | position |
765                        // |----------|-----------------|----------|----------|
766                        // | $ev0     | !r0             | 42       | 0        |
767                        // | $ev1     | !r0             | 42       | 1        |
768                        // | $ev2     | !r0             | 42       | 2        |
769                        // | $ev3     | !r0             | 42       | 3        |
770                        // | $ev4     | !r0             | 42       | 4        |
771                        //
772                        // `$ev2` has been removed, then we end up in this
773                        // state:
774                        //
775                        // | event_id | linked_chunk_id    | chunk_id | position |
776                        // |----------|--------------------|----------|----------|
777                        // | $ev0     | !r0                | 42       | 0        |
778                        // | $ev1     | !r0                | 42       | 1        |
779                        // |          |                    |          |          | <- no more `$ev2`
780                        // | $ev3     | !r0                | 42       | 3        |
781                        // | $ev4     | !r0                | 42       | 4        |
782                        //
783                        // We need to shift the `position` of `$ev3` and `$ev4`
784                        // to `position - 1`, like so:
785                        //
786                        // | event_id | linked_chunk_id | chunk_id | position |
787                        // |----------|-----------------|----------|----------|
788                        // | $ev0     | !r0             | 42       | 0        |
789                        // | $ev1     | !r0             | 42       | 1        |
790                        // | $ev3     | !r0             | 42       | 2        |
791                        // | $ev4     | !r0             | 42       | 3        |
792                        //
793                        // Usually, it boils down to run the following query:
794                        //
795                        // ```sql
796                        // UPDATE event_chunks
797                        // SET position = position - 1
798                        // WHERE position > 2 AND …
799                        // ```
800                        //
801                        // Okay. But `UPDATE` runs on rows in no particular
802                        // order. It means that it can update `$ev4` before
803                        // `$ev3` for example. What happens in this particular
804                        // case? The `position` of `$ev4` becomes `3`, however
805                        // `$ev3` already has `position = 3`. Because there
806                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
807                        // position)`, it will result in a constraint violation.
808                        //
809                        // There is **no way** to control the execution order of
810                        // `UPDATE` in SQLite. To persuade yourself, try:
811                        //
812                        // ```sql
813                        // UPDATE event_chunks
814                        // SET position = position - 1
815                        // FROM (
816                        //     SELECT event_id
817                        //     FROM event_chunks
818                        //     WHERE position > 2 AND …
819                        //     ORDER BY position ASC
820                        // ) as ordered
821                        // WHERE event_chunks.event_id = ordered.event_id
822                        // ```
823                        //
824                        // It will fail the same way.
825                        //
826                        // Thus, we have 2 solutions:
827                        //
828                        // 1. Remove the `UNIQUE` constraint,
829                        // 2. Be creative.
830                        //
831                        // The `UNIQUE` constraint is a safe belt. Normally, we
832                        // have `event_cache::Deduplicator` that is responsible
833                        // to ensure there is no duplicated event. However,
834                        // relying on this is “fragile” in the sense it can
835                        // contain bugs. Relying on the `UNIQUE` constraint from
836                        // SQLite is more robust. It's “braces and belt” as we
837                        // say here.
838                        //
839                        // So. We need to be creative.
840                        //
841                        // Many solutions exist. Amongst the most popular, we
842                        // see _dropping and re-creating the index_, which is
843                        // no-go for us, it's too expensive. I (@hywan) have
844                        // adopted the following one:
845                        //
846                        // - Do `position = position - 1` but in the negative
847                        //   space, so `position = -(position - 1)`. A position
848                        //   cannot be negative; we are sure it is unique!
849                        // - Once all candidate rows are updated, do `position =
850                        //   -position` to move back to the positive space.
851                        //
852                        // 'told you it's gonna be creative.
853                        //
854                        // This solution is a hack, **but** it is a small
855                        // number of operations, and we can keep the `UNIQUE`
856                        // constraint in place.
857                        txn.execute(
858                            r#"
859                                UPDATE event_chunks
860                                SET position = -(position - 1)
861                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
862                            "#,
863                            (&hashed_linked_chunk_id, chunk_id, index)
864                        )?;
865                        txn.execute(
866                            r#"
867                                UPDATE event_chunks
868                                SET position = -position
869                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
870                            "#,
871                            (&hashed_linked_chunk_id, chunk_id)
872                        )?;
873                    }
874
875                    Update::DetachLastItems { at } => {
876                        let chunk_id = at.chunk_identifier().index();
877                        let index = at.index();
878
879                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
880
881                        // Remove these entries.
882                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
883                    }
884
885                    Update::Clear => {
886                        trace!(%linked_chunk_id, "clearing items");
887
888                        // Remove chunks, and let cascading do its job.
889                        txn.execute(
890                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
891                            (&hashed_linked_chunk_id,),
892                        )?;
893                    }
894
895                    Update::StartReattachItems | Update::EndReattachItems => {
896                        // Nothing.
897                    }
898                }
899            }
900
901            Ok(())
902        })
903        .await?;
904
905        Ok(())
906    }
907
908    #[instrument(skip(self))]
909    async fn load_all_chunks(
910        &self,
911        linked_chunk_id: LinkedChunkId<'_>,
912    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
913        let _timer = timer!("method");
914
915        let hashed_linked_chunk_id =
916            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
917
918        let this = self.clone();
919
920        let result = self
921            .read()
922            .await?
923            .with_transaction(move |txn| -> Result<_> {
924                let mut items = Vec::new();
925
926                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
927                for data in txn
928                    .prepare(
929                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
930                    )?
931                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
932                {
933                    let (id, previous, next, chunk_type) = data?;
934                    let new = txn.rebuild_chunk(
935                        &this,
936                        &hashed_linked_chunk_id,
937                        previous,
938                        id,
939                        next,
940                        chunk_type.as_str(),
941                    )?;
942                    items.push(new);
943                }
944
945                Ok(items)
946            })
947            .await?;
948
949        Ok(result)
950    }
951
952    #[instrument(skip(self))]
953    async fn load_all_chunks_metadata(
954        &self,
955        linked_chunk_id: LinkedChunkId<'_>,
956    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
957        let _timer = timer!("method");
958
959        let hashed_linked_chunk_id =
960            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
961
962        self.read()
963            .await?
964            .with_transaction(move |txn| -> Result<_> {
965                // We want to collect the metadata about each chunk (id, next, previous), and
966                // for event chunks, the number of events in it. For gaps, the
967                // number of events is 0, by convention.
968                //
969                // We've tried different strategies over time:
970                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
971                //   caused a full table traversal for each chunk, including for gaps which
972                //   don't have any events. This happened in
973                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
974                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
975                //   additional `SELECT` query. It was an immense improvement, but still caused
976                //   one select query per event chunk. This happened in
977                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
978                //
979                // The current solution is to run two queries:
980                // - one to get each chunk and its number of events, by doing a single `SELECT`
981                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
982                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
983                //   hashmap.
984                // - one to get each chunk's metadata (id, previous, next, type) from the
985                //   database with a `SELECT`, and then use the hashmap to get the number of
986                //   events.
987                //
988                // This strategy minimizes the number of queries to the database, and keeps them
989                // super simple, while doing a bit more processing here, which is much faster.
990
991                let num_events_by_chunk_ids = txn
992                    .prepare(
993                        r#"
994                            SELECT ec.chunk_id, COUNT(ec.event_id)
995                            FROM event_chunks as ec
996                            WHERE ec.linked_chunk_id = ?
997                            GROUP BY ec.chunk_id
998                        "#,
999                    )?
1000                    .query_map((&hashed_linked_chunk_id,), |row| {
1001                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
1002                    })?
1003                    .collect::<Result<HashMap<_, _>, _>>()?;
1004
1005                txn.prepare(
1006                    r#"
1007                        SELECT
1008                            lc.id,
1009                            lc.previous,
1010                            lc.next,
1011                            lc.type
1012                        FROM linked_chunks as lc
1013                        WHERE lc.linked_chunk_id = ?
1014                        ORDER BY lc.id"#,
1015                )?
1016                .query_map((&hashed_linked_chunk_id,), |row| {
1017                    Ok((
1018                        row.get::<_, u64>(0)?,
1019                        row.get::<_, Option<u64>>(1)?,
1020                        row.get::<_, Option<u64>>(2)?,
1021                        row.get::<_, String>(3)?,
1022                    ))
1023                })?
1024                .map(|data| -> Result<_> {
1025                    let (id, previous, next, chunk_type) = data?;
1026
1027                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
1028                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
1029                    // benchmarking shows that this is slightly slower than matching the chunk
1030                    // type (around 1%, so in the realm of noise), so we keep the explicit
1031                    // check instead.
1032                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
1033                        0
1034                    } else {
1035                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
1036                    };
1037
1038                    Ok(ChunkMetadata {
1039                        identifier: ChunkIdentifier::new(id),
1040                        previous: previous.map(ChunkIdentifier::new),
1041                        next: next.map(ChunkIdentifier::new),
1042                        num_items,
1043                    })
1044                })
1045                .collect::<Result<Vec<_>, _>>()
1046            })
1047            .await
1048    }
1049
1050    #[instrument(skip(self))]
1051    async fn load_last_chunk(
1052        &self,
1053        linked_chunk_id: LinkedChunkId<'_>,
1054    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1055        let _timer = timer!("method");
1056
1057        let hashed_linked_chunk_id =
1058            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1059
1060        let this = self.clone();
1061
1062        self
1063            .read()
1064            .await?
1065            .with_transaction(move |txn| -> Result<_> {
1066                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
1067                let (observed_max_identifier, number_of_chunks) = txn
1068                    .prepare(
1069                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1070                    )?
1071                    .query_row(
1072                        (&hashed_linked_chunk_id,),
1073                        |row| {
1074                            Ok((
1075                                // Read the `MAX(id)` as an `Option<u64>` instead
1076                                // of `u64` in case the `SELECT` returns nothing.
1077                                // Indeed, if it returns no line, the `MAX(id)` is
1078                                // set to `Null`.
1079                                row.get::<_, Option<u64>>(0)?,
1080                                row.get::<_, u64>(1)?,
1081                            ))
1082                        }
1083                    )?;
1084
1085                let chunk_identifier_generator = match observed_max_identifier {
1086                    Some(max_observed_identifier) => {
1087                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1088                            ChunkIdentifier::new(max_observed_identifier)
1089                        )
1090                    },
1091                    None => ChunkIdentifierGenerator::new_from_scratch(),
1092                };
1093
1094                // Find the last chunk.
1095                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1096                    .prepare(
1097                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1098                    )?
1099                    .query_row(
1100                        (&hashed_linked_chunk_id,),
1101                        |row| {
1102                            Ok((
1103                                row.get::<_, u64>(0)?,
1104                                row.get::<_, Option<u64>>(1)?,
1105                                row.get::<_, String>(2)?,
1106                            ))
1107                        }
1108                    )
1109                    .optional()?
1110                else {
1111                    // Chunk is not found and there are zero chunks for this room, this is consistent, all
1112                    // good.
1113                    if number_of_chunks == 0 {
1114                        return Ok((None, chunk_identifier_generator));
1115                    }
1116                    // Chunk is not found **but** there are chunks for this room, this is inconsistent. The
1117                    // linked chunk is malformed.
1118                    //
1119                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
1120                    else {
1121                        return Err(Error::InvalidData {
1122                            details:
1123                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1124                                    .to_owned()
1125                            }
1126                        )
1127                    }
1128                };
1129
1130                // Build the chunk.
1131                let last_chunk = txn.rebuild_chunk(
1132                    &this,
1133                    &hashed_linked_chunk_id,
1134                    previous_chunk,
1135                    chunk_identifier,
1136                    None,
1137                    &chunk_type
1138                )?;
1139
1140                Ok((Some(last_chunk), chunk_identifier_generator))
1141            })
1142            .await
1143    }
1144
1145    #[instrument(skip(self))]
1146    async fn load_previous_chunk(
1147        &self,
1148        linked_chunk_id: LinkedChunkId<'_>,
1149        before_chunk_identifier: ChunkIdentifier,
1150    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1151        let _timer = timer!("method");
1152
1153        let hashed_linked_chunk_id =
1154            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1155
1156        let this = self.clone();
1157
1158        self
1159            .read()
1160            .await?
1161            .with_transaction(move |txn| -> Result<_> {
1162                // Find the chunk before the chunk identified by `before_chunk_identifier`.
1163                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1164                    .prepare(
1165                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1166                    )?
1167                    .query_row(
1168                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1169                        |row| {
1170                            Ok((
1171                                row.get::<_, u64>(0)?,
1172                                row.get::<_, Option<u64>>(1)?,
1173                                row.get::<_, Option<u64>>(2)?,
1174                                row.get::<_, String>(3)?,
1175                            ))
1176                        }
1177                    )
1178                    .optional()?
1179                else {
1180                    // Chunk is not found.
1181                    return Ok(None);
1182                };
1183
1184                // Build the chunk.
1185                let last_chunk = txn.rebuild_chunk(
1186                    &this,
1187                    &hashed_linked_chunk_id,
1188                    previous_chunk,
1189                    chunk_identifier,
1190                    next_chunk,
1191                    &chunk_type
1192                )?;
1193
1194                Ok(Some(last_chunk))
1195            })
1196            .await
1197    }
1198
1199    #[instrument(skip(self))]
1200    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1201        let _timer = timer!("method");
1202
1203        self.write()
1204            .await?
1205            .with_transaction(move |txn| {
1206                // Remove all the chunks, and let cascading do its job.
1207                txn.execute("DELETE FROM linked_chunks", ())?;
1208                // Also clear all the events' contents.
1209                txn.execute("DELETE FROM events", ())
1210            })
1211            .await?;
1212
1213        Ok(())
1214    }
1215
1216    #[instrument(skip(self, events))]
1217    async fn filter_duplicated_events(
1218        &self,
1219        linked_chunk_id: LinkedChunkId<'_>,
1220        events: Vec<OwnedEventId>,
1221    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1222        let _timer = timer!("method");
1223
1224        // If there's no events for which we want to check duplicates, we can return
1225        // early. It's not only an optimization to do so: it's required, otherwise the
1226        // `repeat_vars` call below will panic.
1227        if events.is_empty() {
1228            return Ok(Vec::new());
1229        }
1230
1231        // Select all events that exist in the store, i.e. the duplicates.
1232        let hashed_linked_chunk_id =
1233            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1234        let linked_chunk_id = linked_chunk_id.to_owned();
1235
1236        self.read()
1237            .await?
1238            .with_transaction(move |txn| -> Result<_> {
1239                txn.chunk_large_query_over(events, None, move |txn, events| {
1240                    let query = format!(
1241                        r#"
1242                            SELECT event_id, chunk_id, position
1243                            FROM event_chunks
1244                            WHERE linked_chunk_id = ? AND event_id IN ({})
1245                            ORDER BY chunk_id ASC, position ASC
1246                        "#,
1247                        repeat_vars(events.len()),
1248                    );
1249
1250                    let parameters = params_from_iter(
1251                        // parameter for `linked_chunk_id = ?`
1252                        once(
1253                            hashed_linked_chunk_id
1254                                .to_sql()
1255                                // SAFETY: it cannot fail since `Key::to_sql` never fails
1256                                .unwrap(),
1257                        )
1258                        // parameters for `event_id IN (…)`
1259                        .chain(events.iter().map(|event| {
1260                            event
1261                                .as_str()
1262                                .to_sql()
1263                                // SAFETY: it cannot fail since `str::to_sql` never fails
1264                                .unwrap()
1265                        })),
1266                    );
1267
1268                    let mut duplicated_events = Vec::new();
1269
1270                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1271                        Ok((
1272                            row.get::<_, String>(0)?,
1273                            row.get::<_, u64>(1)?,
1274                            row.get::<_, usize>(2)?,
1275                        ))
1276                    })? {
1277                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1278
1279                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1280                            // Normally unreachable, but the event ID has been stored even if it is
1281                            // malformed, let's skip it.
1282                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1283                            continue;
1284                        };
1285
1286                        duplicated_events.push((
1287                            duplicated_event,
1288                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
1289                        ));
1290                    }
1291
1292                    Ok(duplicated_events)
1293                })
1294            })
1295            .await
1296    }
1297
1298    #[instrument(skip(self, event_id))]
1299    async fn find_event(
1300        &self,
1301        room_id: &RoomId,
1302        event_id: &EventId,
1303    ) -> Result<Option<Event>, Self::Error> {
1304        let _timer = timer!("method");
1305
1306        let event_id = event_id.to_owned();
1307        let this = self.clone();
1308
1309        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1310
1311        self.read()
1312            .await?
1313            .with_transaction(move |txn| -> Result<_> {
1314                let Some(event) = txn
1315                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1316                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1317                    .optional()?
1318                else {
1319                    // Event is not found.
1320                    return Ok(None);
1321                };
1322
1323                let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1324
1325                Ok(Some(event))
1326            })
1327            .await
1328    }
1329
1330    #[instrument(skip(self, event_id, filters))]
1331    async fn find_event_relations(
1332        &self,
1333        room_id: &RoomId,
1334        event_id: &EventId,
1335        filters: Option<&[RelationType]>,
1336    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1337        let _timer = timer!("method");
1338
1339        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1340
1341        let hashed_linked_chunk_id =
1342            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1343
1344        let event_id = event_id.to_owned();
1345        let filters = filters.map(ToOwned::to_owned);
1346        let store = self.clone();
1347
1348        self.read()
1349            .await?
1350            .with_transaction(move |txn| -> Result<_> {
1351                find_event_relations_transaction(
1352                    store,
1353                    hashed_room_id,
1354                    hashed_linked_chunk_id,
1355                    event_id,
1356                    filters,
1357                    txn,
1358                )
1359            })
1360            .await
1361    }
1362
1363    #[instrument(skip(self))]
1364    async fn get_room_events(
1365        &self,
1366        room_id: &RoomId,
1367        event_type: Option<&str>,
1368        session_id: Option<&str>,
1369    ) -> Result<Vec<Event>, Self::Error> {
1370        let _timer = timer!("method");
1371
1372        let this = self.clone();
1373
1374        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1375        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1376        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1377
1378        self.read()
1379            .await?
1380            .with_transaction(move |txn| -> Result<_> {
1381                // I'm not sure why clippy claims that the clones aren't required. The compiler
1382                // tells us that the lifetimes aren't long enough if we remove them. Doesn't matter
1383                // much so let's silence things.
1384                #[allow(clippy::redundant_clone)]
1385                let (query, keys) = match (hashed_event_type, hashed_session_id) {
1386                    (None, None) => {
1387                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1388                    }
1389                    (None, Some(session_id)) => (
1390                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1391                        params![hashed_room_id, session_id.to_owned()],
1392                    ),
1393                    (Some(event_type), None) => (
1394                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1395                        params![hashed_room_id, event_type.to_owned()]
1396                    ),
1397                    (Some(event_type), Some(session_id)) => (
1398                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1399                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1400                    ),
1401                };
1402
1403                let mut statement = txn.prepare(query)?;
1404                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1405
1406                let mut events = Vec::new();
1407                for ev in maybe_events {
1408                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1409                    events.push(event);
1410                }
1411
1412                Ok(events)
1413            })
1414            .await
1415    }
1416
1417    #[instrument(skip(self, event))]
1418    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1419        let _timer = timer!("method");
1420
1421        let Some(event_id) = event.event_id() else {
1422            error!("Trying to save an event with no ID");
1423            return Ok(());
1424        };
1425
1426        let Some(event_type) = event.kind.event_type() else {
1427            error!(%event_id, "Trying to save an event with no event type");
1428            return Ok(());
1429        };
1430
1431        let event_type = self.encode_key(keys::EVENTS, event_type);
1432        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1433
1434        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1435        let event_id = event_id.to_string();
1436        let encoded_event = self.encode_event(&event)?;
1437
1438        self.write()
1439            .await?
1440            .with_transaction(move |txn| -> Result<_> {
1441                txn.execute(
1442                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1443                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1444
1445                Ok(())
1446            })
1447            .await
1448    }
1449
1450    async fn optimize(&self) -> Result<(), Self::Error> {
1451        Ok(self.vacuum().await?)
1452    }
1453
1454    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1455        self.get_db_size().await
1456    }
1457}
1458
1459fn find_event_relations_transaction(
1460    store: SqliteEventCacheStore,
1461    hashed_room_id: Key,
1462    hashed_linked_chunk_id: Key,
1463    event_id: OwnedEventId,
1464    filters: Option<Vec<RelationType>>,
1465    txn: &Transaction<'_>,
1466) -> Result<Vec<(Event, Option<Position>)>> {
1467    let get_rows = |row: &rusqlite::Row<'_>| {
1468        Ok((
1469            row.get::<_, Vec<u8>>(0)?,
1470            row.get::<_, Option<u64>>(1)?,
1471            row.get::<_, Option<usize>>(2)?,
1472        ))
1473    };
1474
1475    // Collect related events.
1476    let collect_results = |transaction| {
1477        let mut related = Vec::new();
1478
1479        for result in transaction {
1480            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1481
1482            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1483
1484            // Only build the position if both the chunk_id and position were present; in
1485            // theory, they should either be present at the same time, or not at all.
1486            let pos = chunk_id
1487                .zip(index)
1488                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1489
1490            related.push((event, pos));
1491        }
1492
1493        Ok(related)
1494    };
1495
1496    if let Some(filters) = filters {
1497        let question_marks = repeat_vars(filters.len());
1498        let query = format!(
1499            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1500            FROM events
1501            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1502            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1503        );
1504
1505        // First the filters need to be stringified; because `.to_sql()` will borrow
1506        // from them, they also need to be stringified onto the stack, so as to
1507        // get a stable address (to avoid returning a temporary reference in the
1508        // map closure below).
1509        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1510        let filters_params: Vec<_> = filter_strings
1511            .iter()
1512            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1513            .collect();
1514
1515        let parameters = params_from_iter(
1516            [
1517                hashed_linked_chunk_id.to_sql().expect(
1518                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
1519                ),
1520                event_id
1521                    .as_str()
1522                    .to_sql()
1523                    .expect("We should be able to convert an event ID to a SQLite value"),
1524                hashed_room_id
1525                    .to_sql()
1526                    .expect("We should be able to convert a room ID to a SQLite value"),
1527            ]
1528            .into_iter()
1529            .chain(filters_params),
1530        );
1531
1532        let mut transaction = txn.prepare(&query)?;
1533        let transaction = transaction.query_map(parameters, get_rows)?;
1534
1535        collect_results(transaction)
1536    } else {
1537        let query =
1538            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1539            FROM events
1540            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1541            WHERE relates_to = ? AND room_id = ?";
1542        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1543
1544        let mut transaction = txn.prepare(query)?;
1545        let transaction = transaction.query_map(parameters, get_rows)?;
1546
1547        collect_results(transaction)
1548    }
1549}
1550
1551/// Like `deadpool::managed::Object::with_transaction`, but starts the
1552/// transaction in immediate (write) mode from the beginning, precluding errors
1553/// of the kind SQLITE_BUSY from happening, for transactions that may involve
1554/// both reads and writes, and start with a write.
1555async fn with_immediate_transaction<
1556    T: Send + 'static,
1557    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1558>(
1559    this: &SqliteEventCacheStore,
1560    f: F,
1561) -> Result<T, Error> {
1562    this.write()
1563        .await?
1564        .interact(move |conn| -> Result<T, Error> {
1565            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
1566            // to avoid read transactions upgrading to write mode and causing
1567            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
1568            conn.set_transaction_behavior(TransactionBehavior::Immediate);
1569
1570            let code = || -> Result<T, Error> {
1571                let txn = conn.transaction()?;
1572                let res = f(&txn)?;
1573                txn.commit()?;
1574                Ok(res)
1575            };
1576
1577            let res = code();
1578
1579            // Reset the transaction behavior to use Deferred, after this transaction has
1580            // been run, whether it was successful or not.
1581            conn.set_transaction_behavior(TransactionBehavior::Deferred);
1582
1583            res
1584        })
1585        .await
1586        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].`
1587        .unwrap()
1588}
1589
1590fn insert_chunk(
1591    txn: &Transaction<'_>,
1592    linked_chunk_id: &Key,
1593    previous: Option<u64>,
1594    new: u64,
1595    next: Option<u64>,
1596    type_str: &str,
1597) -> rusqlite::Result<()> {
1598    // First, insert the new chunk.
1599    txn.execute(
1600        r#"
1601            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1602            VALUES (?, ?, ?, ?, ?)
1603        "#,
1604        (new, linked_chunk_id, previous, next, type_str),
1605    )?;
1606
1607    // If this chunk has a previous one, update its `next` field.
1608    if let Some(previous) = previous {
1609        let updated = txn.execute(
1610            r#"
1611                UPDATE linked_chunks
1612                SET next = ?
1613                WHERE id = ? AND linked_chunk_id = ?
1614            "#,
1615            (new, previous, linked_chunk_id),
1616        )?;
1617        if updated < 1 {
1618            return Err(rusqlite::Error::QueryReturnedNoRows);
1619        }
1620        if updated > 1 {
1621            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1622        }
1623    }
1624
1625    // If this chunk has a next one, update its `previous` field.
1626    if let Some(next) = next {
1627        let updated = txn.execute(
1628            r#"
1629                UPDATE linked_chunks
1630                SET previous = ?
1631                WHERE id = ? AND linked_chunk_id = ?
1632            "#,
1633            (new, next, linked_chunk_id),
1634        )?;
1635        if updated < 1 {
1636            return Err(rusqlite::Error::QueryReturnedNoRows);
1637        }
1638        if updated > 1 {
1639            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1640        }
1641    }
1642
1643    Ok(())
1644}
1645
1646#[cfg(test)]
1647mod tests {
1648    use std::{
1649        path::PathBuf,
1650        sync::{
1651            LazyLock,
1652            atomic::{AtomicU32, Ordering::SeqCst},
1653        },
1654    };
1655
1656    use assert_matches::assert_matches;
1657    use matrix_sdk_base::{
1658        event_cache::store::{
1659            EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
1660            integration_tests::{EventCacheStoreIntegrationTests, make_test_event_with_event_id},
1661        },
1662        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1663        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1664    };
1665    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
1666    use ruma::event_id;
1667    use tempfile::{TempDir, tempdir};
1668
1669    use super::SqliteEventCacheStore;
1670    use crate::{
1671        SqliteStoreConfig,
1672        event_cache_store::keys,
1673        utils::{EncryptableStore as _, SqliteAsyncConnExt},
1674    };
1675
1676    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1677    static NUM: AtomicU32 = AtomicU32::new(0);
1678
1679    fn new_event_cache_store_workspace() -> PathBuf {
1680        let name = NUM.fetch_add(1, SeqCst).to_string();
1681        TMP_DIR.path().join(name)
1682    }
1683
1684    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1685        let tmpdir_path = new_event_cache_store_workspace();
1686
1687        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1688
1689        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1690    }
1691
1692    event_cache_store_integration_tests!();
1693    event_cache_store_integration_tests_time!();
1694
1695    #[async_test]
1696    async fn test_pool_size() {
1697        let tmpdir_path = new_event_cache_store_workspace();
1698        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1699
1700        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1701
1702        assert_eq!(store.pool.status().max_size, 42);
1703    }
1704
1705    #[async_test]
1706    async fn test_linked_chunk_remove_chunk() {
1707        let store = get_event_cache_store().await.expect("creating cache store failed");
1708
1709        // Run corresponding integration test
1710        store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;
1711
1712        // Check that cascading worked. Yes, SQLite, I doubt you.
1713        let gaps = store
1714            .read()
1715            .await
1716            .unwrap()
1717            .with_transaction(|txn| -> rusqlite::Result<_> {
1718                let mut gaps = Vec::new();
1719                for data in txn
1720                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1721                    .query_map((), |row| row.get::<_, u64>(0))?
1722                {
1723                    gaps.push(data?);
1724                }
1725                Ok(gaps)
1726            })
1727            .await
1728            .unwrap();
1729
1730        // Check that the gaps match those set up in the corresponding integration test
1731        // above
1732        assert_eq!(gaps, vec![42, 44]);
1733    }
1734
1735    #[async_test]
1736    async fn test_linked_chunk_remove_item() {
1737        let store = get_event_cache_store().await.expect("creating cache store failed");
1738
1739        // Run corresponding integration test
1740        store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;
1741
1742        let room_id = *DEFAULT_TEST_ROOM_ID;
1743        let linked_chunk_id = LinkedChunkId::Room(room_id);
1744
1745        // Make sure the position have been updated for the remaining events.
1746        let num_rows: u64 = store
1747            .read()
1748            .await
1749            .unwrap()
1750            .with_transaction(move |txn| {
1751                txn.query_row(
1752                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1753                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1754                    |row| row.get(0),
1755                )
1756            })
1757            .await
1758            .unwrap();
1759        assert_eq!(num_rows, 3);
1760    }
1761
1762    #[async_test]
1763    async fn test_linked_chunk_clear() {
1764        let store = get_event_cache_store().await.expect("creating cache store failed");
1765
1766        // Run corresponding integration test
1767        store.clone().into_event_cache_store().test_linked_chunk_clear().await;
1768
1769        // Check that cascading worked. Yes, SQLite, I doubt you.
1770        store
1771            .read()
1772            .await
1773            .unwrap()
1774            .with_transaction(|txn| -> rusqlite::Result<_> {
1775                let num_gaps = txn
1776                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
1777                    .query_row((), |row| row.get::<_, u64>(0))?;
1778                assert_eq!(num_gaps, 0);
1779
1780                let num_events = txn
1781                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
1782                    .query_row((), |row| row.get::<_, u64>(0))?;
1783                assert_eq!(num_events, 0);
1784
1785                Ok(())
1786            })
1787            .await
1788            .unwrap();
1789    }
1790
1791    #[async_test]
1792    async fn test_linked_chunk_update_is_a_transaction() {
1793        let store = get_event_cache_store().await.expect("creating cache store failed");
1794
1795        let room_id = *DEFAULT_TEST_ROOM_ID;
1796        let linked_chunk_id = LinkedChunkId::Room(room_id);
1797
1798        // Trigger a violation of the unique constraint on the (room id, chunk id)
1799        // couple.
1800        let err = store
1801            .handle_linked_chunk_updates(
1802                linked_chunk_id,
1803                vec![
1804                    Update::NewItemsChunk {
1805                        previous: None,
1806                        new: ChunkIdentifier::new(42),
1807                        next: None,
1808                    },
1809                    Update::NewItemsChunk {
1810                        previous: None,
1811                        new: ChunkIdentifier::new(42),
1812                        next: None,
1813                    },
1814                ],
1815            )
1816            .await
1817            .unwrap_err();
1818
1819        // The operation fails with a constraint violation error.
1820        assert_matches!(err, crate::error::Error::Sqlite(err) => {
1821            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1822        });
1823
1824        // If the updates have been handled transactionally, then no new chunks should
1825        // have been added; failure of the second update leads to the first one being
1826        // rolled back.
1827        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1828        assert!(chunks.is_empty());
1829    }
1830
1831    #[async_test]
1832    async fn test_event_chunks_allows_same_event_in_room_and_thread() {
1833        // This test verifies that the same event can appear in both a room's linked
1834        // chunk and a thread's linked chunk. This is the real-world use case:
1835        // a thread reply appears in both the main room timeline and the thread.
1836
1837        let store = get_event_cache_store().await.expect("creating cache store failed");
1838
1839        let room_id = *DEFAULT_TEST_ROOM_ID;
1840        let thread_root = event_id!("$thread_root");
1841
1842        // Create an event that will be inserted into both the room and thread linked
1843        // chunks.
1844        let event = make_test_event_with_event_id(
1845            room_id,
1846            "thread reply",
1847            Some(event_id!("$thread_reply")),
1848        );
1849
1850        let room_linked_chunk_id = LinkedChunkId::Room(room_id);
1851        let thread_linked_chunk_id = LinkedChunkId::Thread(room_id, thread_root);
1852
1853        // Insert the event into the room's linked chunk.
1854        store
1855            .handle_linked_chunk_updates(
1856                room_linked_chunk_id,
1857                vec![
1858                    Update::NewItemsChunk {
1859                        previous: None,
1860                        new: ChunkIdentifier::new(1),
1861                        next: None,
1862                    },
1863                    Update::PushItems {
1864                        at: Position::new(ChunkIdentifier::new(1), 0),
1865                        items: vec![event.clone()],
1866                    },
1867                ],
1868            )
1869            .await
1870            .unwrap();
1871
1872        // Insert the same event into the thread's linked chunk.
1873        store
1874            .handle_linked_chunk_updates(
1875                thread_linked_chunk_id,
1876                vec![
1877                    Update::NewItemsChunk {
1878                        previous: None,
1879                        new: ChunkIdentifier::new(1),
1880                        next: None,
1881                    },
1882                    Update::PushItems {
1883                        at: Position::new(ChunkIdentifier::new(1), 0),
1884                        items: vec![event],
1885                    },
1886                ],
1887            )
1888            .await
1889            .unwrap();
1890
1891        // Verify both entries exist by loading chunks from both linked chunk IDs.
1892        let room_chunks = store.load_all_chunks(room_linked_chunk_id).await.unwrap();
1893        let thread_chunks = store.load_all_chunks(thread_linked_chunk_id).await.unwrap();
1894
1895        assert_eq!(room_chunks.len(), 1);
1896        assert_eq!(thread_chunks.len(), 1);
1897
1898        // Verify the event is in both.
1899        assert_matches!(&room_chunks[0].content, ChunkContent::Items(events) => {
1900            assert_eq!(events.len(), 1);
1901            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
1902        });
1903        assert_matches!(&thread_chunks[0].content, ChunkContent::Items(events) => {
1904            assert_eq!(events.len(), 1);
1905            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
1906        });
1907    }
1908}
1909
1910#[cfg(test)]
1911mod encrypted_tests {
1912    use std::sync::{
1913        LazyLock,
1914        atomic::{AtomicU32, Ordering::SeqCst},
1915    };
1916
1917    use matrix_sdk_base::{
1918        event_cache::store::{EventCacheStore, EventCacheStoreError},
1919        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1920    };
1921    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1922    use ruma::{
1923        event_id,
1924        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1925        room_id, user_id,
1926    };
1927    use tempfile::{TempDir, tempdir};
1928
1929    use super::SqliteEventCacheStore;
1930
1931    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1932    static NUM: AtomicU32 = AtomicU32::new(0);
1933
1934    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1935        let name = NUM.fetch_add(1, SeqCst).to_string();
1936        let tmpdir_path = TMP_DIR.path().join(name);
1937
1938        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1939
1940        Ok(SqliteEventCacheStore::open(
1941            tmpdir_path.to_str().unwrap(),
1942            Some("default_test_password"),
1943        )
1944        .await
1945        .unwrap())
1946    }
1947
1948    event_cache_store_integration_tests!();
1949    event_cache_store_integration_tests_time!();
1950
1951    #[async_test]
1952    async fn test_no_sqlite_injection_in_find_event_relations() {
1953        let room_id = room_id!("!test:localhost");
1954        let another_room_id = room_id!("!r1:matrix.org");
1955        let sender = user_id!("@alice:localhost");
1956
1957        let store = get_event_cache_store()
1958            .await
1959            .expect("We should be able to create a new, empty, event cache store");
1960
1961        let f = EventFactory::new().room(room_id).sender(sender);
1962
1963        // Create an event for the first room.
1964        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
1965        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
1966
1967        // Create a related event.
1968        let edit_id = event_id!("$find_me:matrix.org");
1969        let edit = f
1970            .text_msg("Find me")
1971            .event_id(edit_id)
1972            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
1973            .into_event();
1974
1975        // Create an event for the second room.
1976        let f = f.room(another_room_id);
1977
1978        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
1979        let another_event =
1980            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
1981
1982        // Save the events in the DB.
1983        store.save_event(room_id, event).await.unwrap();
1984        store.save_event(room_id, edit).await.unwrap();
1985        store.save_event(another_room_id, another_event).await.unwrap();
1986
1987        // Craft a `RelationType` that will inject some SQL to be executed. The
1988        // `OR 1=1` ensures that all the previous parameters, the room
1989        // ID and event ID are ignored.
1990        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
1991
1992        // Attempt to find events in the first room.
1993        let results = store
1994            .find_event_relations(room_id, event_id, filter.as_deref())
1995            .await
1996            .expect("We should be able to attempt to find event relations");
1997
1998        // Ensure that we only got the single related event the first room contains.
1999        similar_asserts::assert_eq!(
2000            results.len(),
2001            1,
2002            "We should only have loaded events for the first room {results:#?}"
2003        );
2004
2005        // The event needs to be the edit event, otherwise something is wrong.
2006        let (found_event, _) = &results[0];
2007        assert_eq!(
2008            found_event.event_id().as_deref(),
2009            Some(edit_id),
2010            "The single event we found should be the edit event"
2011        );
2012    }
2013}