use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{extract_event_relation, EventCacheStore},
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
};
use rusqlite::{
    params, params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior,
};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
        SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, Secret, SqliteStoreConfig,
};

mod keys {
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const EVENTS: &str = "events";
}

const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

const DATABASE_VERSION: u8 = 12;

const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

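/// An SQLite-based implementation of [`EventCacheStore`].
///
/// Values are optionally encrypted at rest with a [`StoreCipher`]. Reads go
/// through a connection pool, while all writes are funneled through a single
/// dedicated connection.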
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    store_cipher: Option<Arc<StoreCipher>>,

    pool: SqlitePool,

    write_connection: Arc<Mutex<SqliteAsyncConn>>,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
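    /// Open the SQLite event cache store at the given path, optionally
    /// protecting stored values with the given passphrase.
    ///
    /// A minimal usage sketch (path and passphrase are illustrative):
    ///
    /// ```ignore
    /// let store = SqliteEventCacheStore::open("/path/to/store/dir", Some("passphrase")).await?;
    /// ```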
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

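    /// Same as [`Self::open`], but takes a raw 32-byte encryption key instead
    /// of a passphrase.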
    pub async fn open_with_key(
        path: impl AsRef<Path>,
        key: Option<&[u8; 32]>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
    }

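    /// Open the SQLite event cache store with the given [`SqliteStoreConfig`],
    /// which bundles the path, the pool and runtime settings, and the
    /// optional secret.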
    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, secret).await?;
        this.write().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

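    /// Create the store from an already created connection pool: run the
    /// pending migrations and set up the store cipher if a secret was
    /// provided.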
    async fn open_with_pool(
        pool: SqlitePool,
        secret: Option<Secret>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match secret {
            Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
            None => None,
        };

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
        })
    }

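    /// Acquire a connection for reading.
    ///
    /// Reads can happen concurrently, so these connections come from the
    /// pool. The foreign-keys pragma is re-enabled on every checkout, since
    /// it is a per-connection setting.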
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

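    /// Acquire the connection used for writing.
    ///
    /// All writes are serialized through this single dedicated connection,
    /// guarded by a mutex.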
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

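    /// Serialize a [`TimelineEvent`] for storage in the `events` table.
    ///
    /// The JSON blob is encrypted when a store cipher is configured, so the
    /// relation pair (`relates_to`/`rel_type`) is extracted beforehand and
    /// kept in clear columns, where it remains queryable.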
    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

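/// An event encoded by [`SqliteEventCacheStore::encode_event`], ready to be
/// inserted into the `events` table.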
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

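/// Helpers to rebuild linked chunks and their content from their SQLite rows,
/// implemented for [`Transaction`] so they can be used inside an ongoing
/// transaction.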
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => Err(Error::InvalidData {
                details: format!("a linked chunk has an unknown type {other}"),
            }),
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

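/// Run the database migrations, bringing the schema from `version` up to
/// [`DATABASE_VERSION`]. Each migration step runs in its own transaction and
/// bumps the stored schema version on success.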
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    if version < 10 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
            txn.set_db_version(10)
        })
        .await?;

        if version >= 1 {
            conn.vacuum().await?;
        }
    }

    if version < 11 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/011_empty_event_cache.sql"
            ))?;
            txn.set_db_version(11)
        })
        .await?;
    }

    if version < 12 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/012_store_event_type.sql"
            ))?;
            txn.set_db_version(12)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let _timer = timer!("method");

        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                    ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
                        )?;

                        // Keep only the events that can be stored; skip any
                        // event without an ID or without an event type.
                        let valid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            let Some(event_type) = event.kind.event_type() else {
                                error!(%event_id, "Trying to save an event with no event type");
                                return None;
                            };

                            Some((event_id.to_string(), event_type, event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event_type, event)) in items.into_iter().filter_map(valid_event).enumerate() {
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                            let event_type = this.encode_key(keys::EVENTS, event_type);

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();

                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let Some(event_type) = event.kind.event_type() else {
                            error!(%event_id, "Trying to save an event with no event type");
                            continue;
                        };

                        let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
                        let event_type = this.encode_key(keys::EVENTS, event_type);

                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

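                        // Shift the positions of the items that come after the
                        // removed one. This runs in two passes: first move each
                        // shifted row to the negation of its target position,
                        // then flip the sign back, so updated rows can never
                        // collide with rows still holding their old position
                        // mid-update.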
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {}
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let (observed_max_identifier, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match observed_max_identifier {
                    Some(max_observed_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(max_observed_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    } else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                        });
                    }
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    return Ok(None);
                };

                let chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                txn.execute("DELETE FROM linked_chunks", ())?;
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        once(
                            hashed_linked_chunk_id
                                .to_sql()
                                .unwrap(),
                        )
                        .chain(events.iter().map(|event| {
                            event
                                .as_str()
                                .to_sql()
                                .unwrap()
                        })),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, Self::Error> {
        let _timer = timer!("method");

        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
        let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                #[allow(clippy::redundant_clone)]
                let (query, keys) = match (hashed_event_type, hashed_session_id) {
                    (None, None) => {
                        ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
                    }
                    (None, Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
                        params![hashed_room_id, session_id.to_owned()],
                    ),
                    (Some(event_type), None) => (
                        "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
                        params![hashed_room_id, event_type.to_owned()]
                    ),
                    (Some(event_type), Some(session_id)) => (
                        "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
                        params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
                    ),
                };

                let mut statement = txn.prepare(query)?;
                let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;

                let mut events = Vec::new();
                for ev in maybe_events {
                    let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
                    events.push(event);
                }

                Ok(events)
            })
            .await
    }

    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!("Trying to save an event with no ID");
            return Ok(());
        };

        let Some(event_type) = event.kind.event_type() else {
            error!(%event_id, "Trying to save an event with no event type");
            return Ok(());
        };

        let event_type = self.encode_key(keys::EVENTS, event_type);
        let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                Ok(())
            })
            .await
    }
}

fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    let collect_results = |rows| {
        let mut related = Vec::new();

        for result in rows {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    let related = if let Some(filters) = filters {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
        let filters_params: Vec<_> = filter_strings
            .iter()
            .map(|f| f.to_sql().expect("converting a string to SQL should work"))
            .collect();

        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters_params),
        );

        let mut statement = txn.prepare(&query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut statement = txn.prepare(query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    };

    related
}

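/// Run a closure in a `BEGIN IMMEDIATE` transaction on the write connection.
///
/// An immediate transaction takes the write lock when it starts instead of at
/// the first write statement, which avoids failing lock upgrades while other
/// transactions are reading. The default (deferred) behavior is restored
/// afterwards, whether or not the closure succeeded.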
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        .unwrap()
}

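/// Insert a new chunk row into `linked_chunks`, then rewire the `next`
/// pointer of the previous chunk and the `previous` pointer of the next
/// chunk, exactly like inserting a node into a doubly linked list.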
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    if let Some(previous) = previous {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
    }

    if let Some(next) = next {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::{
            store::{
                integration_tests::{
                    check_test_event, make_test_event, make_test_event_with_event_id,
                },
                EventCacheStore, EventCacheStoreError,
            },
            Gap,
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
    };
    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
    use once_cell::sync::Lazy;
    use ruma::{event_id, room_id};
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;
    use crate::{
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
        SqliteStoreConfig,
    };

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)),
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }

    #[async_test]
    async fn test_linked_chunk_new_gap_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![Update::NewGapChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { prev_token: "raclette".to_owned() },
                }],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });
    }

    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_push_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "one"),
                            make_test_event(room_id, "two"),
                            make_test_event(room_id, "three"),
                            make_test_event(room_id, "four"),
                            make_test_event(room_id, "five"),
                            make_test_event(room_id, "six"),
                        ],
                    },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }

    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_multiple_rooms() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room1 = room_id!("!realcheeselovers:raclette.fr");
        let linked_chunk_id1 = LinkedChunkId::Room(room1);
        let room2 = room_id!("!realcheeselovers:fondue.ch");
        let linked_chunk_id2 = LinkedChunkId::Room(room2);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id1,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room1, "best cheese is raclette"),
                            make_test_event(room1, "obviously"),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        store
            .handle_linked_chunk_updates(
                linked_chunk_id2,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![make_test_event(room1, "beaufort is the best")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
        assert_eq!(chunks_room1.len(), 1);

        let c = chunks_room1.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "best cheese is raclette");
            check_test_event(&events[1], "obviously");
        });

        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
        assert_eq!(chunks_room2.len(), 1);

        let c = chunks_room2.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "beaufort is the best");
        });
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_filter_duplicate_events_no_events() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
        assert!(duplicates.is_empty());
    }

2297 #[async_test]
2298 async fn test_load_last_chunk() {
2299 let room_id = room_id!("!r0:matrix.org");
2300 let linked_chunk_id = LinkedChunkId::Room(room_id);
2301 let event = |msg: &str| make_test_event(room_id, msg);
2302 let store = get_event_cache_store().await.expect("creating cache store failed");
2303
2304 {
2306 let (last_chunk, chunk_identifier_generator) =
2307 store.load_last_chunk(linked_chunk_id).await.unwrap();
2308
2309 assert!(last_chunk.is_none());
2310 assert_eq!(chunk_identifier_generator.current(), 0);
2311 }
2312
2313 {
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        {
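            // Case #3: a new chunk is appended after chunk 42; it becomes the
            // last chunk.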
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
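                        // `next` points back to chunk 0, which creates a cycle
                        // in the linked chunk.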
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
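            // Case #1: no chunk exists; loading the previous chunk of an
            // unknown chunk identifier yields nothing.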
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        {
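            // Case #2: a single chunk exists; it has no previous chunk.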
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        {
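            // Case #3: a chunk is inserted before chunk 42 and can be loaded
            // as its previous chunk.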
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
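                            // This chunk is inserted before the existing chunk 42.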
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

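        // An event without any relation; it must not be found.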
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

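        // An edit of the previous event; this is the only event that the
        // relations query should return.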
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

        let f = f.room(another_room_id);

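        // An unrelated event in another room; it must not be found either.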
        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

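        // Save all three events.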
        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

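        // The second filter entry attempts an SQL injection. If it were
        // interpolated into the query unescaped, the `OR 1=1` clause would
        // turn the filter into a tautology and match every event.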
        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}