1use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21 cross_process_lock::CrossProcessLockGeneration,
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 Event, Gap,
25 store::{EventCacheStore, extract_event_relation},
26 },
27 linked_chunk::{
28 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
29 Position, RawChunk, Update,
30 },
31 timer,
32};
33use matrix_sdk_store_encryption::StoreCipher;
34use ruma::{
35 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
36};
37use rusqlite::{
38 OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
39};
40use tokio::{
41 fs,
42 sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, error, instrument, trace};
45
46use crate::{
47 OpenStoreError, Secret, SqliteStoreConfig,
48 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
49 error::{Error, Result},
50 utils::{
51 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
52 SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
53 },
54};
55
/// Key scopes used when hashing/encrypting database keys.
mod keys {
    /// Scope for linked chunk (and room) identifiers.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Scope for event-related keys (event types, session IDs).
    pub const EVENTS: &str = "events";
}
61
/// File name of the SQLite database inside the store directory.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Value of the `type` column in `linked_chunks` for a chunk holding events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value of the `type` column in `linked_chunks` for a gap chunk.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
71
/// An SQLite-based event cache store.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    // Cipher used to encrypt values and hash keys; `None` when the store was
    // opened without a passphrase/key.
    store_cipher: Option<Arc<StoreCipher>>,

    // Pool of connections used for read operations (see `read()`).
    pool: SqlitePool,

    // Single dedicated write connection, serialized behind an async mutex so
    // writes never contend with each other (see `write()`).
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
86
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    // Manual impl: none of the fields are worth (or safe) printing, so only
    // the type name is emitted, with a `..` marker for the hidden fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}
93
impl EncryptableStore for SqliteEventCacheStore {
    /// Return the store cipher, if the store was opened with a secret.
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}
99
impl SqliteEventCacheStore {
    /// Open the SQLite event cache store at the given path using the given
    /// passphrase to encrypt private data, if any.
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    /// Open the SQLite event cache store at the given path using the given
    /// 32-byte key to encrypt private data, if any.
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

    /// Open the store with the given [`SqliteStoreConfig`].
    ///
    /// Creates the target directory if it doesn't exist yet, builds the
    /// connection pool, then applies the runtime configuration to the write
    /// connection.
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        // Runtime settings are applied on the dedicated write connection.
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

    /// Build the store from an existing pool of connections.
    ///
    /// Runs migrations, checkpoints the WAL, sets up the store cipher (when a
    /// secret is provided), and promotes the connection used for this
    /// bootstrap to be the dedicated write connection.
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;

        run_migrations(&conn, version).await?;

        // NOTE(review): checkpoint result is intentionally ignored — a failed
        // checkpoint is not fatal for opening the store.
        conn.wal_checkpoint().await;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

    /// Acquire a pooled connection for reading.
    ///
    /// Foreign keys are (re-)enabled on every acquisition, as the PRAGMA is a
    /// per-connection setting.
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Acquire the single write connection, waiting until no other write is
    /// in flight.
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    /// Map a `linked_chunks` row to its raw components:
    /// `(id, previous, next, type)`.
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    /// Serialize, encrypt, and extract the relation metadata of an event, so
    /// it can be written to the `events` table.
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        // Relation data is extracted from the *raw* (cleartext) event, and
        // stored unencrypted alongside the encrypted content so it can be
        // queried with SQL.
        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

    /// Run `VACUUM` on the database, reclaiming free pages.
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    /// Return the current size of the database file, in bytes.
    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}
227
/// The database representation of an event, ready to be inserted into the
/// `events` table (see [`SqliteEventCacheStore::encode_event`]).
struct EncodedEvent {
    // Serialized (and possibly encrypted) event content.
    content: Vec<u8>,
    // Relation type (`m.replace`, …) of the event, if any; stored in clear.
    rel_type: Option<String>,
    // Event ID this event relates to, if any; stored in clear.
    relates_to: Option<String>,
}
233
/// Extension methods over a SQLite [`Transaction`] to rebuild linked chunks
/// and their contents from the database.
trait TransactionExtForLinkedChunks {
    /// Rebuild a [`RawChunk`] from its raw row data, loading its content
    /// (gap token or events) from the database according to `chunk_type`.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Load the pagination token of a gap chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Load the ordered events of an events chunk.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
259
260impl TransactionExtForLinkedChunks for Transaction<'_> {
261 fn rebuild_chunk(
262 &self,
263 store: &SqliteEventCacheStore,
264 linked_chunk_id: &Key,
265 previous: Option<u64>,
266 id: u64,
267 next: Option<u64>,
268 chunk_type: &str,
269 ) -> Result<RawChunk<Event, Gap>> {
270 let previous = previous.map(ChunkIdentifier::new);
271 let next = next.map(ChunkIdentifier::new);
272 let id = ChunkIdentifier::new(id);
273
274 match chunk_type {
275 CHUNK_TYPE_GAP_TYPE_STRING => {
276 let gap = self.load_gap_content(store, linked_chunk_id, id)?;
278 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
279 }
280
281 CHUNK_TYPE_EVENT_TYPE_STRING => {
282 let events = self.load_events_content(store, linked_chunk_id, id)?;
284 Ok(RawChunk {
285 content: ChunkContent::Items(events),
286 previous,
287 identifier: id,
288 next,
289 })
290 }
291
292 other => {
293 Err(Error::InvalidData {
295 details: format!("a linked chunk has an unknown type {other}"),
296 })
297 }
298 }
299 }
300
301 fn load_gap_content(
302 &self,
303 store: &SqliteEventCacheStore,
304 linked_chunk_id: &Key,
305 chunk_id: ChunkIdentifier,
306 ) -> Result<Gap> {
307 let encoded_prev_token: Vec<u8> = self.query_row(
310 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
311 (chunk_id.index(), &linked_chunk_id),
312 |row| row.get(0),
313 )?;
314 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
315 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
316 Ok(Gap { token: prev_token })
317 }
318
319 fn load_events_content(
320 &self,
321 store: &SqliteEventCacheStore,
322 linked_chunk_id: &Key,
323 chunk_id: ChunkIdentifier,
324 ) -> Result<Vec<Event>> {
325 let mut events = Vec::new();
327
328 for event_data in self
329 .prepare(
330 r#"
331 SELECT events.content
332 FROM event_chunks ec, events
333 WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
334 ORDER BY ec.position ASC
335 "#,
336 )?
337 .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
338 {
339 let encoded_content = event_data?;
340 let serialized_content = store.decode_value(&encoded_content)?;
341 let event = serde_json::from_slice(&serialized_content)?;
342
343 events.push(event);
344 }
345
346 Ok(events)
347 }
348}
349
/// Run the database migrations, bringing the schema from `version` up to the
/// latest version (currently 14).
///
/// Each step runs in its own transaction and bumps the stored schema version,
/// so an interrupted upgrade can resume from where it stopped.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    // Per-connection setting; needed so the migrations' FK constraints apply.
    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        debug!("Creating database");
        // WAL mode is enabled outside the transaction (`PRAGMA journal_mode`
        // can't be changed from within one).
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        debug!("Upgrading database to version 2");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        debug!("Upgrading database to version 3");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        debug!("Upgrading database to version 4");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        debug!("Upgrading database to version 5");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        debug!("Upgrading database to version 6");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        debug!("Upgrading database to version 7");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        debug!("Upgrading database to version 8");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        debug!("Upgrading database to version 9");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        debug!("Upgrading database to version 10");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        // Only vacuum a pre-existing database: after dropping the media
        // tables there may be a lot of free pages to reclaim. A freshly
        // created database (version 0) has nothing to reclaim.
        if version >= 1 {
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        debug!("Upgrading database to version 11");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        debug!("Upgrading database to version 12");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        debug!("Upgrading database to version 13");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    if version < 14 {
        debug!("Upgrading database to version 14");
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
            ))?;
            txn.set_db_version(14)
        })
        .await?;
    }

    Ok(())
}
510
#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    /// Try to acquire (or refresh) the cross-process lease lock `key` for
    /// `holder`, returning the lock generation on success and `None` when the
    /// lock is currently held by someone else.
    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        // Upsert the lease: an existing row is only overwritten when we
        // already hold the lock (refresh) or when it has expired (steal). The
        // generation is bumped whenever the lock changes hands; `RETURNING`
        // yields no row — hence `None` — when the update is rejected.
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    /// Apply a batch of linked-chunk updates atomically, in a single
    /// immediate transaction.
    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // The linked chunk ID is hashed before being used as a key in the
        // database.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        // The gap's pagination token is serialized and
                        // encrypted before storage.
                        let serialized = serde_json::to_vec(&gap.token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk row first, then its token, since
                        // `gap_chunks` references `linked_chunks`.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Relink the neighbors around the removed chunk…
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // …then delete it.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // No need to prepare statements for nothing.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // NOTE(review): despite its name, this closure keeps
                        // *valid* events (it filters out the invalid ones by
                        // returning `None`).
                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            // Positions continue from the insertion point.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Overwrite the event content, then repoint the chunk
                        // position at the (possibly new) event ID.
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Shift subsequent positions down by one, in two
                        // steps through negative values, so the decremented
                        // positions never collide with not-yet-updated rows
                        // under the table's uniqueness constraint.
                        txn.execute(
                            r#"
                            UPDATE event_chunks
                            SET position = -(position - 1)
                            WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                            UPDATE event_chunks
                            SET position = -position
                            WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        // Drop every position from `index` onwards in this
                        // chunk; the events rows themselves are kept.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // NOTE(review): only `linked_chunks` is deleted here;
                        // presumably dependent rows (event_chunks, gap_chunks)
                        // are removed via foreign-key cascade — confirm
                        // against the migration schema.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing to do for the database with these markers.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    /// Load every chunk of the given linked chunk, in `id` order, rebuilding
    /// their content from the database.
    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    /// Load the metadata (identifier, links, item count) of every chunk of
    /// the given linked chunk, without loading the events themselves.
    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // First gather the number of events per chunk in one grouped
                // query, then join it in memory with the chunk rows.
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                        SELECT ec.chunk_id, COUNT(ec.event_id)
                        FROM event_chunks as ec
                        WHERE ec.linked_chunk_id = ?
                        GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                    SELECT
                        lc.id,
                        lc.previous,
                        lc.next,
                        lc.type
                    FROM linked_chunks as lc
                    WHERE lc.linked_chunk_id = ?
                    ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // A gap chunk never contains items, so don't bother
                    // looking it up in the per-chunk event counts.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    /// Load the last chunk of the given linked chunk (the one with no `next`)
    /// along with a chunk identifier generator primed with the highest chunk
    /// ID seen so far.
    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // `MAX(id)` is `NULL` when there are no chunks at all; the
                // count disambiguates "no chunks" from "no last chunk".
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        // No chunk at all: an empty linked chunk.
                        return Ok((None, chunk_identifier_generator));
                    }
                    else {
                        // Chunks exist but none is last: every chunk has a
                        // `next`, i.e. the list is cyclic — corrupted data.
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                        }
                        )
                    }
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    /// Load the chunk immediately preceding the chunk with the given
    /// identifier, if any.
    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // The previous chunk is the one whose `next` points at the
                // given identifier.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    return Ok(None);
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(last_chunk))
            })
            .await
    }

    /// Delete every linked chunk and every cached event, for all rooms.
    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                txn.execute("DELETE FROM linked_chunks", ())?;
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    /// Among `events`, return those already present in the given linked
    /// chunk, along with their position.
    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // Easy shortcut: no events means no duplicates, and no query.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Split the event list so each query stays under SQLite's
                // bound-parameter limit.
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                        SELECT event_id, chunk_id, position
                        FROM event_chunks
                        WHERE linked_chunk_id = ? AND event_id IN ({})
                        ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    // First parameter is the linked chunk ID, then one
                    // parameter per event ID.
                    let parameters = params_from_iter(
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                .unwrap(),
                        )
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            // Malformed event IDs in the DB are skipped, not
                            // fatal.
                            error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    /// Find a cached event by room and event ID.
    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                // Decrypt then deserialize the stored content.
                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    /// Find all events relating to `event_id` in the given room, optionally
    /// restricted to the given relation types, along with their position in
    /// the room's linked chunk (if any).
    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    /// Return all cached events of a room, optionally filtered by (hashed)
    /// event type and/or session ID.
    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // One static query per filter combination, so every variant
                // stays a simple prepared statement.
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()]
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }

    /// Save (or overwrite) a single event, outside of any linked chunk.
    ///
    /// Events without an ID or without an event type are logged and silently
    /// skipped.
    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                Ok(())
            })
            .await
    }

    /// Optimize the database; currently just a `VACUUM`.
    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(self.vacuum().await?)
    }

    /// Return the on-disk size of the database, in bytes.
    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        self.get_db_size().await
    }
}
1458
/// Look up all events that relate to `event_id` in the given room, optionally
/// restricted to a set of relation types.
///
/// Runs entirely inside the provided read transaction. Each result is returned
/// together with its position in the room's linked chunk, when the event is
/// part of one (the `LEFT JOIN` keeps events that are known but not chunked,
/// in which case the position is `None`).
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    // Extract (encoded event blob, chunk identifier, index-in-chunk) from a
    // single result row. The two chunk columns are nullable because of the
    // LEFT JOIN above.
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    // Decode every row into an `Event` and, when both chunk columns are
    // present, its `Position` in the linked chunk.
    let collect_results = |transaction| {
        let mut related = Vec::new();

        for result in transaction {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            // `zip` yields a position only if the event is in a chunk (i.e.
            // both the chunk ID and the index are non-NULL).
            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    if let Some(filters) = filters {
        // The relation-type filter is interpolated as a `?, ?, …` placeholder
        // list only — the filter *values* are bound as parameters below, so no
        // user-controlled text ever reaches the SQL string (see the
        // `test_no_sqlite_injection_in_find_event_relations` test).
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        // Keep the owned strings alive in `filter_strings`: the `ToSqlOutput`
        // values produced below borrow from them.
        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        // Bind order must mirror the placeholder order in `query`:
        // linked chunk ID, related-to event ID, room ID, then the filters.
        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut transaction = txn.prepare(&query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    } else {
        // No relation-type filter: same query without the `rel_type IN` clause.
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut transaction = txn.prepare(query)?;
        let transaction = transaction.query_map(parameters, get_rows)?;

        collect_results(transaction)
    }
}
1550
/// Run `f` inside an `IMMEDIATE` SQLite transaction on the store's write
/// connection, committing on success.
///
/// `TransactionBehavior::Immediate` makes SQLite take the write lock as soon
/// as the transaction starts, instead of lazily on the first write — NOTE
/// (review): presumably to fail fast on cross-process lock contention; confirm
/// against the store's locking strategy.
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            // Switch the connection to IMMEDIATE for the duration of this
            // transaction only.
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Restore the default behavior unconditionally — even when `code`
            // failed — so later users of this pooled connection aren't
            // affected.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        // `interact` itself only fails if the closure panicked or was
        // aborted; unwrapping turns that back into a panic here.
        .unwrap()
}
1589
1590fn insert_chunk(
1591 txn: &Transaction<'_>,
1592 linked_chunk_id: &Key,
1593 previous: Option<u64>,
1594 new: u64,
1595 next: Option<u64>,
1596 type_str: &str,
1597) -> rusqlite::Result<()> {
1598 txn.execute(
1600 r#"
1601 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1602 VALUES (?, ?, ?, ?, ?)
1603 "#,
1604 (new, linked_chunk_id, previous, next, type_str),
1605 )?;
1606
1607 if let Some(previous) = previous {
1609 let updated = txn.execute(
1610 r#"
1611 UPDATE linked_chunks
1612 SET next = ?
1613 WHERE id = ? AND linked_chunk_id = ?
1614 "#,
1615 (new, previous, linked_chunk_id),
1616 )?;
1617 if updated < 1 {
1618 return Err(rusqlite::Error::QueryReturnedNoRows);
1619 }
1620 if updated > 1 {
1621 return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1622 }
1623 }
1624
1625 if let Some(next) = next {
1627 let updated = txn.execute(
1628 r#"
1629 UPDATE linked_chunks
1630 SET previous = ?
1631 WHERE id = ? AND linked_chunk_id = ?
1632 "#,
1633 (new, next, linked_chunk_id),
1634 )?;
1635 if updated < 1 {
1636 return Err(rusqlite::Error::QueryReturnedNoRows);
1637 }
1638 if updated > 1 {
1639 return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1640 }
1641 }
1642
1643 Ok(())
1644}
1645
#[cfg(test)]
mod tests {
    //! Tests for the unencrypted [`SqliteEventCacheStore`]: the shared
    //! integration-test suites plus store-specific checks that peek at the
    //! SQLite tables directly.

    use std::{
        path::PathBuf,
        sync::{
            LazyLock,
            atomic::{AtomicU32, Ordering::SeqCst},
        },
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::store::{
            EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
            integration_tests::{EventCacheStoreIntegrationTests, make_test_event_with_event_id},
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
    };
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
    use ruma::event_id;
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;
    use crate::{
        SqliteStoreConfig,
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
    };

    // One temporary directory shared by the whole module; each test carves
    // out its own numbered subdirectory via `NUM`.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    // Monotonic counter giving every test an isolated store workspace.
    static NUM: AtomicU32 = AtomicU32::new(0);

    // Returns a fresh, unique directory path for a new event cache store.
    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    // Opens an unencrypted store in a fresh workspace.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    // Run the cross-store integration suites against this backend.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        // The connection pool must honour the configured maximum size.
        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run the shared integration scenario…
        store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;

        // …then check at the SQL level which gap chunks survived it.
        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run the shared integration scenario…
        store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // …then verify that the remaining items in chunk 42 were re-indexed
        // to the expected positions.
        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        // Run the shared integration scenario…
        store.clone().into_event_cache_store().test_linked_chunk_clear().await;

        // …then verify that clearing really emptied both the gap and event
        // chunk tables.
        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // Inserting the same chunk ID twice must trip a uniqueness constraint.
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

        // Because updates are applied in a single transaction, the failure
        // must have rolled back the first insert too: no chunk persisted.
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_event_chunks_allows_same_event_in_room_and_thread() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let thread_root = event_id!("$thread_root");

        // One event stored in both a room linked chunk and a thread linked
        // chunk: the schema must not treat that as a duplicate.
        let event = make_test_event_with_event_id(
            room_id,
            "thread reply",
            Some(event_id!("$thread_reply")),
        );

        let room_linked_chunk_id = LinkedChunkId::Room(room_id);
        let thread_linked_chunk_id = LinkedChunkId::Thread(room_id, thread_root);

        // Push the event into the room's linked chunk.
        store
            .handle_linked_chunk_updates(
                room_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Push the very same event into the thread's linked chunk.
        store
            .handle_linked_chunk_updates(
                thread_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event],
                    },
                ],
            )
            .await
            .unwrap();

        // Both linked chunks must have kept their copy of the event.
        let room_chunks = store.load_all_chunks(room_linked_chunk_id).await.unwrap();
        let thread_chunks = store.load_all_chunks(thread_linked_chunk_id).await.unwrap();

        assert_eq!(room_chunks.len(), 1);
        assert_eq!(thread_chunks.len(), 1);

        assert_matches!(&room_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
        assert_matches!(&thread_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
    }
}
1909
#[cfg(test)]
mod encrypted_tests {
    //! Same integration suites as `tests`, but with an encrypted store, plus
    //! an SQL-injection regression test for `find_event_relations`.

    use std::sync::{
        LazyLock,
        atomic::{AtomicU32, Ordering::SeqCst},
    };

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;

    // One temporary directory shared by the module; each test gets its own
    // numbered subdirectory via `NUM`.
    static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    // Opens a passphrase-protected store in a fresh workspace.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    // Run the cross-store integration suites against the encrypted backend.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

        // Target event, plus an edit relating to it — only the edit should be
        // returned by the relation lookup.
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

        // An unrelated event in a different room, to catch filter bypasses.
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        // A malicious relation type trying to escape the `IN (…)` clause; it
        // must be treated as an (unmatched) value, not as SQL.
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}