1use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use matrix_sdk_base::{
21 cross_process_lock::CrossProcessLockGeneration,
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 store::{extract_event_relation, EventCacheStore},
25 Event, Gap,
26 },
27 linked_chunk::{
28 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
29 Position, RawChunk, Update,
30 },
31 timer,
32};
33use matrix_sdk_store_encryption::StoreCipher;
34use ruma::{
35 events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
36};
37use rusqlite::{
38 params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
39};
40use tokio::{
41 fs,
42 sync::{Mutex, OwnedMutexGuard},
43};
44use tracing::{debug, error, instrument, trace};
45
46use crate::{
47 connection::{Connection as SqliteAsyncConn, Pool as SqlitePool},
48 error::{Error, Result},
49 utils::{
50 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
51 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
52 },
53 OpenStoreError, Secret, SqliteStoreConfig,
54};
55
/// Namespaces passed as the first argument of `encode_key` when encoding the
/// identifiers stored in the database.
mod keys {
    /// Namespace used in this file when encoding linked chunk identifiers and
    /// room identifiers.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Namespace used in this file when encoding event types and session
    /// identifiers.
    pub const EVENTS: &str = "events";
}
61
/// Database name handed to `SqliteStoreConfig::build_pool_of_connections` when
/// the store is opened.
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

/// Schema version that `run_migrations` brings the database up to. It matches
/// the version written by the last migration step.
const DATABASE_VERSION: u8 = 13;

/// Value of the `type` column of `linked_chunks` that marks a chunk holding
/// events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value of the `type` column of `linked_chunks` that marks a gap chunk.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
78
/// An event cache store backed by SQLite.
///
/// Cloning is cheap with respect to the cipher and the write connection, which
/// are both behind an `Arc`.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Cipher returned by `get_cypher`; `None` when the store was opened
    /// without a secret.
    store_cipher: Option<Arc<StoreCipher>>,

    /// Pool from which `read` takes its connections.
    pool: SqlitePool,

    /// The single connection handed out (behind a mutex) by `write`.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}
93
94#[cfg(not(tarpaulin_include))]
95impl fmt::Debug for SqliteEventCacheStore {
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
97 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
98 }
99}
100
101impl EncryptableStore for SqliteEventCacheStore {
102 fn get_cypher(&self) -> Option<&StoreCipher> {
103 self.store_cipher.as_deref()
104 }
105}
106
107impl SqliteEventCacheStore {
108 pub async fn open(
111 path: impl AsRef<Path>,
112 passphrase: Option<&str>,
113 ) -> Result<Self, OpenStoreError> {
114 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
115 }
116
117 pub async fn open_with_key(
120 path: impl AsRef<Path>,
121 key: Option<&[u8; 32]>,
122 ) -> Result<Self, OpenStoreError> {
123 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
124 }
125
126 #[instrument(skip(config), fields(path = ?config.path))]
128 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
129 debug!(?config);
130
131 let _timer = timer!("open_with_config");
132
133 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
134
135 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
136
137 let this = Self::open_with_pool(pool, config.secret).await?;
138 this.write().await?.apply_runtime_config(config.runtime_config).await?;
139
140 Ok(this)
141 }
142
143 async fn open_with_pool(
146 pool: SqlitePool,
147 secret: Option<Secret>,
148 ) -> Result<Self, OpenStoreError> {
149 let conn = pool.get().await?;
150
151 let version = conn.db_version().await?;
152 run_migrations(&conn, version).await?;
153
154 let store_cipher = match secret {
155 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
156 None => None,
157 };
158
159 Ok(Self {
160 store_cipher,
161 pool,
162 write_connection: Arc::new(Mutex::new(conn)),
164 })
165 }
166
167 #[instrument(skip_all)]
169 async fn read(&self) -> Result<SqliteAsyncConn> {
170 trace!("Taking a `read` connection");
171 let _timer = timer!("connection");
172
173 let connection = self.pool.get().await?;
174
175 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
180
181 Ok(connection)
182 }
183
184 #[instrument(skip_all)]
186 async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
187 trace!("Taking a `write` connection");
188 let _timer = timer!("connection");
189
190 let connection = self.write_connection.clone().lock_owned().await;
191
192 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
197
198 Ok(connection)
199 }
200
201 fn map_row_to_chunk(
202 row: &rusqlite::Row<'_>,
203 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
204 Ok((
205 row.get::<_, u64>(0)?,
206 row.get::<_, Option<u64>>(1)?,
207 row.get::<_, Option<u64>>(2)?,
208 row.get::<_, String>(3)?,
209 ))
210 }
211
212 fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
213 let serialized = serde_json::to_vec(event)?;
214
215 let raw_event = event.raw();
217 let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
218
219 let content = self.encode_value(serialized)?;
221
222 Ok(EncodedEvent {
223 content,
224 rel_type,
225 relates_to: relates_to.map(|relates_to| relates_to.to_string()),
226 })
227 }
228}
229
/// The pieces of an event as they are written to the `events` table; produced
/// by `SqliteEventCacheStore::encode_event`.
struct EncodedEvent {
    /// The JSON-serialized event, after going through `encode_value`.
    content: Vec<u8>,
    /// The relation type of the event, if it has a relation.
    rel_type: Option<String>,
    /// The ID (as a string) of the event this one relates to, if any.
    relates_to: Option<String>,
}
235
/// Helpers to rebuild linked chunks from database rows; implemented for
/// `Transaction` in this file.
trait TransactionExtForLinkedChunks {
    /// Builds the `RawChunk` whose identifier is `index`, linked to the
    /// `previous` and `next` chunks, loading its content according to
    /// `chunk_type`.
    ///
    /// `store` is used to decode the stored values, and `linked_chunk_id` is
    /// the already-encoded key of the linked chunk.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Loads the gap stored for the chunk `chunk_id` of the given linked
    /// chunk.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Loads the events stored for the chunk `chunk_id` of the given linked
    /// chunk.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
261
262impl TransactionExtForLinkedChunks for Transaction<'_> {
263 fn rebuild_chunk(
264 &self,
265 store: &SqliteEventCacheStore,
266 linked_chunk_id: &Key,
267 previous: Option<u64>,
268 id: u64,
269 next: Option<u64>,
270 chunk_type: &str,
271 ) -> Result<RawChunk<Event, Gap>> {
272 let previous = previous.map(ChunkIdentifier::new);
273 let next = next.map(ChunkIdentifier::new);
274 let id = ChunkIdentifier::new(id);
275
276 match chunk_type {
277 CHUNK_TYPE_GAP_TYPE_STRING => {
278 let gap = self.load_gap_content(store, linked_chunk_id, id)?;
280 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
281 }
282
283 CHUNK_TYPE_EVENT_TYPE_STRING => {
284 let events = self.load_events_content(store, linked_chunk_id, id)?;
286 Ok(RawChunk {
287 content: ChunkContent::Items(events),
288 previous,
289 identifier: id,
290 next,
291 })
292 }
293
294 other => {
295 Err(Error::InvalidData {
297 details: format!("a linked chunk has an unknown type {other}"),
298 })
299 }
300 }
301 }
302
303 fn load_gap_content(
304 &self,
305 store: &SqliteEventCacheStore,
306 linked_chunk_id: &Key,
307 chunk_id: ChunkIdentifier,
308 ) -> Result<Gap> {
309 let encoded_prev_token: Vec<u8> = self.query_row(
312 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
313 (chunk_id.index(), &linked_chunk_id),
314 |row| row.get(0),
315 )?;
316 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
317 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
318 Ok(Gap { prev_token })
319 }
320
321 fn load_events_content(
322 &self,
323 store: &SqliteEventCacheStore,
324 linked_chunk_id: &Key,
325 chunk_id: ChunkIdentifier,
326 ) -> Result<Vec<Event>> {
327 let mut events = Vec::new();
329
330 for event_data in self
331 .prepare(
332 r#"
333 SELECT events.content
334 FROM event_chunks ec, events
335 WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
336 ORDER BY ec.position ASC
337 "#,
338 )?
339 .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
340 {
341 let encoded_content = event_data?;
342 let serialized_content = store.decode_value(&encoded_content)?;
343 let event = serde_json::from_slice(&serialized_content)?;
344
345 events.push(event);
346 }
347
348 Ok(events)
349 }
350}
351
352async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
354 if version == 0 {
355 debug!("Creating database");
356 } else if version < DATABASE_VERSION {
357 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
358 } else {
359 return Ok(());
360 }
361
362 conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
364
365 if version < 1 {
366 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
369 conn.with_transaction(|txn| {
370 txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
371 txn.set_db_version(1)
372 })
373 .await?;
374 }
375
376 if version < 2 {
377 conn.with_transaction(|txn| {
378 txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
379 txn.set_db_version(2)
380 })
381 .await?;
382 }
383
384 if version < 3 {
385 conn.with_transaction(|txn| {
386 txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
387 txn.set_db_version(3)
388 })
389 .await?;
390 }
391
392 if version < 4 {
393 conn.with_transaction(|txn| {
394 txn.execute_batch(include_str!(
395 "../migrations/event_cache_store/004_ignore_policy.sql"
396 ))?;
397 txn.set_db_version(4)
398 })
399 .await?;
400 }
401
402 if version < 5 {
403 conn.with_transaction(|txn| {
404 txn.execute_batch(include_str!(
405 "../migrations/event_cache_store/005_events_index_on_event_id.sql"
406 ))?;
407 txn.set_db_version(5)
408 })
409 .await?;
410 }
411
412 if version < 6 {
413 conn.with_transaction(|txn| {
414 txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
415 txn.set_db_version(6)
416 })
417 .await?;
418 }
419
420 if version < 7 {
421 conn.with_transaction(|txn| {
422 txn.execute_batch(include_str!(
423 "../migrations/event_cache_store/007_event_chunks.sql"
424 ))?;
425 txn.set_db_version(7)
426 })
427 .await?;
428 }
429
430 if version < 8 {
431 conn.with_transaction(|txn| {
432 txn.execute_batch(include_str!(
433 "../migrations/event_cache_store/008_linked_chunk_id.sql"
434 ))?;
435 txn.set_db_version(8)
436 })
437 .await?;
438 }
439
440 if version < 9 {
441 conn.with_transaction(|txn| {
442 txn.execute_batch(include_str!(
443 "../migrations/event_cache_store/009_related_event_index.sql"
444 ))?;
445 txn.set_db_version(9)
446 })
447 .await?;
448 }
449
450 if version < 10 {
451 conn.with_transaction(|txn| {
452 txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
453 txn.set_db_version(10)
454 })
455 .await?;
456
457 if version >= 1 {
458 conn.vacuum().await?;
461 }
462 }
463
464 if version < 11 {
465 conn.with_transaction(|txn| {
466 txn.execute_batch(include_str!(
467 "../migrations/event_cache_store/011_empty_event_cache.sql"
468 ))?;
469 txn.set_db_version(11)
470 })
471 .await?;
472 }
473
474 if version < 12 {
475 conn.with_transaction(|txn| {
476 txn.execute_batch(include_str!(
477 "../migrations/event_cache_store/012_store_event_type.sql"
478 ))?;
479 txn.set_db_version(12)
480 })
481 .await?;
482 }
483
484 if version < 13 {
485 conn.with_transaction(|txn| {
486 txn.execute_batch(include_str!(
487 "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
488 ))?;
489 txn.set_db_version(13)
490 })
491 .await?;
492 }
493
494 Ok(())
495}
496
497#[async_trait]
498impl EventCacheStore for SqliteEventCacheStore {
499 type Error = Error;
500
501 #[instrument(skip(self))]
502 async fn try_take_leased_lock(
503 &self,
504 lease_duration_ms: u32,
505 key: &str,
506 holder: &str,
507 ) -> Result<Option<CrossProcessLockGeneration>> {
508 let _timer = timer!("method");
509
510 let key = key.to_owned();
511 let holder = holder.to_owned();
512
513 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
514 let expiration = now + lease_duration_ms as u64;
515
516 let generation = self
518 .write()
519 .await?
520 .with_transaction(move |txn| {
521 txn.query_row(
522 "INSERT INTO lease_locks (key, holder, expiration)
523 VALUES (?1, ?2, ?3)
524 ON CONFLICT (key)
525 DO
526 UPDATE SET
527 holder = excluded.holder,
528 expiration = excluded.expiration,
529 generation =
530 CASE holder
531 WHEN excluded.holder THEN generation
532 ELSE generation + 1
533 END
534 WHERE
535 holder = excluded.holder
536 OR expiration < ?4
537 RETURNING generation
538 ",
539 (key, holder, expiration, now),
540 |row| row.get(0),
541 )
542 .optional()
543 })
544 .await?;
545
546 Ok(generation)
547 }
548
    /// Applies a batch of linked chunk `updates` to the linked chunk
    /// identified by `linked_chunk_id`.
    ///
    /// All the updates are applied, in order, inside a single transaction
    /// started through `with_immediate_transaction`; the first failing
    /// statement aborts the closure with its error.
    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        // The encoded key is what is stored in the `linked_chunk_id` columns;
        // the owned, non-encoded identifier is only used for logging and to
        // get the room ID.
        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    // Insert a new, empty chunk of events.
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    // Insert a new gap chunk, along with its encoded token.
                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk as a gap.
                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        // Insert the gap's value.
                        txn.execute(
                            r#"
                            INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                            VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    // Unlink a chunk from its neighbours, then delete it.
                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the neighbours of the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Make the previous chunk point to the next chunk.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        // Make the next chunk point back to the previous chunk.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        // Delete the chunk itself.
                        // NOTE(review): rows of the other tables referring to this chunk are
                        // presumably removed by cascading deletes — confirm against the schema.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    // Store events, and their position in a chunk.
                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            // Nothing to store; move on to the next update.
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        // Statement recording where an event lives.
                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        // Statement recording the event itself, replacing any row already
                        // stored for it.
                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Despite its name, this keeps the valid events: it yields `None`
                        // (after logging) for an event without an ID or without a type, and
                        // `Some((event_id, event_type, event))` otherwise.
                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        // `filter_map` runs before `enumerate`: a skipped event does not
                        // consume a position.
                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            // Record the location of the event.
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            // Record the content of the event.
                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    // Replace the event stored at a given position by another one.
                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        // An event without an ID or without a type is logged and skipped.
                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        // Store the new event's content first...
                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        // ... then make the position point to the new event.
                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    // Remove the event at a given position, and shift the following ones.
                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        // Remove the entry at the given position.
                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

                        // Shift every following entry down by one, in two steps: positions
                        // are first rewritten as the negation of their new value, and the
                        // negative positions are then flipped back to positive.
                        // NOTE(review): the detour through negative values presumably avoids
                        // transient collisions on a uniqueness constraint — confirm against
                        // the schema.
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    // Remove every entry of a chunk from a given position onwards.
                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    // Remove all the chunks of this linked chunk.
                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        // NOTE(review): only `linked_chunks` is touched here; dependent rows
                        // are presumably removed by cascading deletes — confirm against the
                        // schema.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    // These two updates require no change in the database.
                    Update::StartReattachItems | Update::EndReattachItems => {
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }
895
896 #[instrument(skip(self))]
897 async fn load_all_chunks(
898 &self,
899 linked_chunk_id: LinkedChunkId<'_>,
900 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
901 let _timer = timer!("method");
902
903 let hashed_linked_chunk_id =
904 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
905
906 let this = self.clone();
907
908 let result = self
909 .read()
910 .await?
911 .with_transaction(move |txn| -> Result<_> {
912 let mut items = Vec::new();
913
914 for data in txn
916 .prepare(
917 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
918 )?
919 .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
920 {
921 let (id, previous, next, chunk_type) = data?;
922 let new = txn.rebuild_chunk(
923 &this,
924 &hashed_linked_chunk_id,
925 previous,
926 id,
927 next,
928 chunk_type.as_str(),
929 )?;
930 items.push(new);
931 }
932
933 Ok(items)
934 })
935 .await?;
936
937 Ok(result)
938 }
939
940 #[instrument(skip(self))]
941 async fn load_all_chunks_metadata(
942 &self,
943 linked_chunk_id: LinkedChunkId<'_>,
944 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
945 let _timer = timer!("method");
946
947 let hashed_linked_chunk_id =
948 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
949
950 self.read()
951 .await?
952 .with_transaction(move |txn| -> Result<_> {
953 let num_events_by_chunk_ids = txn
980 .prepare(
981 r#"
982 SELECT ec.chunk_id, COUNT(ec.event_id)
983 FROM event_chunks as ec
984 WHERE ec.linked_chunk_id = ?
985 GROUP BY ec.chunk_id
986 "#,
987 )?
988 .query_map((&hashed_linked_chunk_id,), |row| {
989 Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
990 })?
991 .collect::<Result<HashMap<_, _>, _>>()?;
992
993 txn.prepare(
994 r#"
995 SELECT
996 lc.id,
997 lc.previous,
998 lc.next,
999 lc.type
1000 FROM linked_chunks as lc
1001 WHERE lc.linked_chunk_id = ?
1002 ORDER BY lc.id"#,
1003 )?
1004 .query_map((&hashed_linked_chunk_id,), |row| {
1005 Ok((
1006 row.get::<_, u64>(0)?,
1007 row.get::<_, Option<u64>>(1)?,
1008 row.get::<_, Option<u64>>(2)?,
1009 row.get::<_, String>(3)?,
1010 ))
1011 })?
1012 .map(|data| -> Result<_> {
1013 let (id, previous, next, chunk_type) = data?;
1014
1015 let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
1021 0
1022 } else {
1023 num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
1024 };
1025
1026 Ok(ChunkMetadata {
1027 identifier: ChunkIdentifier::new(id),
1028 previous: previous.map(ChunkIdentifier::new),
1029 next: next.map(ChunkIdentifier::new),
1030 num_items,
1031 })
1032 })
1033 .collect::<Result<Vec<_>, _>>()
1034 })
1035 .await
1036 }
1037
1038 #[instrument(skip(self))]
1039 async fn load_last_chunk(
1040 &self,
1041 linked_chunk_id: LinkedChunkId<'_>,
1042 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1043 let _timer = timer!("method");
1044
1045 let hashed_linked_chunk_id =
1046 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1047
1048 let this = self.clone();
1049
1050 self
1051 .read()
1052 .await?
1053 .with_transaction(move |txn| -> Result<_> {
1054 let (observed_max_identifier, number_of_chunks) = txn
1056 .prepare(
1057 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1058 )?
1059 .query_row(
1060 (&hashed_linked_chunk_id,),
1061 |row| {
1062 Ok((
1063 row.get::<_, Option<u64>>(0)?,
1068 row.get::<_, u64>(1)?,
1069 ))
1070 }
1071 )?;
1072
1073 let chunk_identifier_generator = match observed_max_identifier {
1074 Some(max_observed_identifier) => {
1075 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1076 ChunkIdentifier::new(max_observed_identifier)
1077 )
1078 },
1079 None => ChunkIdentifierGenerator::new_from_scratch(),
1080 };
1081
1082 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1084 .prepare(
1085 "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1086 )?
1087 .query_row(
1088 (&hashed_linked_chunk_id,),
1089 |row| {
1090 Ok((
1091 row.get::<_, u64>(0)?,
1092 row.get::<_, Option<u64>>(1)?,
1093 row.get::<_, String>(2)?,
1094 ))
1095 }
1096 )
1097 .optional()?
1098 else {
1099 if number_of_chunks == 0 {
1102 return Ok((None, chunk_identifier_generator));
1103 }
1104 else {
1109 return Err(Error::InvalidData {
1110 details:
1111 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1112 .to_owned()
1113 }
1114 )
1115 }
1116 };
1117
1118 let last_chunk = txn.rebuild_chunk(
1120 &this,
1121 &hashed_linked_chunk_id,
1122 previous_chunk,
1123 chunk_identifier,
1124 None,
1125 &chunk_type
1126 )?;
1127
1128 Ok((Some(last_chunk), chunk_identifier_generator))
1129 })
1130 .await
1131 }
1132
1133 #[instrument(skip(self))]
1134 async fn load_previous_chunk(
1135 &self,
1136 linked_chunk_id: LinkedChunkId<'_>,
1137 before_chunk_identifier: ChunkIdentifier,
1138 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1139 let _timer = timer!("method");
1140
1141 let hashed_linked_chunk_id =
1142 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1143
1144 let this = self.clone();
1145
1146 self
1147 .read()
1148 .await?
1149 .with_transaction(move |txn| -> Result<_> {
1150 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1152 .prepare(
1153 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1154 )?
1155 .query_row(
1156 (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1157 |row| {
1158 Ok((
1159 row.get::<_, u64>(0)?,
1160 row.get::<_, Option<u64>>(1)?,
1161 row.get::<_, Option<u64>>(2)?,
1162 row.get::<_, String>(3)?,
1163 ))
1164 }
1165 )
1166 .optional()?
1167 else {
1168 return Ok(None);
1170 };
1171
1172 let last_chunk = txn.rebuild_chunk(
1174 &this,
1175 &hashed_linked_chunk_id,
1176 previous_chunk,
1177 chunk_identifier,
1178 next_chunk,
1179 &chunk_type
1180 )?;
1181
1182 Ok(Some(last_chunk))
1183 })
1184 .await
1185 }
1186
1187 #[instrument(skip(self))]
1188 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1189 let _timer = timer!("method");
1190
1191 self.write()
1192 .await?
1193 .with_transaction(move |txn| {
1194 txn.execute("DELETE FROM linked_chunks", ())?;
1196 txn.execute("DELETE FROM events", ())
1198 })
1199 .await?;
1200
1201 Ok(())
1202 }
1203
1204 #[instrument(skip(self, events))]
1205 async fn filter_duplicated_events(
1206 &self,
1207 linked_chunk_id: LinkedChunkId<'_>,
1208 events: Vec<OwnedEventId>,
1209 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1210 let _timer = timer!("method");
1211
1212 if events.is_empty() {
1216 return Ok(Vec::new());
1217 }
1218
1219 let hashed_linked_chunk_id =
1221 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1222 let linked_chunk_id = linked_chunk_id.to_owned();
1223
1224 self.read()
1225 .await?
1226 .with_transaction(move |txn| -> Result<_> {
1227 txn.chunk_large_query_over(events, None, move |txn, events| {
1228 let query = format!(
1229 r#"
1230 SELECT event_id, chunk_id, position
1231 FROM event_chunks
1232 WHERE linked_chunk_id = ? AND event_id IN ({})
1233 ORDER BY chunk_id ASC, position ASC
1234 "#,
1235 repeat_vars(events.len()),
1236 );
1237
1238 let parameters = params_from_iter(
1239 once(
1241 hashed_linked_chunk_id
1242 .to_sql()
1243 .unwrap(),
1245 )
1246 .chain(events.iter().map(|event| {
1248 event
1249 .as_str()
1250 .to_sql()
1251 .unwrap()
1253 })),
1254 );
1255
1256 let mut duplicated_events = Vec::new();
1257
1258 for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1259 Ok((
1260 row.get::<_, String>(0)?,
1261 row.get::<_, u64>(1)?,
1262 row.get::<_, usize>(2)?,
1263 ))
1264 })? {
1265 let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1266
1267 let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1268 error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1271 continue;
1272 };
1273
1274 duplicated_events.push((
1275 duplicated_event,
1276 Position::new(ChunkIdentifier::new(chunk_identifier), index),
1277 ));
1278 }
1279
1280 Ok(duplicated_events)
1281 })
1282 })
1283 .await
1284 }
1285
1286 #[instrument(skip(self, event_id))]
1287 async fn find_event(
1288 &self,
1289 room_id: &RoomId,
1290 event_id: &EventId,
1291 ) -> Result<Option<Event>, Self::Error> {
1292 let _timer = timer!("method");
1293
1294 let event_id = event_id.to_owned();
1295 let this = self.clone();
1296
1297 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1298
1299 self.read()
1300 .await?
1301 .with_transaction(move |txn| -> Result<_> {
1302 let Some(event) = txn
1303 .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1304 .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1305 .optional()?
1306 else {
1307 return Ok(None);
1309 };
1310
1311 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1312
1313 Ok(Some(event))
1314 })
1315 .await
1316 }
1317
1318 #[instrument(skip(self, event_id, filters))]
1319 async fn find_event_relations(
1320 &self,
1321 room_id: &RoomId,
1322 event_id: &EventId,
1323 filters: Option<&[RelationType]>,
1324 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1325 let _timer = timer!("method");
1326
1327 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1328
1329 let hashed_linked_chunk_id =
1330 self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1331
1332 let event_id = event_id.to_owned();
1333 let filters = filters.map(ToOwned::to_owned);
1334 let store = self.clone();
1335
1336 self.read()
1337 .await?
1338 .with_transaction(move |txn| -> Result<_> {
1339 find_event_relations_transaction(
1340 store,
1341 hashed_room_id,
1342 hashed_linked_chunk_id,
1343 event_id,
1344 filters,
1345 txn,
1346 )
1347 })
1348 .await
1349 }
1350
1351 #[instrument(skip(self))]
1352 async fn get_room_events(
1353 &self,
1354 room_id: &RoomId,
1355 event_type: Option<&str>,
1356 session_id: Option<&str>,
1357 ) -> Result<Vec<Event>, Self::Error> {
1358 let _timer = timer!("method");
1359
1360 let this = self.clone();
1361
1362 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1363 let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1364 let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1365
1366 self.read()
1367 .await?
1368 .with_transaction(move |txn| -> Result<_> {
1369 #[allow(clippy::redundant_clone)]
1373 let (query, keys) = match (hashed_event_type, hashed_session_id) {
1374 (None, None) => {
1375 ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1376 }
1377 (None, Some(session_id)) => (
1378 "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1379 params![hashed_room_id, session_id.to_owned()],
1380 ),
1381 (Some(event_type), None) => (
1382 "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1383 params![hashed_room_id, event_type.to_owned()]
1384 ),
1385 (Some(event_type), Some(session_id)) => (
1386 "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1387 params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1388 ),
1389 };
1390
1391 let mut statement = txn.prepare(query)?;
1392 let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1393
1394 let mut events = Vec::new();
1395 for ev in maybe_events {
1396 let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1397 events.push(event);
1398 }
1399
1400 Ok(events)
1401 })
1402 .await
1403 }
1404
1405 #[instrument(skip(self, event))]
1406 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1407 let _timer = timer!("method");
1408
1409 let Some(event_id) = event.event_id() else {
1410 error!("Trying to save an event with no ID");
1411 return Ok(());
1412 };
1413
1414 let Some(event_type) = event.kind.event_type() else {
1415 error!(%event_id, "Trying to save an event with no event type");
1416 return Ok(());
1417 };
1418
1419 let event_type = self.encode_key(keys::EVENTS, event_type);
1420 let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1421
1422 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1423 let event_id = event_id.to_string();
1424 let encoded_event = self.encode_event(&event)?;
1425
1426 self.write()
1427 .await?
1428 .with_transaction(move |txn| -> Result<_> {
1429 txn.execute(
1430 "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1431 (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1432
1433 Ok(())
1434 })
1435 .await
1436 }
1437}
1438
1439fn find_event_relations_transaction(
1440 store: SqliteEventCacheStore,
1441 hashed_room_id: Key,
1442 hashed_linked_chunk_id: Key,
1443 event_id: OwnedEventId,
1444 filters: Option<Vec<RelationType>>,
1445 txn: &Transaction<'_>,
1446) -> Result<Vec<(Event, Option<Position>)>> {
1447 let get_rows = |row: &rusqlite::Row<'_>| {
1448 Ok((
1449 row.get::<_, Vec<u8>>(0)?,
1450 row.get::<_, Option<u64>>(1)?,
1451 row.get::<_, Option<usize>>(2)?,
1452 ))
1453 };
1454
1455 let collect_results = |transaction| {
1457 let mut related = Vec::new();
1458
1459 for result in transaction {
1460 let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1461
1462 let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1463
1464 let pos = chunk_id
1467 .zip(index)
1468 .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1469
1470 related.push((event, pos));
1471 }
1472
1473 Ok(related)
1474 };
1475
1476 let related = if let Some(filters) = filters {
1477 let question_marks = repeat_vars(filters.len());
1478 let query = format!(
1479 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1480 FROM events
1481 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1482 WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1483 );
1484
1485 let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1490 let filters_params: Vec<_> = filter_strings
1491 .iter()
1492 .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1493 .collect();
1494
1495 let parameters = params_from_iter(
1496 [
1497 hashed_linked_chunk_id.to_sql().expect(
1498 "We should be able to convert a hashed linked chunk ID to a SQLite value",
1499 ),
1500 event_id
1501 .as_str()
1502 .to_sql()
1503 .expect("We should be able to convert an event ID to a SQLite value"),
1504 hashed_room_id
1505 .to_sql()
1506 .expect("We should be able to convert a room ID to a SQLite value"),
1507 ]
1508 .into_iter()
1509 .chain(filters_params),
1510 );
1511
1512 let mut transaction = txn.prepare(&query)?;
1513 let transaction = transaction.query_map(parameters, get_rows)?;
1514
1515 collect_results(transaction)
1516 } else {
1517 let query =
1518 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1519 FROM events
1520 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1521 WHERE relates_to = ? AND room_id = ?";
1522 let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1523
1524 let mut transaction = txn.prepare(query)?;
1525 let transaction = transaction.query_map(parameters, get_rows)?;
1526
1527 collect_results(transaction)
1528 };
1529
1530 related
1531}
1532
1533async fn with_immediate_transaction<
1538 T: Send + 'static,
1539 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1540>(
1541 this: &SqliteEventCacheStore,
1542 f: F,
1543) -> Result<T, Error> {
1544 this.write()
1545 .await?
1546 .interact(move |conn| -> Result<T, Error> {
1547 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1551
1552 let code = || -> Result<T, Error> {
1553 let txn = conn.transaction()?;
1554 let res = f(&txn)?;
1555 txn.commit()?;
1556 Ok(res)
1557 };
1558
1559 let res = code();
1560
1561 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1564
1565 res
1566 })
1567 .await
1568 .unwrap()
1570}
1571
1572fn insert_chunk(
1573 txn: &Transaction<'_>,
1574 linked_chunk_id: &Key,
1575 previous: Option<u64>,
1576 new: u64,
1577 next: Option<u64>,
1578 type_str: &str,
1579) -> rusqlite::Result<()> {
1580 txn.execute(
1582 r#"
1583 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1584 VALUES (?, ?, ?, ?, ?)
1585 "#,
1586 (new, linked_chunk_id, previous, next, type_str),
1587 )?;
1588
1589 if let Some(previous) = previous {
1591 txn.execute(
1592 r#"
1593 UPDATE linked_chunks
1594 SET next = ?
1595 WHERE id = ? AND linked_chunk_id = ?
1596 "#,
1597 (new, previous, linked_chunk_id),
1598 )?;
1599 }
1600
1601 if let Some(next) = next {
1603 txn.execute(
1604 r#"
1605 UPDATE linked_chunks
1606 SET previous = ?
1607 WHERE id = ? AND linked_chunk_id = ?
1608 "#,
1609 (new, next, linked_chunk_id),
1610 )?;
1611 }
1612
1613 Ok(())
1614}
1615
1616#[cfg(test)]
1617mod tests {
1618 use std::{
1619 path::PathBuf,
1620 sync::atomic::{AtomicU32, Ordering::SeqCst},
1621 };
1622
1623 use assert_matches::assert_matches;
1624 use matrix_sdk_base::{
1625 event_cache::{
1626 store::{
1627 integration_tests::{
1628 check_test_event, make_test_event, make_test_event_with_event_id,
1629 },
1630 EventCacheStore, EventCacheStoreError,
1631 },
1632 Gap,
1633 },
1634 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1635 linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1636 };
1637 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1638 use once_cell::sync::Lazy;
1639 use ruma::{event_id, room_id};
1640 use tempfile::{tempdir, TempDir};
1641
1642 use super::SqliteEventCacheStore;
1643 use crate::{
1644 event_cache_store::keys,
1645 utils::{EncryptableStore as _, SqliteAsyncConnExt},
1646 SqliteStoreConfig,
1647 };
1648
1649 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1650 static NUM: AtomicU32 = AtomicU32::new(0);
1651
1652 fn new_event_cache_store_workspace() -> PathBuf {
1653 let name = NUM.fetch_add(1, SeqCst).to_string();
1654 TMP_DIR.path().join(name)
1655 }
1656
1657 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1658 let tmpdir_path = new_event_cache_store_workspace();
1659
1660 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1661
1662 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1663 }
1664
1665 event_cache_store_integration_tests!();
1666 event_cache_store_integration_tests_time!();
1667
1668 #[async_test]
1669 async fn test_pool_size() {
1670 let tmpdir_path = new_event_cache_store_workspace();
1671 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1672
1673 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1674
1675 assert_eq!(store.pool.status().max_size, 42);
1676 }
1677
1678 #[async_test]
1679 async fn test_linked_chunk_new_items_chunk() {
1680 let store = get_event_cache_store().await.expect("creating cache store failed");
1681
1682 let room_id = &DEFAULT_TEST_ROOM_ID;
1683 let linked_chunk_id = LinkedChunkId::Room(room_id);
1684
1685 store
1686 .handle_linked_chunk_updates(
1687 linked_chunk_id,
1688 vec![
1689 Update::NewItemsChunk {
1690 previous: None,
1691 new: ChunkIdentifier::new(42),
1692 next: None, },
1694 Update::NewItemsChunk {
1695 previous: Some(ChunkIdentifier::new(42)),
1696 new: ChunkIdentifier::new(13),
1697 next: Some(ChunkIdentifier::new(37)), },
1700 Update::NewItemsChunk {
1701 previous: Some(ChunkIdentifier::new(13)),
1702 new: ChunkIdentifier::new(37),
1703 next: None,
1704 },
1705 ],
1706 )
1707 .await
1708 .unwrap();
1709
1710 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1711
1712 assert_eq!(chunks.len(), 3);
1713
1714 {
1715 let c = chunks.remove(0);
1717 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1718 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1719 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1720 assert_matches!(c.content, ChunkContent::Items(events) => {
1721 assert!(events.is_empty());
1722 });
1723
1724 let c = chunks.remove(0);
1725 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1726 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1727 assert_eq!(c.next, None);
1728 assert_matches!(c.content, ChunkContent::Items(events) => {
1729 assert!(events.is_empty());
1730 });
1731
1732 let c = chunks.remove(0);
1733 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1734 assert_eq!(c.previous, None);
1735 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1736 assert_matches!(c.content, ChunkContent::Items(events) => {
1737 assert!(events.is_empty());
1738 });
1739 }
1740 }
1741
1742 #[async_test]
1743 async fn test_linked_chunk_new_gap_chunk() {
1744 let store = get_event_cache_store().await.expect("creating cache store failed");
1745
1746 let room_id = &DEFAULT_TEST_ROOM_ID;
1747 let linked_chunk_id = LinkedChunkId::Room(room_id);
1748
1749 store
1750 .handle_linked_chunk_updates(
1751 linked_chunk_id,
1752 vec![Update::NewGapChunk {
1753 previous: None,
1754 new: ChunkIdentifier::new(42),
1755 next: None,
1756 gap: Gap { prev_token: "raclette".to_owned() },
1757 }],
1758 )
1759 .await
1760 .unwrap();
1761
1762 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1763
1764 assert_eq!(chunks.len(), 1);
1765
1766 let c = chunks.remove(0);
1768 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1769 assert_eq!(c.previous, None);
1770 assert_eq!(c.next, None);
1771 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1772 assert_eq!(gap.prev_token, "raclette");
1773 });
1774 }
1775
1776 #[async_test]
1777 async fn test_linked_chunk_replace_item() {
1778 let store = get_event_cache_store().await.expect("creating cache store failed");
1779
1780 let room_id = &DEFAULT_TEST_ROOM_ID;
1781 let linked_chunk_id = LinkedChunkId::Room(room_id);
1782 let event_id = event_id!("$world");
1783
1784 store
1785 .handle_linked_chunk_updates(
1786 linked_chunk_id,
1787 vec![
1788 Update::NewItemsChunk {
1789 previous: None,
1790 new: ChunkIdentifier::new(42),
1791 next: None,
1792 },
1793 Update::PushItems {
1794 at: Position::new(ChunkIdentifier::new(42), 0),
1795 items: vec![
1796 make_test_event(room_id, "hello"),
1797 make_test_event_with_event_id(room_id, "world", Some(event_id)),
1798 ],
1799 },
1800 Update::ReplaceItem {
1801 at: Position::new(ChunkIdentifier::new(42), 1),
1802 item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1803 },
1804 ],
1805 )
1806 .await
1807 .unwrap();
1808
1809 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1810
1811 assert_eq!(chunks.len(), 1);
1812
1813 let c = chunks.remove(0);
1814 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1815 assert_eq!(c.previous, None);
1816 assert_eq!(c.next, None);
1817 assert_matches!(c.content, ChunkContent::Items(events) => {
1818 assert_eq!(events.len(), 2);
1819 check_test_event(&events[0], "hello");
1820 check_test_event(&events[1], "yolo");
1821 });
1822 }
1823
1824 #[async_test]
1825 async fn test_linked_chunk_remove_chunk() {
1826 let store = get_event_cache_store().await.expect("creating cache store failed");
1827
1828 let room_id = &DEFAULT_TEST_ROOM_ID;
1829 let linked_chunk_id = LinkedChunkId::Room(room_id);
1830
1831 store
1832 .handle_linked_chunk_updates(
1833 linked_chunk_id,
1834 vec![
1835 Update::NewGapChunk {
1836 previous: None,
1837 new: ChunkIdentifier::new(42),
1838 next: None,
1839 gap: Gap { prev_token: "raclette".to_owned() },
1840 },
1841 Update::NewGapChunk {
1842 previous: Some(ChunkIdentifier::new(42)),
1843 new: ChunkIdentifier::new(43),
1844 next: None,
1845 gap: Gap { prev_token: "fondue".to_owned() },
1846 },
1847 Update::NewGapChunk {
1848 previous: Some(ChunkIdentifier::new(43)),
1849 new: ChunkIdentifier::new(44),
1850 next: None,
1851 gap: Gap { prev_token: "tartiflette".to_owned() },
1852 },
1853 Update::RemoveChunk(ChunkIdentifier::new(43)),
1854 ],
1855 )
1856 .await
1857 .unwrap();
1858
1859 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1860
1861 assert_eq!(chunks.len(), 2);
1862
1863 let c = chunks.remove(0);
1865 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1866 assert_eq!(c.previous, None);
1867 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1868 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1869 assert_eq!(gap.prev_token, "raclette");
1870 });
1871
1872 let c = chunks.remove(0);
1873 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1874 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1875 assert_eq!(c.next, None);
1876 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1877 assert_eq!(gap.prev_token, "tartiflette");
1878 });
1879
1880 let gaps = store
1882 .read()
1883 .await
1884 .unwrap()
1885 .with_transaction(|txn| -> rusqlite::Result<_> {
1886 let mut gaps = Vec::new();
1887 for data in txn
1888 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1889 .query_map((), |row| row.get::<_, u64>(0))?
1890 {
1891 gaps.push(data?);
1892 }
1893 Ok(gaps)
1894 })
1895 .await
1896 .unwrap();
1897
1898 assert_eq!(gaps, vec![42, 44]);
1899 }
1900
1901 #[async_test]
1902 async fn test_linked_chunk_push_items() {
1903 let store = get_event_cache_store().await.expect("creating cache store failed");
1904
1905 let room_id = &DEFAULT_TEST_ROOM_ID;
1906 let linked_chunk_id = LinkedChunkId::Room(room_id);
1907
1908 store
1909 .handle_linked_chunk_updates(
1910 linked_chunk_id,
1911 vec![
1912 Update::NewItemsChunk {
1913 previous: None,
1914 new: ChunkIdentifier::new(42),
1915 next: None,
1916 },
1917 Update::PushItems {
1918 at: Position::new(ChunkIdentifier::new(42), 0),
1919 items: vec![
1920 make_test_event(room_id, "hello"),
1921 make_test_event(room_id, "world"),
1922 ],
1923 },
1924 Update::PushItems {
1925 at: Position::new(ChunkIdentifier::new(42), 2),
1926 items: vec![make_test_event(room_id, "who?")],
1927 },
1928 ],
1929 )
1930 .await
1931 .unwrap();
1932
1933 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1934
1935 assert_eq!(chunks.len(), 1);
1936
1937 let c = chunks.remove(0);
1938 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1939 assert_eq!(c.previous, None);
1940 assert_eq!(c.next, None);
1941 assert_matches!(c.content, ChunkContent::Items(events) => {
1942 assert_eq!(events.len(), 3);
1943
1944 check_test_event(&events[0], "hello");
1945 check_test_event(&events[1], "world");
1946 check_test_event(&events[2], "who?");
1947 });
1948 }
1949
1950 #[async_test]
1951 async fn test_linked_chunk_remove_item() {
1952 let store = get_event_cache_store().await.expect("creating cache store failed");
1953
1954 let room_id = *DEFAULT_TEST_ROOM_ID;
1955 let linked_chunk_id = LinkedChunkId::Room(room_id);
1956
1957 store
1958 .handle_linked_chunk_updates(
1959 linked_chunk_id,
1960 vec![
1961 Update::NewItemsChunk {
1962 previous: None,
1963 new: ChunkIdentifier::new(42),
1964 next: None,
1965 },
1966 Update::PushItems {
1967 at: Position::new(ChunkIdentifier::new(42), 0),
1968 items: vec![
1969 make_test_event(room_id, "one"),
1970 make_test_event(room_id, "two"),
1971 make_test_event(room_id, "three"),
1972 make_test_event(room_id, "four"),
1973 make_test_event(room_id, "five"),
1974 make_test_event(room_id, "six"),
1975 ],
1976 },
1977 Update::RemoveItem {
1978 at: Position::new(ChunkIdentifier::new(42), 2), },
1980 ],
1981 )
1982 .await
1983 .unwrap();
1984
1985 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1986
1987 assert_eq!(chunks.len(), 1);
1988
1989 let c = chunks.remove(0);
1990 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1991 assert_eq!(c.previous, None);
1992 assert_eq!(c.next, None);
1993 assert_matches!(c.content, ChunkContent::Items(events) => {
1994 assert_eq!(events.len(), 5);
1995 check_test_event(&events[0], "one");
1996 check_test_event(&events[1], "two");
1997 check_test_event(&events[2], "four");
1998 check_test_event(&events[3], "five");
1999 check_test_event(&events[4], "six");
2000 });
2001
2002 let num_rows: u64 = store
2004 .read()
2005 .await
2006 .unwrap()
2007 .with_transaction(move |txn| {
2008 txn.query_row(
2009 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
2010 (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
2011 |row| row.get(0),
2012 )
2013 })
2014 .await
2015 .unwrap();
2016 assert_eq!(num_rows, 3);
2017 }
2018
2019 #[async_test]
2020 async fn test_linked_chunk_detach_last_items() {
2021 let store = get_event_cache_store().await.expect("creating cache store failed");
2022
2023 let room_id = *DEFAULT_TEST_ROOM_ID;
2024 let linked_chunk_id = LinkedChunkId::Room(room_id);
2025
2026 store
2027 .handle_linked_chunk_updates(
2028 linked_chunk_id,
2029 vec![
2030 Update::NewItemsChunk {
2031 previous: None,
2032 new: ChunkIdentifier::new(42),
2033 next: None,
2034 },
2035 Update::PushItems {
2036 at: Position::new(ChunkIdentifier::new(42), 0),
2037 items: vec![
2038 make_test_event(room_id, "hello"),
2039 make_test_event(room_id, "world"),
2040 make_test_event(room_id, "howdy"),
2041 ],
2042 },
2043 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
2044 ],
2045 )
2046 .await
2047 .unwrap();
2048
2049 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2050
2051 assert_eq!(chunks.len(), 1);
2052
2053 let c = chunks.remove(0);
2054 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2055 assert_eq!(c.previous, None);
2056 assert_eq!(c.next, None);
2057 assert_matches!(c.content, ChunkContent::Items(events) => {
2058 assert_eq!(events.len(), 1);
2059 check_test_event(&events[0], "hello");
2060 });
2061 }
2062
2063 #[async_test]
2064 async fn test_linked_chunk_start_end_reattach_items() {
2065 let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067 let room_id = *DEFAULT_TEST_ROOM_ID;
2068 let linked_chunk_id = LinkedChunkId::Room(room_id);
2069
2070 store
2074 .handle_linked_chunk_updates(
2075 linked_chunk_id,
2076 vec![
2077 Update::NewItemsChunk {
2078 previous: None,
2079 new: ChunkIdentifier::new(42),
2080 next: None,
2081 },
2082 Update::PushItems {
2083 at: Position::new(ChunkIdentifier::new(42), 0),
2084 items: vec![
2085 make_test_event(room_id, "hello"),
2086 make_test_event(room_id, "world"),
2087 make_test_event(room_id, "howdy"),
2088 ],
2089 },
2090 Update::StartReattachItems,
2091 Update::EndReattachItems,
2092 ],
2093 )
2094 .await
2095 .unwrap();
2096
2097 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2098
2099 assert_eq!(chunks.len(), 1);
2100
2101 let c = chunks.remove(0);
2102 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2103 assert_eq!(c.previous, None);
2104 assert_eq!(c.next, None);
2105 assert_matches!(c.content, ChunkContent::Items(events) => {
2106 assert_eq!(events.len(), 3);
2107 check_test_event(&events[0], "hello");
2108 check_test_event(&events[1], "world");
2109 check_test_event(&events[2], "howdy");
2110 });
2111 }
2112
2113 #[async_test]
2114 async fn test_linked_chunk_clear() {
2115 let store = get_event_cache_store().await.expect("creating cache store failed");
2116
2117 let room_id = *DEFAULT_TEST_ROOM_ID;
2118 let linked_chunk_id = LinkedChunkId::Room(room_id);
2119 let event_0 = make_test_event(room_id, "hello");
2120 let event_1 = make_test_event(room_id, "world");
2121 let event_2 = make_test_event(room_id, "howdy");
2122
2123 store
2124 .handle_linked_chunk_updates(
2125 linked_chunk_id,
2126 vec![
2127 Update::NewItemsChunk {
2128 previous: None,
2129 new: ChunkIdentifier::new(42),
2130 next: None,
2131 },
2132 Update::NewGapChunk {
2133 previous: Some(ChunkIdentifier::new(42)),
2134 new: ChunkIdentifier::new(54),
2135 next: None,
2136 gap: Gap { prev_token: "fondue".to_owned() },
2137 },
2138 Update::PushItems {
2139 at: Position::new(ChunkIdentifier::new(42), 0),
2140 items: vec![event_0.clone(), event_1, event_2],
2141 },
2142 Update::Clear,
2143 ],
2144 )
2145 .await
2146 .unwrap();
2147
2148 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2149 assert!(chunks.is_empty());
2150
2151 store
2153 .read()
2154 .await
2155 .unwrap()
2156 .with_transaction(|txn| -> rusqlite::Result<_> {
2157 let num_gaps = txn
2158 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2159 .query_row((), |row| row.get::<_, u64>(0))?;
2160 assert_eq!(num_gaps, 0);
2161
2162 let num_events = txn
2163 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2164 .query_row((), |row| row.get::<_, u64>(0))?;
2165 assert_eq!(num_events, 0);
2166
2167 Ok(())
2168 })
2169 .await
2170 .unwrap();
2171
2172 store
2174 .handle_linked_chunk_updates(
2175 linked_chunk_id,
2176 vec![
2177 Update::NewItemsChunk {
2178 previous: None,
2179 new: ChunkIdentifier::new(42),
2180 next: None,
2181 },
2182 Update::PushItems {
2183 at: Position::new(ChunkIdentifier::new(42), 0),
2184 items: vec![event_0],
2185 },
2186 ],
2187 )
2188 .await
2189 .unwrap();
2190 }
2191
2192 #[async_test]
2193 async fn test_linked_chunk_multiple_rooms() {
2194 let store = get_event_cache_store().await.expect("creating cache store failed");
2195
2196 let room1 = room_id!("!realcheeselovers:raclette.fr");
2197 let linked_chunk_id1 = LinkedChunkId::Room(room1);
2198 let room2 = room_id!("!realcheeselovers:fondue.ch");
2199 let linked_chunk_id2 = LinkedChunkId::Room(room2);
2200
2201 store
2205 .handle_linked_chunk_updates(
2206 linked_chunk_id1,
2207 vec![
2208 Update::NewItemsChunk {
2209 previous: None,
2210 new: ChunkIdentifier::new(42),
2211 next: None,
2212 },
2213 Update::PushItems {
2214 at: Position::new(ChunkIdentifier::new(42), 0),
2215 items: vec![
2216 make_test_event(room1, "best cheese is raclette"),
2217 make_test_event(room1, "obviously"),
2218 ],
2219 },
2220 ],
2221 )
2222 .await
2223 .unwrap();
2224
2225 store
2226 .handle_linked_chunk_updates(
2227 linked_chunk_id2,
2228 vec![
2229 Update::NewItemsChunk {
2230 previous: None,
2231 new: ChunkIdentifier::new(42),
2232 next: None,
2233 },
2234 Update::PushItems {
2235 at: Position::new(ChunkIdentifier::new(42), 0),
2236 items: vec![make_test_event(room1, "beaufort is the best")],
2237 },
2238 ],
2239 )
2240 .await
2241 .unwrap();
2242
2243 let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2245 assert_eq!(chunks_room1.len(), 1);
2246
2247 let c = chunks_room1.remove(0);
2248 assert_matches!(c.content, ChunkContent::Items(events) => {
2249 assert_eq!(events.len(), 2);
2250 check_test_event(&events[0], "best cheese is raclette");
2251 check_test_event(&events[1], "obviously");
2252 });
2253
2254 let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2256 assert_eq!(chunks_room2.len(), 1);
2257
2258 let c = chunks_room2.remove(0);
2259 assert_matches!(c.content, ChunkContent::Items(events) => {
2260 assert_eq!(events.len(), 1);
2261 check_test_event(&events[0], "beaufort is the best");
2262 });
2263 }
2264
2265 #[async_test]
2266 async fn test_linked_chunk_update_is_a_transaction() {
2267 let store = get_event_cache_store().await.expect("creating cache store failed");
2268
2269 let room_id = *DEFAULT_TEST_ROOM_ID;
2270 let linked_chunk_id = LinkedChunkId::Room(room_id);
2271
2272 let err = store
2275 .handle_linked_chunk_updates(
2276 linked_chunk_id,
2277 vec![
2278 Update::NewItemsChunk {
2279 previous: None,
2280 new: ChunkIdentifier::new(42),
2281 next: None,
2282 },
2283 Update::NewItemsChunk {
2284 previous: None,
2285 new: ChunkIdentifier::new(42),
2286 next: None,
2287 },
2288 ],
2289 )
2290 .await
2291 .unwrap_err();
2292
2293 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2295 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2296 });
2297
2298 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2302 assert!(chunks.is_empty());
2303 }
2304
2305 #[async_test]
2306 async fn test_filter_duplicate_events_no_events() {
2307 let store = get_event_cache_store().await.expect("creating cache store failed");
2308
2309 let room_id = *DEFAULT_TEST_ROOM_ID;
2310 let linked_chunk_id = LinkedChunkId::Room(room_id);
2311 let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2312 assert!(duplicates.is_empty());
2313 }
2314
2315 #[async_test]
2316 async fn test_load_last_chunk() {
2317 let room_id = room_id!("!r0:matrix.org");
2318 let linked_chunk_id = LinkedChunkId::Room(room_id);
2319 let event = |msg: &str| make_test_event(room_id, msg);
2320 let store = get_event_cache_store().await.expect("creating cache store failed");
2321
2322 {
2324 let (last_chunk, chunk_identifier_generator) =
2325 store.load_last_chunk(linked_chunk_id).await.unwrap();
2326
2327 assert!(last_chunk.is_none());
2328 assert_eq!(chunk_identifier_generator.current(), 0);
2329 }
2330
2331 {
2333 store
2334 .handle_linked_chunk_updates(
2335 linked_chunk_id,
2336 vec![
2337 Update::NewItemsChunk {
2338 previous: None,
2339 new: ChunkIdentifier::new(42),
2340 next: None,
2341 },
2342 Update::PushItems {
2343 at: Position::new(ChunkIdentifier::new(42), 0),
2344 items: vec![event("saucisse de morteau"), event("comté")],
2345 },
2346 ],
2347 )
2348 .await
2349 .unwrap();
2350
2351 let (last_chunk, chunk_identifier_generator) =
2352 store.load_last_chunk(linked_chunk_id).await.unwrap();
2353
2354 assert_matches!(last_chunk, Some(last_chunk) => {
2355 assert_eq!(last_chunk.identifier, 42);
2356 assert!(last_chunk.previous.is_none());
2357 assert!(last_chunk.next.is_none());
2358 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2359 assert_eq!(items.len(), 2);
2360 check_test_event(&items[0], "saucisse de morteau");
2361 check_test_event(&items[1], "comté");
2362 });
2363 });
2364 assert_eq!(chunk_identifier_generator.current(), 42);
2365 }
2366
2367 {
2369 store
2370 .handle_linked_chunk_updates(
2371 linked_chunk_id,
2372 vec![
2373 Update::NewItemsChunk {
2374 previous: Some(ChunkIdentifier::new(42)),
2375 new: ChunkIdentifier::new(7),
2376 next: None,
2377 },
2378 Update::PushItems {
2379 at: Position::new(ChunkIdentifier::new(7), 0),
2380 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2381 },
2382 ],
2383 )
2384 .await
2385 .unwrap();
2386
2387 let (last_chunk, chunk_identifier_generator) =
2388 store.load_last_chunk(linked_chunk_id).await.unwrap();
2389
2390 assert_matches!(last_chunk, Some(last_chunk) => {
2391 assert_eq!(last_chunk.identifier, 7);
2392 assert_matches!(last_chunk.previous, Some(previous) => {
2393 assert_eq!(previous, 42);
2394 });
2395 assert!(last_chunk.next.is_none());
2396 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2397 assert_eq!(items.len(), 3);
2398 check_test_event(&items[0], "fondue");
2399 check_test_event(&items[1], "gruyère");
2400 check_test_event(&items[2], "mont d'or");
2401 });
2402 });
2403 assert_eq!(chunk_identifier_generator.current(), 42);
2404 }
2405 }
2406
2407 #[async_test]
2408 async fn test_load_last_chunk_with_a_cycle() {
2409 let room_id = room_id!("!r0:matrix.org");
2410 let linked_chunk_id = LinkedChunkId::Room(room_id);
2411 let store = get_event_cache_store().await.expect("creating cache store failed");
2412
2413 store
2414 .handle_linked_chunk_updates(
2415 linked_chunk_id,
2416 vec![
2417 Update::NewItemsChunk {
2418 previous: None,
2419 new: ChunkIdentifier::new(0),
2420 next: None,
2421 },
2422 Update::NewItemsChunk {
2423 previous: Some(ChunkIdentifier::new(0)),
2427 new: ChunkIdentifier::new(1),
2428 next: Some(ChunkIdentifier::new(0)),
2429 },
2430 ],
2431 )
2432 .await
2433 .unwrap();
2434
2435 store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2436 }
2437
2438 #[async_test]
2439 async fn test_load_previous_chunk() {
2440 let room_id = room_id!("!r0:matrix.org");
2441 let linked_chunk_id = LinkedChunkId::Room(room_id);
2442 let event = |msg: &str| make_test_event(room_id, msg);
2443 let store = get_event_cache_store().await.expect("creating cache store failed");
2444
2445 {
2448 let previous_chunk = store
2449 .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2450 .await
2451 .unwrap();
2452
2453 assert!(previous_chunk.is_none());
2454 }
2455
2456 {
2459 store
2460 .handle_linked_chunk_updates(
2461 linked_chunk_id,
2462 vec![Update::NewItemsChunk {
2463 previous: None,
2464 new: ChunkIdentifier::new(42),
2465 next: None,
2466 }],
2467 )
2468 .await
2469 .unwrap();
2470
2471 let previous_chunk =
2472 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2473
2474 assert!(previous_chunk.is_none());
2475 }
2476
2477 {
2479 store
2480 .handle_linked_chunk_updates(
2481 linked_chunk_id,
2482 vec![
2483 Update::NewItemsChunk {
2485 previous: None,
2486 new: ChunkIdentifier::new(7),
2487 next: Some(ChunkIdentifier::new(42)),
2488 },
2489 Update::PushItems {
2490 at: Position::new(ChunkIdentifier::new(7), 0),
2491 items: vec![event("brigand du jorat"), event("morbier")],
2492 },
2493 ],
2494 )
2495 .await
2496 .unwrap();
2497
2498 let previous_chunk =
2499 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2500
2501 assert_matches!(previous_chunk, Some(previous_chunk) => {
2502 assert_eq!(previous_chunk.identifier, 7);
2503 assert!(previous_chunk.previous.is_none());
2504 assert_matches!(previous_chunk.next, Some(next) => {
2505 assert_eq!(next, 42);
2506 });
2507 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2508 assert_eq!(items.len(), 2);
2509 check_test_event(&items[0], "brigand du jorat");
2510 check_test_event(&items[1], "morbier");
2511 });
2512 });
2513 }
2514 }
2515}
2516
2517#[cfg(test)]
2518mod encrypted_tests {
2519 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2520
2521 use matrix_sdk_base::{
2522 event_cache::store::{EventCacheStore, EventCacheStoreError},
2523 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2524 };
2525 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2526 use once_cell::sync::Lazy;
2527 use ruma::{
2528 event_id,
2529 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2530 room_id, user_id,
2531 };
2532 use tempfile::{tempdir, TempDir};
2533
2534 use super::SqliteEventCacheStore;
2535
2536 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2537 static NUM: AtomicU32 = AtomicU32::new(0);
2538
2539 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2540 let name = NUM.fetch_add(1, SeqCst).to_string();
2541 let tmpdir_path = TMP_DIR.path().join(name);
2542
2543 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2544
2545 Ok(SqliteEventCacheStore::open(
2546 tmpdir_path.to_str().unwrap(),
2547 Some("default_test_password"),
2548 )
2549 .await
2550 .unwrap())
2551 }
2552
2553 event_cache_store_integration_tests!();
2554 event_cache_store_integration_tests_time!();
2555
2556 #[async_test]
2557 async fn test_no_sqlite_injection_in_find_event_relations() {
2558 let room_id = room_id!("!test:localhost");
2559 let another_room_id = room_id!("!r1:matrix.org");
2560 let sender = user_id!("@alice:localhost");
2561
2562 let store = get_event_cache_store()
2563 .await
2564 .expect("We should be able to create a new, empty, event cache store");
2565
2566 let f = EventFactory::new().room(room_id).sender(sender);
2567
2568 let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2570 let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2571
2572 let edit_id = event_id!("$find_me:matrix.org");
2574 let edit = f
2575 .text_msg("Find me")
2576 .event_id(edit_id)
2577 .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2578 .into_event();
2579
2580 let f = f.room(another_room_id);
2582
2583 let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2584 let another_event =
2585 f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2586
2587 store.save_event(room_id, event).await.unwrap();
2589 store.save_event(room_id, edit).await.unwrap();
2590 store.save_event(another_room_id, another_event).await.unwrap();
2591
2592 let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
2596
2597 let results = store
2599 .find_event_relations(room_id, event_id, filter.as_deref())
2600 .await
2601 .expect("We should be able to attempt to find event relations");
2602
2603 similar_asserts::assert_eq!(
2605 results.len(),
2606 1,
2607 "We should only have loaded events for the first room {results:#?}"
2608 );
2609
2610 let (found_event, _) = &results[0];
2612 assert_eq!(
2613 found_event.event_id().as_deref(),
2614 Some(edit_id),
2615 "The single event we found should be the edit event"
2616 );
2617 }
2618}