use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        Event, Gap,
        store::{EventCacheStore, extract_event_relation},
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
};
use rusqlite::{
    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    OpenStoreError, Secret, SqliteStoreConfig,
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
    },
};

mod keys {
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

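/// The current version of the database schema. Keep this in sync with the
/// last migration applied in [`run_migrations`].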
const DATABASE_VERSION: u8 = 14;

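/// The string used to identify a chunk of type "events", in the `type` field
/// of a `linked_chunks` row.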
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
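/// The string used to identify a chunk of type "gap", in the `type` field of
/// a `linked_chunks` row.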
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

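/// A SQLite-based event cache store.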
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

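    /// The pool of connections, used for read operations.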
    pool: SqlitePool,

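    /// The single connection used for write operations, kept behind a mutex
    /// so that at most one writer runs at a time.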
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
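    /// Open the SQLite-based event cache store at the given path, using the
    /// given passphrase to encrypt private data.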
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

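    /// Open the SQLite-based event cache store at the given path, using the
    /// given key to encrypt private data.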
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

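    /// Open the SQLite-based event cache store with the given configuration.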
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }

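    /// Create the store from an existing pool of connections: run the pending
    /// migrations and, if a secret is provided, set up the store cipher.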
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;

        run_migrations(&conn, version).await?;

        conn.wal_checkpoint().await;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

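    /// Acquire a connection for executing read operations.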
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

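        // `PRAGMA foreign_keys` is a per-connection setting, so (re-)enable
        // it on every connection we hand out.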
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

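    /// Acquire the write connection; the mutex guarantees that only one write
    /// operation runs at a time.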
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

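    /// Map a `linked_chunks` row into its `(id, previous, next, type)`
    /// columns.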
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

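    /// Serialize (and encrypt, when a store cipher is set) an event, also
    /// extracting the relationship information it may contain.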
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }

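    /// Run a `VACUUM` on the database, reclaiming unused space.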
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}

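/// The output of [`SqliteEventCacheStore::encode_event`]: the (possibly
/// encrypted) event content, plus the relation type and related event ID,
/// kept in clear text so that SQL queries can filter on them.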
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => Err(Error::InvalidData {
                details: format!("a linked chunk has an unknown type {other}"),
            }),
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

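/// Run the database migrations, bringing the schema from `version` up to
/// [`DATABASE_VERSION`].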
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
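        // Enable the write-ahead log once; unlike `foreign_keys`, the journal
        // mode is persisted in the database file itself.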
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

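        // The database predates the removal of the media tables, so it may
        // still hold space previously used for media content; vacuum to
        // reclaim it.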
        if version >= 1 {
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    if version < 14 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
            ))?;
            txn.set_db_version(14)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

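        // Atomic upsert: take the lock if it's free or expired, refresh it if
        // we already hold it, and bump the generation whenever the holder
        // changes. The `WHERE` clause turns the update into a no-op while
        // another holder has a valid lease, in which case `RETURNING` yields
        // no row and we report the lock as not taken.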
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET
                            holder = excluded.holder,
                            expiration = excluded.expiration,
                            generation =
                                CASE holder
                                    WHEN excluded.holder THEN generation
                                    ELSE generation + 1
                                END
                        WHERE
                            holder = excluded.holder
                            OR expiration < ?4
                    RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

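        // Apply the whole batch in a single `BEGIN IMMEDIATE` transaction:
        // the updates must be atomic, and taking the write lock up front
        // avoids a busy-lock upgrade halfway through (see
        // `with_immediate_transaction` below).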
        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?)),
                        )?;

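                        // Stitch the neighbors back together: point the
                        // previous chunk at the next one, and vice versa,
                        // before deleting the chunk itself.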
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

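                        // Only keep events that have an ID and an event type;
                        // anything else can't be stored and is logged, then
                        // skipped.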
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in
                            items.into_iter().filter_map(valid_event).enumerate()
                        {
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                        )?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index),
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

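                        // Decrement the position of every item after the
                        // removed one, in two passes: negate the shifted
                        // positions first, then flip the sign back. Going
                        // through negative values keeps two rows from ever
                        // transiently sharing the same position while SQLite
                        // updates them one at a time.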
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index),
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id),
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

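                        // Delete all the chunks of this linked chunk; the
                        // dependent `event_chunks` and `gap_chunks` rows go
                        // away via cascading foreign keys.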
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {}
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
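                // Count the events of every chunk with a single grouped
                // query, then stitch the counts onto the chunk rows in
                // memory, rather than running a COUNT subquery per chunk.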
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id
                    "#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, Option<u64>>(0)?, row.get::<_, u64>(1)?))
                    })?;

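                // Rebuild the chunk identifier generator from the highest
                // chunk identifier stored for this linked chunk, if any.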
                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier),
                        )
                    }
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((
                            row.get::<_, u64>(0)?,
                            row.get::<_, Option<u64>>(1)?,
                            row.get::<_, String>(2)?,
                        ))
                    })
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    } else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned(),
                        });
                    }
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type,
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?",
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        },
                    )
                    .optional()?
                else {
                    return Ok(None);
                };

                let chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type,
                )?;

                Ok(Some(chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
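                // `events` has no foreign key to `linked_chunks`, so it is
                // cleared explicitly in addition to the chunks themselves.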
                txn.execute("DELETE FROM linked_chunks", ())?;
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
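                // SQLite limits the number of bound variables in a single
                // statement, so spread the event IDs over multiple queries if
                // need be.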
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        once(hashed_linked_chunk_id.to_sql().unwrap())
                            .chain(events.iter().map(|event| event.as_str().to_sql().unwrap())),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
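                // Pick the query and its parameters based on which of the
                // optional filters are present.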
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()],
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }

    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                )?;

                Ok(())
            })
            .await
    }

    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(self.vacuum().await?)
    }

    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        self.get_db_size().await
    }
}

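/// Transaction body for [`SqliteEventCacheStore::find_event_relations`].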
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    let collect_results = |rows| {
        let mut related = Vec::new();

        for result in rows {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

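    // When filters are given, bind each relation type as a `?` placeholder
    // rather than splicing it into the SQL string, so user-supplied relation
    // types can never inject SQL (see the injection test in
    // `encrypted_tests`).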
    if let Some(filters) = filters {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut statement = txn.prepare(&query)?;
        let results = statement.query_map(parameters, get_rows)?;

        collect_results(results)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut statement = txn.prepare(query)?;
        let results = statement.query_map(parameters, get_rows)?;

        collect_results(results)
    }
}

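/// Run the given callback in a SQLite transaction opened with `BEGIN
/// IMMEDIATE`: the write lock is taken as soon as the transaction starts,
/// instead of being lazily upgraded on the first write statement, which
/// avoids `SQLITE_BUSY` failures halfway through.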
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        .unwrap()
}

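/// Insert a new chunk row, and update the `next` pointer of its previous
/// chunk and the `previous` pointer of its next chunk, keeping the linked
/// list consistent.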
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    if let Some(previous) = previous {
        let updated = txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
        if updated < 1 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        if updated > 1 {
            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
        }
    }

    if let Some(next) = next {
        let updated = txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
        if updated < 1 {
            return Err(rusqlite::Error::QueryReturnedNoRows);
        }
        if updated > 1 {
            return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
        }
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::store::{
            EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
            integration_tests::{EventCacheStoreIntegrationTests, make_test_event_with_event_id},
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
    };
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
    use once_cell::sync::Lazy;
    use ruma::event_id;
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;
    use crate::{
        SqliteStoreConfig,
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
    };

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;

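        // Check that the remaining gap chunks are the expected ones, i.e.
        // that the gap row of the removed chunk was deleted as well.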
        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();

        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store.clone().into_event_cache_store().test_linked_chunk_clear().await;

        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

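        // Create two chunks with the same identifier: the second insert
        // violates a unique constraint, so the whole batch must fail.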
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

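        // Since all updates run in one transaction, the failed batch must not
        // have left any chunk behind.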
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_event_chunks_allows_same_event_in_room_and_thread() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

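        // The same event can legitimately appear both in a room linked chunk
        // and in a thread linked chunk of that room; the `event_chunks` table
        // must accept both rows.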
        let room_id = *DEFAULT_TEST_ROOM_ID;
        let thread_root = event_id!("$thread_root");

        let event = make_test_event_with_event_id(
            room_id,
            "thread reply",
            Some(event_id!("$thread_reply")),
        );

        let room_linked_chunk_id = LinkedChunkId::Room(room_id);
        let thread_linked_chunk_id = LinkedChunkId::Thread(room_id, thread_root);

        store
            .handle_linked_chunk_updates(
                room_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        store
            .handle_linked_chunk_updates(
                thread_linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![event],
                    },
                ],
            )
            .await
            .unwrap();

        let room_chunks = store.load_all_chunks(room_linked_chunk_id).await.unwrap();
        let thread_chunks = store.load_all_chunks(thread_linked_chunk_id).await.unwrap();

        assert_eq!(room_chunks.len(), 1);
        assert_eq!(thread_chunks.len(), 1);

        assert_matches!(&room_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
        assert_matches!(&thread_chunks[0].content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            assert_eq!(events[0].event_id().as_deref(), Some(event_id!("$thread_reply")));
        });
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

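        // Craft a filter that attempts a SQL injection through the rel_type
        // values; it must be bound as a plain, non-matching relation type.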
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}