matrix_sdk_sqlite/event_cache_store.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! An SQLite-based backend for the [`EventCacheStore`].

use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        Event, Gap,
        store::{EventCacheStore, extract_event_relation},
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
};
use rusqlite::{
    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    OpenStoreError, Secret, SqliteStoreConfig,
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
    },
};

mod keys {
    // Tables
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

/// The database name.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Identifier of the latest database version.
///
/// This is used to figure out whether the SQLite database requires a
/// migration. Every new SQL migration should imply a bump of this number, and
/// changes in the [`run_migrations`] function.
const DATABASE_VERSION: u8 = 14;
71
72/// The string used to identify a chunk of type events, in the `type` field in
73/// the database.
74const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
75/// The string used to identify a chunk of type gap, in the `type` field in the
76/// database.
77const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
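
// For illustration (a hedged sketch, not the actual schema definition): a
// room timeline made of one events chunk followed by one gap chunk is stored
// in `linked_chunks` as a doubly-linked list of rows, e.g.:
//
//   | id | previous | next | type |
//   |----|----------|------|------|
//   | 0  | NULL     | 1    | "E"  |
//   | 1  | 0        | NULL | "G"  |
//
// The `type` column decides whether the chunk's content lives in
// `event_chunks`/`events` (events chunk) or in `gap_chunks` (gap chunk).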

/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections.
    pool: SqlitePool,

    /// We distinguish between connections for read operations and connections
    /// for write operations. A single connection is reserved for write
    /// operations; all other connections are used for read operations. The
    /// lock ensures there is a single owner of the write connection at a
    /// time.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
    /// Open the SQLite-based event cache store at the given path using the
    /// given passphrase to encrypt private data.
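    ///
    /// # Examples
    ///
    /// A minimal sketch (the path and passphrase below are made up for
    /// illustration):
    ///
    /// ```ignore
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// // Open (or create) the store, encrypting private data with a passphrase.
    /// let store =
    ///     SqliteEventCacheStore::open("/tmp/event-cache", Some("p4ssphr4se")).await?;
    /// # Ok(()) }
    /// ```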
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite-based event cache store at the given path using the
    /// given key to encrypt private data.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the SQLite-based event cache store with the given open config.
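    ///
    /// # Examples
    ///
    /// A hedged sketch of the builder-style configuration, mirroring what
    /// [`SqliteEventCacheStore::open`] does internally:
    ///
    /// ```ignore
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use matrix_sdk_sqlite::{SqliteEventCacheStore, SqliteStoreConfig};
    ///
    /// let config = SqliteStoreConfig::new("/tmp/event-cache").passphrase(Some("p4ssphr4se"));
    /// let store = SqliteEventCacheStore::open_with_config(config).await?;
    /// # Ok(()) }
    /// ```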
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Open an SQLite-based event cache store using the given SQLite database
    /// pool. The given secret will be used to encrypt private data.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;

        run_migrations(&conn, version).await?;

        conn.wal_checkpoint().await;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            // Use `conn` as our selected write connection.
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a connection for executing read operations.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire a connection for executing write operations.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        // Per https://www.sqlite.org/foreignkeys.html#fk_enable, foreign key
        // support must be enabled on a per-connection basis. Execute it every
        // time we try to get a connection, since we can't guarantee a previous
        // connection did enable it before.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Extract the relationship info here.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        // The content may be encrypted.
        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

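    /// Defragment the database and free unused disk space, by running the
    /// `VACUUM` command on the write connection.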
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}

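/// An event as it is stored in the `events` table: the (possibly encrypted)
/// serialized content, along with the relation information extracted *before*
/// encryption, so relations remain queryable even when the content is
/// encrypted.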
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                // It's a gap!
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                // It's events!
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => {
                // It's an error!
                Err(Error::InvalidData {
                    details: format!("a linked chunk has an unknown type {other}"),
                })
            }
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        // There's at most one row for it in the database, so a call to `query_row` is
        // sufficient.
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        // Retrieve all the events from the database.
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

/// Run migrations for the given version of the database.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    // Always enable foreign keys for the current connection.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        // First turn on WAL mode; this can't be done in the transaction, it fails with
        // the error message: "cannot change into wal mode from within a transaction".
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            // Defragment the DB and optimize its size on the filesystem now that we removed
            // the media cache.
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    if version < 14 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
            ))?;
            txn.set_db_version(14)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Learn about the `excluded` keyword in https://sqlite.org/lang_upsert.html.
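        //
        // In short, assuming the schema starts `generation` at a default
        // value on first insert, the possible outcomes are:
        //
        // - holder "a" inserts the row        -> lease acquired
        // - holder "a" refreshes in time      -> same holder, generation kept
        // - holder "b" after "a" expired      -> lease stolen, generation + 1
        // - holder "b" while "a" holds it     -> `WHERE` fails, `None` returned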
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // Use a single transaction throughout this function, so that either all
        // updates are applied, or none is.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

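                        // Unlink the chunk from the doubly-linked list first:
                        // given `P <-> C <-> N`, rewire it to `P <-> N` before
                        // deleting `C`.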
                        // Find the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Point its previous chunk's `next` to its own next.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Point its next chunk's `previous` to its own previous.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Now delete it, and let cascading delete the corresponding entries in the
                        // other data tables.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Should never happen, but better safe than sorry.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Note: we use `OR REPLACE` here, because the event might have already
                        // been inserted in the database. This is the case when an event is
                        // deduplicated and moved to another position; or because it was inserted
                        // outside the context of a linked chunk (e.g. pinned event).
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the events that have an ID and an event type; log and drop
                        // the others.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            // Insert the location information into the database.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Now, insert the event content into the database.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // The event id should be the same, but just in case it changed…
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Replace the event's content. Ideally we'd use an `UPDATE`, but in case
                        // the event id changed, we are a bit lenient here and allow inserting
                        // the new event.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // Replace the event id in the linked chunk, in case it changed.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry in the chunk table.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Decrement the index of each item after the one we are
                        // going to remove.
                        //
                        // Imagine we have the following events:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev2     | !r0             | 42       | 2        |
                        // | $ev3     | !r0             | 42       | 3        |
                        // | $ev4     | !r0             | 42       | 4        |
                        //
                        // If `$ev2` has been removed, we end up in this
                        // state:
                        //
                        // | event_id | linked_chunk_id    | chunk_id | position |
                        // |----------|--------------------|----------|----------|
                        // | $ev0     | !r0                | 42       | 0        |
                        // | $ev1     | !r0                | 42       | 1        |
                        // |          |                    |          |          | <- no more `$ev2`
                        // | $ev3     | !r0                | 42       | 3        |
                        // | $ev4     | !r0                | 42       | 4        |
                        //
                        // We need to shift the `position` of `$ev3` and `$ev4`
                        // to `position - 1`, like so:
                        //
                        // | event_id | linked_chunk_id | chunk_id | position |
                        // |----------|-----------------|----------|----------|
                        // | $ev0     | !r0             | 42       | 0        |
                        // | $ev1     | !r0             | 42       | 1        |
                        // | $ev3     | !r0             | 42       | 2        |
                        // | $ev4     | !r0             | 42       | 3        |
                        //
                        // Usually, it boils down to running the following query:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // WHERE position > 2 AND …
                        // ```
                        //
                        // Okay. But `UPDATE` runs on rows in no particular
                        // order. It means that it can update `$ev4` before
                        // `$ev3` for example. What happens in this particular
                        // case? The `position` of `$ev4` becomes `3`, however
                        // `$ev3` already has `position = 3`. Because there
                        // is a `UNIQUE` constraint on `(linked_chunk_id, chunk_id,
                        // position)`, it will result in a constraint violation.
                        //
                        // There is **no way** to control the execution order of
                        // `UPDATE` in SQLite. To convince yourself, try:
                        //
                        // ```sql
                        // UPDATE event_chunks
                        // SET position = position - 1
                        // FROM (
                        //     SELECT event_id
                        //     FROM event_chunks
                        //     WHERE position > 2 AND …
                        //     ORDER BY position ASC
                        // ) as ordered
                        // WHERE event_chunks.event_id = ordered.event_id
                        // ```
                        //
                        // It will fail the same way.
                        //
                        // Thus, we have 2 solutions:
                        //
                        // 1. Remove the `UNIQUE` constraint,
                        // 2. Be creative.
                        //
                        // The `UNIQUE` constraint is a safety belt. Normally, we
                        // have the `event_cache::Deduplicator` that is responsible
                        // for ensuring there is no duplicated event. However,
                        // relying on this is “fragile” in the sense that it can
                        // contain bugs. Relying on the `UNIQUE` constraint from
                        // SQLite is more robust. It's “belt and braces” as we
                        // say here.
                        //
                        // So. We need to be creative.
                        //
                        // Many solutions exist. Amongst the most popular, we
                        // see _dropping and re-creating the index_, which is a
                        // no-go for us: it's too expensive. I (@hywan) have
                        // adopted the following one:
                        //
                        // - Do `position = position - 1` but in the negative
                        //   space, so `position = -(position - 1)`. A position
                        //   cannot be negative; we are sure it is unique!
                        // - Once all candidate rows are updated, do `position =
                        //   -position` to move back to the positive space.
                        //
                        // 'told you it's gonna be creative.
                        //
                        // This solution is a hack, **but** it is a small
                        // number of operations, and we can keep the `UNIQUE`
                        // constraint in place.
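                        //
                        // On the example above (removing `$ev2` at position
                        // 2), the two steps look like this:
                        //
                        // ```sql
                        // -- Step 1: `$ev3`: 3 -> -2, `$ev4`: 4 -> -3.
                        // UPDATE event_chunks SET position = -(position - 1) WHERE position > 2 AND …;
                        //
                        // -- Step 2: `$ev3`: -2 -> 2, `$ev4`: -3 -> 3.
                        // UPDATE event_chunks SET position = -position WHERE position < 0 AND …;
                        // ```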
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Remove these entries.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // Remove chunks, and let cascading do its job.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                // Use `ORDER BY id` to get a deterministic ordering for testing purposes.
                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // We want to collect the metadata about each chunk (id, next, previous), and
                // for event chunks, the number of events in it. For gaps, the
                // number of events is 0, by convention.
                //
                // We've tried different strategies over time:
                // - use a `LEFT JOIN` + `COUNT`, which was extremely inefficient because it
                //   caused a full table traversal for each chunk, including for gaps which
                //   don't have any events. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5225.
                // - use a `CASE` statement on the chunk's type: if it's an event chunk, run an
                //   additional `SELECT` query. It was an immense improvement, but still caused
                //   one select query per event chunk. This happened in
                //   https://github.com/matrix-org/matrix-rust-sdk/pull/5411.
                //
                // The current solution is to run two queries:
                // - one to get each chunk and its number of events, by doing a single `SELECT`
                //   query over the `event_chunks` table, grouping by chunk ids. This gives us a
                //   list of `(chunk_id, num_events)` pairs, which can be transformed into a
                //   hashmap.
                // - one to get each chunk's metadata (id, previous, next, type) from the
                //   database with a `SELECT`, and then use the hashmap to get the number of
                //   events.
                //
                // This strategy minimizes the number of queries to the database, and keeps them
                // super simple, while doing a bit more processing here, which is much faster.

                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // Note: since a gap has 0 events, an alternative could be to *not* retrieve
                    // the chunk type, and just let the hashmap lookup fail for gaps. However,
                    // benchmarking shows that this is slightly slower than matching the chunk
                    // type (around 1%, so in the realm of noise), so we keep the explicit
                    // check instead.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the latest chunk identifier to generate a `ChunkIdentifierGenerator`, and count the number of chunks.
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                // Read the `MAX(id)` as an `Option<u64>` instead
                                // of `u64` in case the `SELECT` returns nothing.
                                // Indeed, if it returns no rows, the `MAX(id)` is
                                // set to `Null`.
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // Find the last chunk.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk was not found and there are zero chunks for this room: this is
                    // consistent, all good.
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    }
                    // The chunk was not found **but** there are chunks for this room: this is
                    // inconsistent, the linked chunk is malformed.
                    //
                    // Returning `Ok((None, _))` would be invalid here: we must return an error.
                    else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                            }
                        )
                    }
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Find the chunk before the chunk identified by `before_chunk_identifier`.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    // The chunk was not found.
                    return Ok(None);
                };

                // Build the chunk.
                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, and let cascading do its job.
                txn.execute("DELETE FROM linked_chunks", ())?;
                // Also clear all the events' contents.
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events for which we want to check duplicates, we can return
        // early. It's not only an optimization to do so: it's required, otherwise the
        // `repeat_vars` call below will panic.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        // Select all events that exist in the store, i.e. the duplicates.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );
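
                    // With, say, three event IDs in this batch, `repeat_vars`
                    // expands the placeholder list to a comma-separated run of
                    // `?`, so the query reads `… AND event_id IN (?, ?, ?)`.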
1250
1251                    let parameters = params_from_iter(
1252                        // parameter for `linked_chunk_id = ?`
1253                        once(
1254                            hashed_linked_chunk_id
1255                                .to_sql()
1256                                // SAFETY: it cannot fail since `Key::to_sql` never fails
1257                                .unwrap(),
1258                        )
1259                        // parameters for `event_id IN (…)`
1260                        .chain(events.iter().map(|event| {
1261                            event
1262                                .as_str()
1263                                .to_sql()
1264                                // SAFETY: it cannot fail since `str::to_sql` never fails
1265                                .unwrap()
1266                        })),
1267                    );
1268
1269                    let mut duplicated_events = Vec::new();
1270
1271                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1272                        Ok((
1273                            row.get::<_, String>(0)?,
1274                            row.get::<_, u64>(1)?,
1275                            row.get::<_, usize>(2)?,
1276                        ))
1277                    })? {
1278                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1279
1280                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1281                            // Normally unreachable, but the event ID has been stored even if it is
1282                            // malformed, let's skip it.
1283                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1284                            continue;
1285                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    // Event is not found.
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Clippy claims these clones are redundant, but the compiler
                // complains about too-short lifetimes if they are removed. It
                // matters little here, so silence the lint.
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()],
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }

    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
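                // `INSERT OR REPLACE` makes saving idempotent: saving an event
                // that is already stored overwrites the existing row instead of
                // failing on a uniqueness conflict.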
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                Ok(())
            })
            .await
    }

    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(self.vacuum().await?)
    }

    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        self.get_db_size().await
    }
}

fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    // Collect related events.
    let collect_results = |transaction| {
        let mut related = Vec::new();

        for result in transaction {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            // Only build the position if both the chunk_id and position were present; in
            // theory, they are either both present or both absent.
            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

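    // Both queries below use a LEFT JOIN so that a related event that has been
    // saved but is not (or no longer) part of any linked chunk still matches:
    // its `chunk_id` and `position` come back as NULL, hence a `None` position.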
    if let Some(filters) = filters {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        // The filters must be stringified first; since `.to_sql()` borrows from
        // them, the resulting strings are kept alive on the stack so they have a
        // stable address (otherwise the map closure below would return a
        // reference to a temporary).
        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut transaction = txn.prepare(&query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut transaction = txn.prepare(query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    }
}

/// Like `deadpool::managed::Object::with_transaction`, but starts the
/// transaction in immediate (write) mode from the beginning. This precludes
/// SQLITE_BUSY errors for transactions that involve both reads and writes and
/// that start with a write.
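///
/// A minimal usage sketch (hypothetical; `store` stands for any
/// `SqliteEventCacheStore` and `hashed_room_id` for a pre-encoded key):
///
/// ```rust,ignore
/// let deleted = with_immediate_transaction(&store, move |txn| {
///     // The write lock is taken as soon as the transaction begins, so this
///     // delete cannot hit SQLITE_BUSY partway through the transaction.
///     Ok(txn.execute("DELETE FROM events WHERE room_id = ?", (&hashed_room_id,))?)
/// })
/// .await?;
/// ```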
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            // Start the transaction in IMMEDIATE mode since all updates may cause writes,
            // to avoid read transactions upgrading to write mode and causing
            // SQLITE_BUSY errors. See also: https://www.sqlite.org/lang_transaction.html#deferred_immediate_and_exclusive_transactions
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Reset the transaction behavior to use Deferred, after this transaction has
            // been run, whether it was successful or not.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        // SAFETY: same logic as in [`deadpool::managed::Object::with_transaction`].
        .unwrap()
}

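/// Insert a new chunk row and repair the doubly-linked list around it: the
/// `next` pointer of `previous` and the `previous` pointer of `next` are both
/// repointed at the new chunk, turning `previous <-> next` into
/// `previous <-> new <-> next`. Each pointer fix-up must update exactly one
/// row; anything else is reported as an error.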
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    // First, insert the new chunk.
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    // If this chunk has a previous one, update its `next` field.
    if let Some(previous) = previous {
        let updated = txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
        if updated < 1 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        if updated > 1 {
            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
        }
    }

    // If this chunk has a next one, update its `previous` field.
    if let Some(next) = next {
        let updated = txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
        if updated < 1 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        if updated > 1 {
            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::store::{
            EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
            integration_tests::{EventCacheStoreIntegrationTests, make_test_event_with_event_id},
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
    };
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
    use once_cell::sync::Lazy;
    use ruma::event_id;
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;
    use crate::{
        SqliteStoreConfig,
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
    };

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run corresponding integration test
        store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;

        // Check that cascading worked. Yes, SQLite, I doubt you.
        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        // Check that the gaps match those set up in the corresponding integration test
        // above
        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run corresponding integration test
        store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Make sure the positions have been updated for the remaining events.
        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run corresponding integration test
        store.clone().into_event_cache_store().test_linked_chunk_clear().await;

        // Check that cascading worked. Yes, SQLite, I doubt you.
        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Trigger a violation of the unique constraint on the
        // (linked chunk ID, chunk ID) pair.
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        // The operation fails with a constraint violation error.
        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

        // If the updates have been handled transactionally, then no new chunks should
        // have been added; failure of the second update leads to the first one being
        // rolled back.
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_event_chunks_allows_same_event_in_room_and_thread() {
        // This test verifies that the same event can appear in both a room's linked
        // chunk and a thread's linked chunk. This is the real-world use case:
        // a thread reply appears in both the main room timeline and the thread.

        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let thread_root = event_id!("$thread_root");

        // Create an event that will be inserted into both the room and thread linked
        // chunks.
        let event = make_test_event_with_event_id(
            room_id,
            "thread reply",
            Some(event_id!("$thread_reply")),
        );

        let room_linked_chunk_id = LinkedChunkId::Room(room_id);
        let thread_linked_chunk_id = LinkedChunkId::Thread(room_id, thread_root);

        // Insert the event into the room's linked chunk.
        store
            .handle_linked_chunk_updates(
                room_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Insert the same event into the thread's linked chunk.
        store
            .handle_linked_chunk_updates(
                thread_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event],
                    },
                ],
            )
            .await
            .unwrap();

        // Verify both entries exist by loading chunks from both linked chunk IDs.
        let room_chunks = store.load_all_chunks(room_linked_chunk_id).await.unwrap();
        let thread_chunks = store.load_all_chunks(thread_linked_chunk_id).await.unwrap();

        assert_eq!(room_chunks.len(), 1);
        assert_eq!(thread_chunks.len(), 1);

        // Verify the event is in both.
        assert_matches!(&room_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
        assert_matches!(&thread_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

        // Create an event for the first room.
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

        // Create a related event.
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

        // Create an event for the second room.
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        // Save the events in the DB.
        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        // Craft a `RelationType` that attempts to inject SQL. If it were
        // interpolated directly, the `OR 1=1` would make the query ignore all
        // the previous parameters, i.e. the room ID and the event ID.
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
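        // Because the store binds each filter as its own `?` placeholder (see
        // `repeat_vars` in `find_event_relations_transaction`), the crafted
        // string is compared literally against `rel_type` rather than being
        // executed as SQL.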

        // Attempt to find events in the first room.
        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        // Ensure that we only got the single related event the first room contains.
        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        // The event needs to be the edit event, otherwise something is wrong.
        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}