use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_base::{
    cross_process_lock::CrossProcessLockGeneration,
    deserialized_responses::TimelineEvent,
    event_cache::{
        Event, Gap,
        store::{EventCacheStore, extract_event_relation},
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
};
use rusqlite::{
    OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    OpenStoreError, Secret, SqliteStoreConfig,
    connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
    error::{Error, Result},
    utils::{
        EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
    },
};
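/// Key namespaces passed to `encode_key` when hashing table keys, one per
/// logical table family.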
mod keys {
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}
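/// Name of the SQLite database file used by this store.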
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
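/// Current schema version of the database.
///
/// Bump this, and add a matching migration in [`run_migrations`], whenever
/// the schema changes.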
const DATABASE_VERSION: u8 = 13;
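/// Discriminators stored in the `type` column of the `linked_chunks` table,
/// telling an events chunk apart from a gap chunk.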
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
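/// An SQLite-based event cache store.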
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    /// A pool of connections used for read operations.
    pool: SqlitePool,

    /// A single connection, guarded by a mutex, used for write operations,
    /// so that all writes are serialized.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
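    /// Open the SQLite-based event cache store at the given path, using the
    /// given passphrase to encrypt private data.
    ///
    /// # Examples
    ///
    /// A minimal usage sketch (the path and passphrase are illustrative):
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// let store = SqliteEventCacheStore::open("/tmp/event-cache", Some("secret")).await?;
    /// # Ok(())
    /// # }
    /// ```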
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }
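    /// Open the SQLite-based event cache store at the given path, using the
    /// given 32-byte key to encrypt private data.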
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }
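    /// Open the SQLite-based event cache store with the given configuration.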
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;

        let pool = config.build_pool_of_connections(DATABASE_NAME)?;

        let this = Self::open_with_pool(pool, config.secret).await?;
        this.write().await?.apply_runtime_config(config.runtime_config).await?;

        Ok(this)
    }
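    /// Create the store from the given connection pool, running migrations
    /// and setting up the store cipher if a secret was provided.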
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }
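    /// Acquire a connection from the pool, for read operations.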
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        let connection = self.pool.get().await?;

        // Per-connection pragma: enforce foreign-key constraints.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }
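    /// Acquire the unique write connection, serializing all writes to the
    /// database behind its mutex.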
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        let connection = self.write_connection.clone().lock_owned().await;

        // Per-connection pragma: enforce foreign-key constraints.
        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }
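    /// Serialize an event, encrypt it if a store cipher is set, and extract
    /// the relationship information stored alongside it.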
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
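    /// Run a `VACUUM` on the database, reclaiming unused space.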
    pub async fn vacuum(&self) -> Result<()> {
        self.write_connection.lock().await.vacuum().await
    }

    async fn get_db_size(&self) -> Result<Option<usize>> {
        Ok(Some(self.pool.get().await?.get_db_size().await?))
    }
}
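/// An event serialized (and possibly encrypted), ready to be written to the
/// `events` table.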
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}
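/// Helpers to reload chunks and their contents from within a transaction.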
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => Err(Error::InvalidData {
                details: format!("a linked chunk has an unknown type {other}"),
            }),
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}
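/// Run the database schema migrations, bringing `version` up to
/// [`DATABASE_VERSION`].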
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;
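        // Migration 010 dropped the media cache tables; if the database
        // existed before this migration (version >= 1), run a vacuum to
        // reclaim the space they used.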
        if version >= 1 {
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    if version < 13 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
            ))?;
            txn.set_db_version(13)
        })
        .await?;
    }

    Ok(())
}
#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>> {
        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;
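        // Upsert the lease: take it if it's free, refresh it if we already
        // hold it, or steal it if the previous lease has expired. The
        // generation counter is only bumped when the lock changes hands.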
        let generation = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.query_row(
                    "INSERT INTO lease_locks (key, holder, expiration)
                     VALUES (?1, ?2, ?3)
                     ON CONFLICT (key)
                     DO
                         UPDATE SET
                             holder = excluded.holder,
                             expiration = excluded.expiration,
                             generation =
                                 CASE holder
                                     WHEN excluded.holder THEN generation
                                     ELSE generation + 1
                                 END
                     WHERE
                         holder = excluded.holder
                         OR expiration < ?4
                     RETURNING generation
                    ",
                    (key, holder, expiration, now),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        Ok(generation)
    }
    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
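                        // Find the neighbours of the chunk to remove, and
                        // relink them to one another before deleting it.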
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?)),
                        )?;

                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Delete the chunk itself; the foreign keys are
                        // expected to clean up the rows referencing it in
                        // `gap_chunks` and `event_chunks`.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }
                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only events that have both an ID and an event
                        // type; anything else can't be stored.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in
                            items.into_iter().filter_map(valid_event).enumerate()
                        {
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }
                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                        )?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index),
                        )?;
                    }
                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
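                        // Shift the positions of all the items after the
                        // removed one down by 1. This is done in two passes,
                        // going through a negative intermediate value, so
                        // that the uniqueness of positions within a chunk is
                        // never violated while rows are updated one by one.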
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index),
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id),
                        )?;
                    }
                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }
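                    // Reattaching items is handled entirely in memory; there
                    // is nothing to persist for these markers.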
                    Update::StartReattachItems | Update::EndReattachItems => {}
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }
    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }
    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Count the number of events per chunk up front.
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id
                    "#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // A gap chunk never contains items.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }
    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((
                            // `MAX(id)` is `NULL` when no chunks exist.
                            row.get::<_, Option<u64>>(0)?,
                            row.get::<_, u64>(1)?,
                        ))
                    })?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier),
                        )
                    }
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                // The last chunk is the one that has no successor.
                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL",
                    )?
                    .query_row((&hashed_linked_chunk_id,), |row| {
                        Ok((
                            row.get::<_, u64>(0)?,
                            row.get::<_, Option<u64>>(1)?,
                            row.get::<_, String>(2)?,
                        ))
                    })
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    } else {
                        // Chunks exist, but none of them is the last one:
                        // the linked list is malformed.
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned(),
                        });
                    }
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type,
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }
    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // The previous chunk is the one whose `next` points to the
                // given chunk.
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?",
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        },
                    )
                    .optional()?
                else {
                    return Ok(None);
                };

                let previous = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type,
                )?;

                Ok(Some(previous))
            })
            .await
    }
    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                // Remove all the chunks, then all the events.
                txn.execute("DELETE FROM linked_chunks", ())?;
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }
    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // If there are no events to check, return early.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        once(hashed_linked_chunk_id.to_sql().unwrap())
                            .chain(events.iter().map(|event| event.as_str().to_sql().unwrap())),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }
    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }
    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }
    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Pick the query and its parameters depending on which
                // filters were provided.
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()],
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }
    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type),
                )?;

                Ok(())
            })
            .await
    }

    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(self.vacuum().await?)
    }

    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        self.get_db_size().await
    }
}
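/// Run the `find_event_relations` query within the given transaction,
/// optionally restricting the results to the given relation types.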
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    let collect_results = |rows| {
        let mut related = Vec::new();

        for result in rows {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            // An event that doesn't belong to any chunk has no position.
            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    if let Some(filters) = filters {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
             FROM events
             LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
             WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut statement = txn.prepare(&query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
             FROM events
             LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
             WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut statement = txn.prepare(query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    }
}
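/// Run the given callback in a write transaction opened with
/// `TransactionBehavior::Immediate`, so the write lock is acquired as soon as
/// the transaction begins, rather than at the first write statement.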
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Reset the transaction behavior to the default, even if the
            // callback failed.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        .unwrap()
}
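/// Insert a new chunk row into `linked_chunks`, fixing up the `next` pointer
/// of the previous chunk and the `previous` pointer of the next chunk.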
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    // Insert the chunk itself.
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    // Link the previous chunk to the new one.
    if let Some(previous) = previous {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
    }

    // Link the next chunk to the new one.
    if let Some(next) = next {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::{
            Gap,
            store::{
                EventCacheStore, EventCacheStoreError,
                integration_tests::{
                    check_test_event, make_test_event, make_test_event_with_event_id,
                },
            },
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
    };
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
    use once_cell::sync::Lazy;
    use ruma::{event_id, room_id};
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;
    use crate::{
        SqliteStoreConfig,
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
    };

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Create a fresh directory for one test's store, so tests don't share a
    /// database.
    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)),
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            // Chunks are ordered by identifier.
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }
    #[async_test]
    async fn test_linked_chunk_new_gap_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![Update::NewGapChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { prev_token: "raclette".to_owned() },
                }],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });
    }
    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }
    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        // The neighbouring chunks have been relinked.
        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        // The gap of the removed chunk is gone too.
        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }
    #[async_test]
    async fn test_linked_chunk_push_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }
    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "one"),
                            make_test_event(room_id, "two"),
                            make_test_event(room_id, "three"),
                            make_test_event(room_id, "four"),
                            make_test_event(room_id, "five"),
                            make_test_event(room_id, "six"),
                        ],
                    },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

        // Make sure the positions of the remaining items have been shifted.
        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }
    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }
    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // `StartReattachItems` and `EndReattachItems` are no-ops for the
        // store, so the pushed items must be left untouched.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }
    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

        // Check that no gaps or chunked events remain after the clear.
        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

        // It's possible to push items again after a clear.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }
    #[async_test]
    async fn test_linked_chunk_multiple_rooms() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room1 = room_id!("!realcheeselovers:raclette.fr");
        let linked_chunk_id1 = LinkedChunkId::Room(room1);
        let room2 = room_id!("!realcheeselovers:fondue.ch");
        let linked_chunk_id2 = LinkedChunkId::Room(room2);

        // Add updates for the first room.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id1,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room1, "best cheese is raclette"),
                            make_test_event(room1, "obviously"),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        // Add updates for the second room.
        store
            .handle_linked_chunk_updates(
                linked_chunk_id2,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![make_test_event(room1, "beaufort is the best")],
                    },
                ],
            )
            .await
            .unwrap();

        // Check the chunks from the first room.
        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
        assert_eq!(chunks_room1.len(), 1);

        let c = chunks_room1.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "best cheese is raclette");
            check_test_event(&events[1], "obviously");
        });

        // Check the chunks from the second room.
        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
        assert_eq!(chunks_room2.len(), 1);

        let c = chunks_room2.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "beaufort is the best");
        });
    }
    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        // The second update violates the unique constraint on chunk IDs, so
        // the first update must be rolled back with it.
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

        // None of the updates should have been applied.
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }
    #[async_test]
    async fn test_filter_duplicate_events_no_events() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
        assert!(duplicates.is_empty());
    }
2321 #[async_test]
2322 async fn test_load_last_chunk() {
2323 let room_id = room_id!("!r0:matrix.org");
2324 let linked_chunk_id = LinkedChunkId::Room(room_id);
2325 let event = |msg: &str| make_test_event(room_id, msg);
2326 let store = get_event_cache_store().await.expect("creating cache store failed");
2327
2328 {
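            // An empty linked chunk has no last chunk, and the identifier
            // generator starts at 0.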
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        {
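            // Add a first chunk with two events: it becomes the last chunk.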
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        {
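            // Add a second chunk after the first one: it becomes the new last
            // chunk.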
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
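            // The generator tracks the highest identifier ever seen, so it still
            // reports 42, even though the last chunk is 7.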
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
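                        // Pointing `next` back to chunk 0 creates a cycle:
                        // 0 -> 1 -> 0.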
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

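        // Loading the last chunk must fail: because of the cycle, no chunk
        // without a successor can be found.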
        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
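            // There is no chunk at all, so there is no previous chunk either.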
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        {
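            // There is one chunk only: it has no previous chunk.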
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        {
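            // Insert a chunk before the existing one, then load it as the
            // previous chunk.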
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
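                            // The new chunk is linked before chunk 42.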
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{TempDir, tempdir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

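        // An event in the room under test that the relations query must not
        // return.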
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

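        // An edit of the above event: the only relation the query should find.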
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

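        // An unrelated event in another room, which must never be returned.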
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

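        // Persist all three events.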
        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

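        // The second filter entry attempts an SQL injection. It must be treated
        // as an opaque, unknown relation type, not interpreted as SQL.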
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}