1use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 store::{extract_event_relation, EventCacheStore},
25 Event, Gap,
26 },
27 linked_chunk::{
28 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
29 Position, RawChunk, Update,
30 },
31 timer,
32};
33use matrix_sdk_store_encryption::StoreCipher;
34use ruma::{
35 events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
36};
37use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
38use tokio::{
39 fs,
40 sync::{Mutex, OwnedMutexGuard},
41};
42use tracing::{debug, error, instrument, trace};
43
44use crate::{
45 error::{Error, Result},
46 utils::{
47 repeat_vars, EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
48 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
49 },
50 OpenStoreError, Secret, SqliteStoreConfig,
51};
52
53mod keys {
54 pub const LINKED_CHUNKS: &str = "linked_chunks";
56}
57
58const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
60
61const DATABASE_VERSION: u8 = 11;
67
68const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
71const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
74
75#[derive(Clone)]
77pub struct SqliteEventCacheStore {
78 store_cipher: Option<Arc<StoreCipher>>,
79
80 pool: SqlitePool,
82
83 write_connection: Arc<Mutex<SqliteAsyncConn>>,
88}
89
90#[cfg(not(tarpaulin_include))]
91impl fmt::Debug for SqliteEventCacheStore {
92 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
94 }
95}
96
97impl EncryptableStore for SqliteEventCacheStore {
98 fn get_cypher(&self) -> Option<&StoreCipher> {
99 self.store_cipher.as_deref()
100 }
101}
102
103impl SqliteEventCacheStore {
104 pub async fn open(
107 path: impl AsRef<Path>,
108 passphrase: Option<&str>,
109 ) -> Result<Self, OpenStoreError> {
110 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
111 }
112
113 pub async fn open_with_key(
116 path: impl AsRef<Path>,
117 key: Option<&[u8; 32]>,
118 ) -> Result<Self, OpenStoreError> {
119 Self::open_with_config(SqliteStoreConfig::new(path).key(key)).await
120 }
121
122 #[instrument(skip(config), fields(path = ?config.path))]
124 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
125 debug!(?config);
126
127 let _timer = timer!("open_with_config");
128
129 let SqliteStoreConfig { path, pool_config, runtime_config, secret } = config;
130
131 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
132
133 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
134 config.pool = Some(pool_config);
135
136 let pool = config.create_pool(Runtime::Tokio1)?;
137
138 let this = Self::open_with_pool(pool, secret).await?;
139 this.write().await?.apply_runtime_config(runtime_config).await?;
140
141 Ok(this)
142 }
143
144 async fn open_with_pool(
147 pool: SqlitePool,
148 secret: Option<Secret>,
149 ) -> Result<Self, OpenStoreError> {
150 let conn = pool.get().await?;
151
152 let version = conn.db_version().await?;
153 run_migrations(&conn, version).await?;
154
155 let store_cipher = match secret {
156 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
157 None => None,
158 };
159
160 Ok(Self {
161 store_cipher,
162 pool,
163 write_connection: Arc::new(Mutex::new(conn)),
165 })
166 }
167
168 #[instrument(skip_all)]
170 async fn read(&self) -> Result<SqliteAsyncConn> {
171 trace!("Taking a `read` connection");
172 let _timer = timer!("connection");
173
174 let connection = self.pool.get().await?;
175
176 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
181
182 Ok(connection)
183 }
184
185 #[instrument(skip_all)]
187 async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
188 trace!("Taking a `write` connection");
189 let _timer = timer!("connection");
190
191 let connection = self.write_connection.clone().lock_owned().await;
192
193 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
198
199 Ok(connection)
200 }
201
202 fn map_row_to_chunk(
203 row: &rusqlite::Row<'_>,
204 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
205 Ok((
206 row.get::<_, u64>(0)?,
207 row.get::<_, Option<u64>>(1)?,
208 row.get::<_, Option<u64>>(2)?,
209 row.get::<_, String>(3)?,
210 ))
211 }
212
213 fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
214 let serialized = serde_json::to_vec(event)?;
215
216 let raw_event = event.raw();
218 let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
219
220 let content = self.encode_value(serialized)?;
222
223 Ok(EncodedEvent {
224 content,
225 rel_type,
226 relates_to: relates_to.map(|relates_to| relates_to.to_string()),
227 })
228 }
229}
230
231struct EncodedEvent {
232 content: Vec<u8>,
233 rel_type: Option<String>,
234 relates_to: Option<String>,
235}
236
237trait TransactionExtForLinkedChunks {
238 fn rebuild_chunk(
239 &self,
240 store: &SqliteEventCacheStore,
241 linked_chunk_id: &Key,
242 previous: Option<u64>,
243 index: u64,
244 next: Option<u64>,
245 chunk_type: &str,
246 ) -> Result<RawChunk<Event, Gap>>;
247
248 fn load_gap_content(
249 &self,
250 store: &SqliteEventCacheStore,
251 linked_chunk_id: &Key,
252 chunk_id: ChunkIdentifier,
253 ) -> Result<Gap>;
254
255 fn load_events_content(
256 &self,
257 store: &SqliteEventCacheStore,
258 linked_chunk_id: &Key,
259 chunk_id: ChunkIdentifier,
260 ) -> Result<Vec<Event>>;
261}
262
263impl TransactionExtForLinkedChunks for Transaction<'_> {
264 fn rebuild_chunk(
265 &self,
266 store: &SqliteEventCacheStore,
267 linked_chunk_id: &Key,
268 previous: Option<u64>,
269 id: u64,
270 next: Option<u64>,
271 chunk_type: &str,
272 ) -> Result<RawChunk<Event, Gap>> {
273 let previous = previous.map(ChunkIdentifier::new);
274 let next = next.map(ChunkIdentifier::new);
275 let id = ChunkIdentifier::new(id);
276
277 match chunk_type {
278 CHUNK_TYPE_GAP_TYPE_STRING => {
279 let gap = self.load_gap_content(store, linked_chunk_id, id)?;
281 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
282 }
283
284 CHUNK_TYPE_EVENT_TYPE_STRING => {
285 let events = self.load_events_content(store, linked_chunk_id, id)?;
287 Ok(RawChunk {
288 content: ChunkContent::Items(events),
289 previous,
290 identifier: id,
291 next,
292 })
293 }
294
295 other => {
296 Err(Error::InvalidData {
298 details: format!("a linked chunk has an unknown type {other}"),
299 })
300 }
301 }
302 }
303
304 fn load_gap_content(
305 &self,
306 store: &SqliteEventCacheStore,
307 linked_chunk_id: &Key,
308 chunk_id: ChunkIdentifier,
309 ) -> Result<Gap> {
310 let encoded_prev_token: Vec<u8> = self.query_row(
313 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
314 (chunk_id.index(), &linked_chunk_id),
315 |row| row.get(0),
316 )?;
317 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
318 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
319 Ok(Gap { prev_token })
320 }
321
322 fn load_events_content(
323 &self,
324 store: &SqliteEventCacheStore,
325 linked_chunk_id: &Key,
326 chunk_id: ChunkIdentifier,
327 ) -> Result<Vec<Event>> {
328 let mut events = Vec::new();
330
331 for event_data in self
332 .prepare(
333 r#"
334 SELECT events.content
335 FROM event_chunks ec, events
336 WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
337 ORDER BY ec.position ASC
338 "#,
339 )?
340 .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
341 {
342 let encoded_content = event_data?;
343 let serialized_content = store.decode_value(&encoded_content)?;
344 let event = serde_json::from_slice(&serialized_content)?;
345
346 events.push(event);
347 }
348
349 Ok(events)
350 }
351}
352
353async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
355 if version == 0 {
356 debug!("Creating database");
357 } else if version < DATABASE_VERSION {
358 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
359 } else {
360 return Ok(());
361 }
362
363 conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
365
366 if version < 1 {
367 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
370 conn.with_transaction(|txn| {
371 txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
372 txn.set_db_version(1)
373 })
374 .await?;
375 }
376
377 if version < 2 {
378 conn.with_transaction(|txn| {
379 txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
380 txn.set_db_version(2)
381 })
382 .await?;
383 }
384
385 if version < 3 {
386 conn.with_transaction(|txn| {
387 txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
388 txn.set_db_version(3)
389 })
390 .await?;
391 }
392
393 if version < 4 {
394 conn.with_transaction(|txn| {
395 txn.execute_batch(include_str!(
396 "../migrations/event_cache_store/004_ignore_policy.sql"
397 ))?;
398 txn.set_db_version(4)
399 })
400 .await?;
401 }
402
403 if version < 5 {
404 conn.with_transaction(|txn| {
405 txn.execute_batch(include_str!(
406 "../migrations/event_cache_store/005_events_index_on_event_id.sql"
407 ))?;
408 txn.set_db_version(5)
409 })
410 .await?;
411 }
412
413 if version < 6 {
414 conn.with_transaction(|txn| {
415 txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
416 txn.set_db_version(6)
417 })
418 .await?;
419 }
420
421 if version < 7 {
422 conn.with_transaction(|txn| {
423 txn.execute_batch(include_str!(
424 "../migrations/event_cache_store/007_event_chunks.sql"
425 ))?;
426 txn.set_db_version(7)
427 })
428 .await?;
429 }
430
431 if version < 8 {
432 conn.with_transaction(|txn| {
433 txn.execute_batch(include_str!(
434 "../migrations/event_cache_store/008_linked_chunk_id.sql"
435 ))?;
436 txn.set_db_version(8)
437 })
438 .await?;
439 }
440
441 if version < 9 {
442 conn.with_transaction(|txn| {
443 txn.execute_batch(include_str!(
444 "../migrations/event_cache_store/009_related_event_index.sql"
445 ))?;
446 txn.set_db_version(9)
447 })
448 .await?;
449 }
450
451 if version < 10 {
452 conn.with_transaction(|txn| {
453 txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
454 txn.set_db_version(10)
455 })
456 .await?;
457
458 if version >= 1 {
459 conn.vacuum().await?;
462 }
463 }
464
465 if version < 11 {
466 conn.with_transaction(|txn| {
467 txn.execute_batch(include_str!(
468 "../migrations/event_cache_store/011_empty_event_cache.sql"
469 ))?;
470 txn.set_db_version(11)
471 })
472 .await?;
473 }
474
475 Ok(())
476}
477
478#[async_trait]
479impl EventCacheStore for SqliteEventCacheStore {
480 type Error = Error;
481
482 #[instrument(skip(self))]
483 async fn try_take_leased_lock(
484 &self,
485 lease_duration_ms: u32,
486 key: &str,
487 holder: &str,
488 ) -> Result<bool> {
489 let _timer = timer!("method");
490
491 let key = key.to_owned();
492 let holder = holder.to_owned();
493
494 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
495 let expiration = now + lease_duration_ms as u64;
496
497 let num_touched = self
498 .write()
499 .await?
500 .with_transaction(move |txn| {
501 txn.execute(
502 "INSERT INTO lease_locks (key, holder, expiration)
503 VALUES (?1, ?2, ?3)
504 ON CONFLICT (key)
505 DO
506 UPDATE SET holder = ?2, expiration = ?3
507 WHERE holder = ?2
508 OR expiration < ?4
509 ",
510 (key, holder, expiration, now),
511 )
512 })
513 .await?;
514
515 Ok(num_touched == 1)
516 }
517
518 #[instrument(skip(self, updates))]
519 async fn handle_linked_chunk_updates(
520 &self,
521 linked_chunk_id: LinkedChunkId<'_>,
522 updates: Vec<Update<Event, Gap>>,
523 ) -> Result<(), Self::Error> {
524 let _timer = timer!("method");
525
526 let hashed_linked_chunk_id =
529 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
530 let linked_chunk_id = linked_chunk_id.to_owned();
531 let this = self.clone();
532
533 with_immediate_transaction(self, move |txn| {
534 for up in updates {
535 match up {
536 Update::NewItemsChunk { previous, new, next } => {
537 let previous = previous.as_ref().map(ChunkIdentifier::index);
538 let new = new.index();
539 let next = next.as_ref().map(ChunkIdentifier::index);
540
541 trace!(
542 %linked_chunk_id,
543 "new events chunk (prev={previous:?}, i={new}, next={next:?})",
544 );
545
546 insert_chunk(
547 txn,
548 &hashed_linked_chunk_id,
549 previous,
550 new,
551 next,
552 CHUNK_TYPE_EVENT_TYPE_STRING,
553 )?;
554 }
555
556 Update::NewGapChunk { previous, new, next, gap } => {
557 let serialized = serde_json::to_vec(&gap.prev_token)?;
558 let prev_token = this.encode_value(serialized)?;
559
560 let previous = previous.as_ref().map(ChunkIdentifier::index);
561 let new = new.index();
562 let next = next.as_ref().map(ChunkIdentifier::index);
563
564 trace!(
565 %linked_chunk_id,
566 "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
567 );
568
569 insert_chunk(
571 txn,
572 &hashed_linked_chunk_id,
573 previous,
574 new,
575 next,
576 CHUNK_TYPE_GAP_TYPE_STRING,
577 )?;
578
579 txn.execute(
581 r#"
582 INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
583 VALUES (?, ?, ?)
584 "#,
585 (new, &hashed_linked_chunk_id, prev_token),
586 )?;
587 }
588
589 Update::RemoveChunk(chunk_identifier) => {
590 let chunk_id = chunk_identifier.index();
591
592 trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
593
594 let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
596 "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
597 (chunk_id, &hashed_linked_chunk_id),
598 |row| Ok((row.get(0)?, row.get(1)?))
599 )?;
600
601 if let Some(previous) = previous {
603 txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
604 }
605
606 if let Some(next) = next {
608 txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
609 }
610
611 txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
614 }
615
616 Update::PushItems { at, items } => {
617 if items.is_empty() {
618 continue;
620 }
621
622 let chunk_id = at.chunk_identifier().index();
623
624 trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
625
626 let mut chunk_statement = txn.prepare(
627 "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
628 )?;
629
630 let mut content_statement = txn.prepare(
635 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
636 )?;
637
638 let invalid_event = |event: TimelineEvent| {
639 let Some(event_id) = event.event_id() else {
640 error!(%linked_chunk_id, "Trying to push an event with no ID");
641 return None;
642 };
643
644 Some((event_id.to_string(), event))
645 };
646
647 let room_id = linked_chunk_id.room_id();
648 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
649
650 for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
651 let index = at.index() + i;
653 chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
654
655 let encoded_event = this.encode_event(&event)?;
657 content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
658 }
659 }
660
661 Update::ReplaceItem { at, item: event } => {
662 let chunk_id = at.chunk_identifier().index();
663
664 let index = at.index();
665
666 trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
667
668 let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
670 error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
671 continue;
672 };
673
674 let encoded_event = this.encode_event(&event)?;
678 let room_id = linked_chunk_id.room_id();
679 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
680 txn.execute(
681 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
682 , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
683
684 txn.execute(
686 r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
687 (event_id, &hashed_linked_chunk_id, chunk_id, index)
688 )?;
689 }
690
691 Update::RemoveItem { at } => {
692 let chunk_id = at.chunk_identifier().index();
693 let index = at.index();
694
695 trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
696
697 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
699
700 txn.execute(
799 r#"
800 UPDATE event_chunks
801 SET position = -(position - 1)
802 WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
803 "#,
804 (&hashed_linked_chunk_id, chunk_id, index)
805 )?;
806 txn.execute(
807 r#"
808 UPDATE event_chunks
809 SET position = -position
810 WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
811 "#,
812 (&hashed_linked_chunk_id, chunk_id)
813 )?;
814 }
815
816 Update::DetachLastItems { at } => {
817 let chunk_id = at.chunk_identifier().index();
818 let index = at.index();
819
820 trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
821
822 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
824 }
825
826 Update::Clear => {
827 trace!(%linked_chunk_id, "clearing items");
828
829 txn.execute(
831 "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
832 (&hashed_linked_chunk_id,),
833 )?;
834 }
835
836 Update::StartReattachItems | Update::EndReattachItems => {
837 }
839 }
840 }
841
842 Ok(())
843 })
844 .await?;
845
846 Ok(())
847 }
848
849 #[instrument(skip(self))]
850 async fn load_all_chunks(
851 &self,
852 linked_chunk_id: LinkedChunkId<'_>,
853 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
854 let _timer = timer!("method");
855
856 let hashed_linked_chunk_id =
857 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
858
859 let this = self.clone();
860
861 let result = self
862 .read()
863 .await?
864 .with_transaction(move |txn| -> Result<_> {
865 let mut items = Vec::new();
866
867 for data in txn
869 .prepare(
870 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
871 )?
872 .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
873 {
874 let (id, previous, next, chunk_type) = data?;
875 let new = txn.rebuild_chunk(
876 &this,
877 &hashed_linked_chunk_id,
878 previous,
879 id,
880 next,
881 chunk_type.as_str(),
882 )?;
883 items.push(new);
884 }
885
886 Ok(items)
887 })
888 .await?;
889
890 Ok(result)
891 }
892
893 #[instrument(skip(self))]
894 async fn load_all_chunks_metadata(
895 &self,
896 linked_chunk_id: LinkedChunkId<'_>,
897 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
898 let _timer = timer!("method");
899
900 let hashed_linked_chunk_id =
901 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
902
903 self.read()
904 .await?
905 .with_transaction(move |txn| -> Result<_> {
906 let num_events_by_chunk_ids = txn
933 .prepare(
934 r#"
935 SELECT ec.chunk_id, COUNT(ec.event_id)
936 FROM event_chunks as ec
937 WHERE ec.linked_chunk_id = ?
938 GROUP BY ec.chunk_id
939 "#,
940 )?
941 .query_map((&hashed_linked_chunk_id,), |row| {
942 Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
943 })?
944 .collect::<Result<HashMap<_, _>, _>>()?;
945
946 txn.prepare(
947 r#"
948 SELECT
949 lc.id,
950 lc.previous,
951 lc.next,
952 lc.type
953 FROM linked_chunks as lc
954 WHERE lc.linked_chunk_id = ?
955 ORDER BY lc.id"#,
956 )?
957 .query_map((&hashed_linked_chunk_id,), |row| {
958 Ok((
959 row.get::<_, u64>(0)?,
960 row.get::<_, Option<u64>>(1)?,
961 row.get::<_, Option<u64>>(2)?,
962 row.get::<_, String>(3)?,
963 ))
964 })?
965 .map(|data| -> Result<_> {
966 let (id, previous, next, chunk_type) = data?;
967
968 let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
974 0
975 } else {
976 num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
977 };
978
979 Ok(ChunkMetadata {
980 identifier: ChunkIdentifier::new(id),
981 previous: previous.map(ChunkIdentifier::new),
982 next: next.map(ChunkIdentifier::new),
983 num_items,
984 })
985 })
986 .collect::<Result<Vec<_>, _>>()
987 })
988 .await
989 }
990
991 #[instrument(skip(self))]
992 async fn load_last_chunk(
993 &self,
994 linked_chunk_id: LinkedChunkId<'_>,
995 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
996 let _timer = timer!("method");
997
998 let hashed_linked_chunk_id =
999 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1000
1001 let this = self.clone();
1002
1003 self
1004 .read()
1005 .await?
1006 .with_transaction(move |txn| -> Result<_> {
1007 let (observed_max_identifier, number_of_chunks) = txn
1009 .prepare(
1010 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1011 )?
1012 .query_row(
1013 (&hashed_linked_chunk_id,),
1014 |row| {
1015 Ok((
1016 row.get::<_, Option<u64>>(0)?,
1021 row.get::<_, u64>(1)?,
1022 ))
1023 }
1024 )?;
1025
1026 let chunk_identifier_generator = match observed_max_identifier {
1027 Some(max_observed_identifier) => {
1028 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1029 ChunkIdentifier::new(max_observed_identifier)
1030 )
1031 },
1032 None => ChunkIdentifierGenerator::new_from_scratch(),
1033 };
1034
1035 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1037 .prepare(
1038 "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1039 )?
1040 .query_row(
1041 (&hashed_linked_chunk_id,),
1042 |row| {
1043 Ok((
1044 row.get::<_, u64>(0)?,
1045 row.get::<_, Option<u64>>(1)?,
1046 row.get::<_, String>(2)?,
1047 ))
1048 }
1049 )
1050 .optional()?
1051 else {
1052 if number_of_chunks == 0 {
1055 return Ok((None, chunk_identifier_generator));
1056 }
1057 else {
1062 return Err(Error::InvalidData {
1063 details:
1064 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1065 .to_owned()
1066 }
1067 )
1068 }
1069 };
1070
1071 let last_chunk = txn.rebuild_chunk(
1073 &this,
1074 &hashed_linked_chunk_id,
1075 previous_chunk,
1076 chunk_identifier,
1077 None,
1078 &chunk_type
1079 )?;
1080
1081 Ok((Some(last_chunk), chunk_identifier_generator))
1082 })
1083 .await
1084 }
1085
1086 #[instrument(skip(self))]
1087 async fn load_previous_chunk(
1088 &self,
1089 linked_chunk_id: LinkedChunkId<'_>,
1090 before_chunk_identifier: ChunkIdentifier,
1091 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1092 let _timer = timer!("method");
1093
1094 let hashed_linked_chunk_id =
1095 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1096
1097 let this = self.clone();
1098
1099 self
1100 .read()
1101 .await?
1102 .with_transaction(move |txn| -> Result<_> {
1103 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1105 .prepare(
1106 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1107 )?
1108 .query_row(
1109 (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1110 |row| {
1111 Ok((
1112 row.get::<_, u64>(0)?,
1113 row.get::<_, Option<u64>>(1)?,
1114 row.get::<_, Option<u64>>(2)?,
1115 row.get::<_, String>(3)?,
1116 ))
1117 }
1118 )
1119 .optional()?
1120 else {
1121 return Ok(None);
1123 };
1124
1125 let last_chunk = txn.rebuild_chunk(
1127 &this,
1128 &hashed_linked_chunk_id,
1129 previous_chunk,
1130 chunk_identifier,
1131 next_chunk,
1132 &chunk_type
1133 )?;
1134
1135 Ok(Some(last_chunk))
1136 })
1137 .await
1138 }
1139
1140 #[instrument(skip(self))]
1141 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1142 let _timer = timer!("method");
1143
1144 self.write()
1145 .await?
1146 .with_transaction(move |txn| {
1147 txn.execute("DELETE FROM linked_chunks", ())?;
1149 txn.execute("DELETE FROM events", ())
1151 })
1152 .await?;
1153
1154 Ok(())
1155 }
1156
1157 #[instrument(skip(self, events))]
1158 async fn filter_duplicated_events(
1159 &self,
1160 linked_chunk_id: LinkedChunkId<'_>,
1161 events: Vec<OwnedEventId>,
1162 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1163 let _timer = timer!("method");
1164
1165 if events.is_empty() {
1169 return Ok(Vec::new());
1170 }
1171
1172 let hashed_linked_chunk_id =
1174 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1175 let linked_chunk_id = linked_chunk_id.to_owned();
1176
1177 self.read()
1178 .await?
1179 .with_transaction(move |txn| -> Result<_> {
1180 txn.chunk_large_query_over(events, None, move |txn, events| {
1181 let query = format!(
1182 r#"
1183 SELECT event_id, chunk_id, position
1184 FROM event_chunks
1185 WHERE linked_chunk_id = ? AND event_id IN ({})
1186 ORDER BY chunk_id ASC, position ASC
1187 "#,
1188 repeat_vars(events.len()),
1189 );
1190
1191 let parameters = params_from_iter(
1192 once(
1194 hashed_linked_chunk_id
1195 .to_sql()
1196 .unwrap(),
1198 )
1199 .chain(events.iter().map(|event| {
1201 event
1202 .as_str()
1203 .to_sql()
1204 .unwrap()
1206 })),
1207 );
1208
1209 let mut duplicated_events = Vec::new();
1210
1211 for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1212 Ok((
1213 row.get::<_, String>(0)?,
1214 row.get::<_, u64>(1)?,
1215 row.get::<_, usize>(2)?,
1216 ))
1217 })? {
1218 let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1219
1220 let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1221 error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1224 continue;
1225 };
1226
1227 duplicated_events.push((
1228 duplicated_event,
1229 Position::new(ChunkIdentifier::new(chunk_identifier), index),
1230 ));
1231 }
1232
1233 Ok(duplicated_events)
1234 })
1235 })
1236 .await
1237 }
1238
1239 #[instrument(skip(self, event_id))]
1240 async fn find_event(
1241 &self,
1242 room_id: &RoomId,
1243 event_id: &EventId,
1244 ) -> Result<Option<Event>, Self::Error> {
1245 let _timer = timer!("method");
1246
1247 let event_id = event_id.to_owned();
1248 let this = self.clone();
1249
1250 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1251
1252 self.read()
1253 .await?
1254 .with_transaction(move |txn| -> Result<_> {
1255 let Some(event) = txn
1256 .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1257 .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1258 .optional()?
1259 else {
1260 return Ok(None);
1262 };
1263
1264 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1265
1266 Ok(Some(event))
1267 })
1268 .await
1269 }
1270
1271 #[instrument(skip(self, event_id, filters))]
1272 async fn find_event_relations(
1273 &self,
1274 room_id: &RoomId,
1275 event_id: &EventId,
1276 filters: Option<&[RelationType]>,
1277 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1278 let _timer = timer!("method");
1279
1280 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1281
1282 let hashed_linked_chunk_id =
1283 self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1284
1285 let event_id = event_id.to_owned();
1286 let filters = filters.map(ToOwned::to_owned);
1287 let store = self.clone();
1288
1289 self.read()
1290 .await?
1291 .with_transaction(move |txn| -> Result<_> {
1292 find_event_relations_transaction(
1293 store,
1294 hashed_room_id,
1295 hashed_linked_chunk_id,
1296 event_id,
1297 filters,
1298 txn,
1299 )
1300 })
1301 .await
1302 }
1303
1304 #[instrument(skip(self))]
1305 async fn get_room_events(&self, room_id: &RoomId) -> Result<Vec<Event>, Self::Error> {
1306 let _timer = timer!("method");
1307
1308 let this = self.clone();
1309
1310 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1311
1312 self.read()
1313 .await?
1314 .with_transaction(move |txn| -> Result<_> {
1315 let mut statement = txn.prepare("SELECT content FROM events WHERE room_id = ?")?;
1316 let maybe_events =
1317 statement.query_map((hashed_room_id,), |row| row.get::<_, Vec<u8>>(0))?;
1318
1319 let mut events = Vec::new();
1320 for ev in maybe_events {
1321 let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1322 events.push(event);
1323 }
1324
1325 Ok(events)
1326 })
1327 .await
1328 }
1329
1330 #[instrument(skip(self, event))]
1331 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1332 let _timer = timer!("method");
1333
1334 let Some(event_id) = event.event_id() else {
1335 error!(%room_id, "Trying to save an event with no ID");
1336 return Ok(());
1337 };
1338
1339 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1340 let event_id = event_id.to_string();
1341 let encoded_event = self.encode_event(&event)?;
1342
1343 self.write()
1344 .await?
1345 .with_transaction(move |txn| -> Result<_> {
1346 txn.execute(
1347 "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
1348 , (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1349
1350 Ok(())
1351 })
1352 .await
1353 }
1354}
1355
1356fn find_event_relations_transaction(
1357 store: SqliteEventCacheStore,
1358 hashed_room_id: Key,
1359 hashed_linked_chunk_id: Key,
1360 event_id: OwnedEventId,
1361 filters: Option<Vec<RelationType>>,
1362 txn: &Transaction<'_>,
1363) -> Result<Vec<(Event, Option<Position>)>> {
1364 let get_rows = |row: &rusqlite::Row<'_>| {
1365 Ok((
1366 row.get::<_, Vec<u8>>(0)?,
1367 row.get::<_, Option<u64>>(1)?,
1368 row.get::<_, Option<usize>>(2)?,
1369 ))
1370 };
1371
1372 let collect_results = |transaction| {
1374 let mut related = Vec::new();
1375
1376 for result in transaction {
1377 let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1378
1379 let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1380
1381 let pos = chunk_id
1384 .zip(index)
1385 .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1386
1387 related.push((event, pos));
1388 }
1389
1390 Ok(related)
1391 };
1392
1393 let related = if let Some(filters) = filters {
1394 let question_marks = repeat_vars(filters.len());
1395 let query = format!(
1396 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1397 FROM events
1398 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1399 WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1400 );
1401
1402 let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1407 let filters_params: Vec<_> = filter_strings
1408 .iter()
1409 .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1410 .collect();
1411
1412 let parameters = params_from_iter(
1413 [
1414 hashed_linked_chunk_id.to_sql().expect(
1415 "We should be able to convert a hashed linked chunk ID to a SQLite value",
1416 ),
1417 event_id
1418 .as_str()
1419 .to_sql()
1420 .expect("We should be able to convert an event ID to a SQLite value"),
1421 hashed_room_id
1422 .to_sql()
1423 .expect("We should be able to convert a room ID to a SQLite value"),
1424 ]
1425 .into_iter()
1426 .chain(filters_params),
1427 );
1428
1429 let mut transaction = txn.prepare(&query)?;
1430 let transaction = transaction.query_map(parameters, get_rows)?;
1431
1432 collect_results(transaction)
1433 } else {
1434 let query =
1435 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1436 FROM events
1437 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1438 WHERE relates_to = ? AND room_id = ?";
1439 let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1440
1441 let mut transaction = txn.prepare(query)?;
1442 let transaction = transaction.query_map(parameters, get_rows)?;
1443
1444 collect_results(transaction)
1445 };
1446
1447 related
1448}
1449
1450async fn with_immediate_transaction<
1455 T: Send + 'static,
1456 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1457>(
1458 this: &SqliteEventCacheStore,
1459 f: F,
1460) -> Result<T, Error> {
1461 this.write()
1462 .await?
1463 .interact(move |conn| -> Result<T, Error> {
1464 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1468
1469 let code = || -> Result<T, Error> {
1470 let txn = conn.transaction()?;
1471 let res = f(&txn)?;
1472 txn.commit()?;
1473 Ok(res)
1474 };
1475
1476 let res = code();
1477
1478 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1481
1482 res
1483 })
1484 .await
1485 .unwrap()
1487}
1488
1489fn insert_chunk(
1490 txn: &Transaction<'_>,
1491 linked_chunk_id: &Key,
1492 previous: Option<u64>,
1493 new: u64,
1494 next: Option<u64>,
1495 type_str: &str,
1496) -> rusqlite::Result<()> {
1497 txn.execute(
1499 r#"
1500 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1501 VALUES (?, ?, ?, ?, ?)
1502 "#,
1503 (new, linked_chunk_id, previous, next, type_str),
1504 )?;
1505
1506 if let Some(previous) = previous {
1508 txn.execute(
1509 r#"
1510 UPDATE linked_chunks
1511 SET next = ?
1512 WHERE id = ? AND linked_chunk_id = ?
1513 "#,
1514 (new, previous, linked_chunk_id),
1515 )?;
1516 }
1517
1518 if let Some(next) = next {
1520 txn.execute(
1521 r#"
1522 UPDATE linked_chunks
1523 SET previous = ?
1524 WHERE id = ? AND linked_chunk_id = ?
1525 "#,
1526 (new, next, linked_chunk_id),
1527 )?;
1528 }
1529
1530 Ok(())
1531}
1532
1533#[cfg(test)]
1534mod tests {
1535 use std::{
1536 path::PathBuf,
1537 sync::atomic::{AtomicU32, Ordering::SeqCst},
1538 };
1539
1540 use assert_matches::assert_matches;
1541 use matrix_sdk_base::{
1542 event_cache::{
1543 store::{
1544 integration_tests::{
1545 check_test_event, make_test_event, make_test_event_with_event_id,
1546 },
1547 EventCacheStore, EventCacheStoreError,
1548 },
1549 Gap,
1550 },
1551 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1552 linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
1553 };
1554 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1555 use once_cell::sync::Lazy;
1556 use ruma::{event_id, room_id};
1557 use tempfile::{tempdir, TempDir};
1558
1559 use super::SqliteEventCacheStore;
1560 use crate::{
1561 event_cache_store::keys,
1562 utils::{EncryptableStore as _, SqliteAsyncConnExt},
1563 SqliteStoreConfig,
1564 };
1565
1566 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1567 static NUM: AtomicU32 = AtomicU32::new(0);
1568
1569 fn new_event_cache_store_workspace() -> PathBuf {
1570 let name = NUM.fetch_add(1, SeqCst).to_string();
1571 TMP_DIR.path().join(name)
1572 }
1573
1574 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1575 let tmpdir_path = new_event_cache_store_workspace();
1576
1577 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1578
1579 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1580 }
1581
1582 event_cache_store_integration_tests!();
1583 event_cache_store_integration_tests_time!();
1584
1585 #[async_test]
1586 async fn test_pool_size() {
1587 let tmpdir_path = new_event_cache_store_workspace();
1588 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1589
1590 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1591
1592 assert_eq!(store.pool.status().max_size, 42);
1593 }
1594
1595 #[async_test]
1596 async fn test_linked_chunk_new_items_chunk() {
1597 let store = get_event_cache_store().await.expect("creating cache store failed");
1598
1599 let room_id = &DEFAULT_TEST_ROOM_ID;
1600 let linked_chunk_id = LinkedChunkId::Room(room_id);
1601
1602 store
1603 .handle_linked_chunk_updates(
1604 linked_chunk_id,
1605 vec![
1606 Update::NewItemsChunk {
1607 previous: None,
1608 new: ChunkIdentifier::new(42),
1609 next: None, },
1611 Update::NewItemsChunk {
1612 previous: Some(ChunkIdentifier::new(42)),
1613 new: ChunkIdentifier::new(13),
1614 next: Some(ChunkIdentifier::new(37)), },
1617 Update::NewItemsChunk {
1618 previous: Some(ChunkIdentifier::new(13)),
1619 new: ChunkIdentifier::new(37),
1620 next: None,
1621 },
1622 ],
1623 )
1624 .await
1625 .unwrap();
1626
1627 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1628
1629 assert_eq!(chunks.len(), 3);
1630
1631 {
1632 let c = chunks.remove(0);
1634 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1635 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1636 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1637 assert_matches!(c.content, ChunkContent::Items(events) => {
1638 assert!(events.is_empty());
1639 });
1640
1641 let c = chunks.remove(0);
1642 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1643 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1644 assert_eq!(c.next, None);
1645 assert_matches!(c.content, ChunkContent::Items(events) => {
1646 assert!(events.is_empty());
1647 });
1648
1649 let c = chunks.remove(0);
1650 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1651 assert_eq!(c.previous, None);
1652 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1653 assert_matches!(c.content, ChunkContent::Items(events) => {
1654 assert!(events.is_empty());
1655 });
1656 }
1657 }
1658
1659 #[async_test]
1660 async fn test_linked_chunk_new_gap_chunk() {
1661 let store = get_event_cache_store().await.expect("creating cache store failed");
1662
1663 let room_id = &DEFAULT_TEST_ROOM_ID;
1664 let linked_chunk_id = LinkedChunkId::Room(room_id);
1665
1666 store
1667 .handle_linked_chunk_updates(
1668 linked_chunk_id,
1669 vec![Update::NewGapChunk {
1670 previous: None,
1671 new: ChunkIdentifier::new(42),
1672 next: None,
1673 gap: Gap { prev_token: "raclette".to_owned() },
1674 }],
1675 )
1676 .await
1677 .unwrap();
1678
1679 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1680
1681 assert_eq!(chunks.len(), 1);
1682
1683 let c = chunks.remove(0);
1685 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1686 assert_eq!(c.previous, None);
1687 assert_eq!(c.next, None);
1688 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1689 assert_eq!(gap.prev_token, "raclette");
1690 });
1691 }
1692
1693 #[async_test]
1694 async fn test_linked_chunk_replace_item() {
1695 let store = get_event_cache_store().await.expect("creating cache store failed");
1696
1697 let room_id = &DEFAULT_TEST_ROOM_ID;
1698 let linked_chunk_id = LinkedChunkId::Room(room_id);
1699 let event_id = event_id!("$world");
1700
1701 store
1702 .handle_linked_chunk_updates(
1703 linked_chunk_id,
1704 vec![
1705 Update::NewItemsChunk {
1706 previous: None,
1707 new: ChunkIdentifier::new(42),
1708 next: None,
1709 },
1710 Update::PushItems {
1711 at: Position::new(ChunkIdentifier::new(42), 0),
1712 items: vec![
1713 make_test_event(room_id, "hello"),
1714 make_test_event_with_event_id(room_id, "world", Some(event_id)),
1715 ],
1716 },
1717 Update::ReplaceItem {
1718 at: Position::new(ChunkIdentifier::new(42), 1),
1719 item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
1720 },
1721 ],
1722 )
1723 .await
1724 .unwrap();
1725
1726 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1727
1728 assert_eq!(chunks.len(), 1);
1729
1730 let c = chunks.remove(0);
1731 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1732 assert_eq!(c.previous, None);
1733 assert_eq!(c.next, None);
1734 assert_matches!(c.content, ChunkContent::Items(events) => {
1735 assert_eq!(events.len(), 2);
1736 check_test_event(&events[0], "hello");
1737 check_test_event(&events[1], "yolo");
1738 });
1739 }
1740
1741 #[async_test]
1742 async fn test_linked_chunk_remove_chunk() {
1743 let store = get_event_cache_store().await.expect("creating cache store failed");
1744
1745 let room_id = &DEFAULT_TEST_ROOM_ID;
1746 let linked_chunk_id = LinkedChunkId::Room(room_id);
1747
1748 store
1749 .handle_linked_chunk_updates(
1750 linked_chunk_id,
1751 vec![
1752 Update::NewGapChunk {
1753 previous: None,
1754 new: ChunkIdentifier::new(42),
1755 next: None,
1756 gap: Gap { prev_token: "raclette".to_owned() },
1757 },
1758 Update::NewGapChunk {
1759 previous: Some(ChunkIdentifier::new(42)),
1760 new: ChunkIdentifier::new(43),
1761 next: None,
1762 gap: Gap { prev_token: "fondue".to_owned() },
1763 },
1764 Update::NewGapChunk {
1765 previous: Some(ChunkIdentifier::new(43)),
1766 new: ChunkIdentifier::new(44),
1767 next: None,
1768 gap: Gap { prev_token: "tartiflette".to_owned() },
1769 },
1770 Update::RemoveChunk(ChunkIdentifier::new(43)),
1771 ],
1772 )
1773 .await
1774 .unwrap();
1775
1776 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1777
1778 assert_eq!(chunks.len(), 2);
1779
1780 let c = chunks.remove(0);
1782 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1783 assert_eq!(c.previous, None);
1784 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1785 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1786 assert_eq!(gap.prev_token, "raclette");
1787 });
1788
1789 let c = chunks.remove(0);
1790 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1791 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1792 assert_eq!(c.next, None);
1793 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1794 assert_eq!(gap.prev_token, "tartiflette");
1795 });
1796
1797 let gaps = store
1799 .read()
1800 .await
1801 .unwrap()
1802 .with_transaction(|txn| -> rusqlite::Result<_> {
1803 let mut gaps = Vec::new();
1804 for data in txn
1805 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1806 .query_map((), |row| row.get::<_, u64>(0))?
1807 {
1808 gaps.push(data?);
1809 }
1810 Ok(gaps)
1811 })
1812 .await
1813 .unwrap();
1814
1815 assert_eq!(gaps, vec![42, 44]);
1816 }
1817
1818 #[async_test]
1819 async fn test_linked_chunk_push_items() {
1820 let store = get_event_cache_store().await.expect("creating cache store failed");
1821
1822 let room_id = &DEFAULT_TEST_ROOM_ID;
1823 let linked_chunk_id = LinkedChunkId::Room(room_id);
1824
1825 store
1826 .handle_linked_chunk_updates(
1827 linked_chunk_id,
1828 vec![
1829 Update::NewItemsChunk {
1830 previous: None,
1831 new: ChunkIdentifier::new(42),
1832 next: None,
1833 },
1834 Update::PushItems {
1835 at: Position::new(ChunkIdentifier::new(42), 0),
1836 items: vec![
1837 make_test_event(room_id, "hello"),
1838 make_test_event(room_id, "world"),
1839 ],
1840 },
1841 Update::PushItems {
1842 at: Position::new(ChunkIdentifier::new(42), 2),
1843 items: vec![make_test_event(room_id, "who?")],
1844 },
1845 ],
1846 )
1847 .await
1848 .unwrap();
1849
1850 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1851
1852 assert_eq!(chunks.len(), 1);
1853
1854 let c = chunks.remove(0);
1855 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1856 assert_eq!(c.previous, None);
1857 assert_eq!(c.next, None);
1858 assert_matches!(c.content, ChunkContent::Items(events) => {
1859 assert_eq!(events.len(), 3);
1860
1861 check_test_event(&events[0], "hello");
1862 check_test_event(&events[1], "world");
1863 check_test_event(&events[2], "who?");
1864 });
1865 }
1866
1867 #[async_test]
1868 async fn test_linked_chunk_remove_item() {
1869 let store = get_event_cache_store().await.expect("creating cache store failed");
1870
1871 let room_id = *DEFAULT_TEST_ROOM_ID;
1872 let linked_chunk_id = LinkedChunkId::Room(room_id);
1873
1874 store
1875 .handle_linked_chunk_updates(
1876 linked_chunk_id,
1877 vec![
1878 Update::NewItemsChunk {
1879 previous: None,
1880 new: ChunkIdentifier::new(42),
1881 next: None,
1882 },
1883 Update::PushItems {
1884 at: Position::new(ChunkIdentifier::new(42), 0),
1885 items: vec![
1886 make_test_event(room_id, "one"),
1887 make_test_event(room_id, "two"),
1888 make_test_event(room_id, "three"),
1889 make_test_event(room_id, "four"),
1890 make_test_event(room_id, "five"),
1891 make_test_event(room_id, "six"),
1892 ],
1893 },
1894 Update::RemoveItem {
1895 at: Position::new(ChunkIdentifier::new(42), 2), },
1897 ],
1898 )
1899 .await
1900 .unwrap();
1901
1902 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1903
1904 assert_eq!(chunks.len(), 1);
1905
1906 let c = chunks.remove(0);
1907 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1908 assert_eq!(c.previous, None);
1909 assert_eq!(c.next, None);
1910 assert_matches!(c.content, ChunkContent::Items(events) => {
1911 assert_eq!(events.len(), 5);
1912 check_test_event(&events[0], "one");
1913 check_test_event(&events[1], "two");
1914 check_test_event(&events[2], "four");
1915 check_test_event(&events[3], "five");
1916 check_test_event(&events[4], "six");
1917 });
1918
1919 let num_rows: u64 = store
1921 .read()
1922 .await
1923 .unwrap()
1924 .with_transaction(move |txn| {
1925 txn.query_row(
1926 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1927 (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1928 |row| row.get(0),
1929 )
1930 })
1931 .await
1932 .unwrap();
1933 assert_eq!(num_rows, 3);
1934 }
1935
1936 #[async_test]
1937 async fn test_linked_chunk_detach_last_items() {
1938 let store = get_event_cache_store().await.expect("creating cache store failed");
1939
1940 let room_id = *DEFAULT_TEST_ROOM_ID;
1941 let linked_chunk_id = LinkedChunkId::Room(room_id);
1942
1943 store
1944 .handle_linked_chunk_updates(
1945 linked_chunk_id,
1946 vec![
1947 Update::NewItemsChunk {
1948 previous: None,
1949 new: ChunkIdentifier::new(42),
1950 next: None,
1951 },
1952 Update::PushItems {
1953 at: Position::new(ChunkIdentifier::new(42), 0),
1954 items: vec![
1955 make_test_event(room_id, "hello"),
1956 make_test_event(room_id, "world"),
1957 make_test_event(room_id, "howdy"),
1958 ],
1959 },
1960 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1961 ],
1962 )
1963 .await
1964 .unwrap();
1965
1966 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1967
1968 assert_eq!(chunks.len(), 1);
1969
1970 let c = chunks.remove(0);
1971 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1972 assert_eq!(c.previous, None);
1973 assert_eq!(c.next, None);
1974 assert_matches!(c.content, ChunkContent::Items(events) => {
1975 assert_eq!(events.len(), 1);
1976 check_test_event(&events[0], "hello");
1977 });
1978 }
1979
1980 #[async_test]
1981 async fn test_linked_chunk_start_end_reattach_items() {
1982 let store = get_event_cache_store().await.expect("creating cache store failed");
1983
1984 let room_id = *DEFAULT_TEST_ROOM_ID;
1985 let linked_chunk_id = LinkedChunkId::Room(room_id);
1986
1987 store
1991 .handle_linked_chunk_updates(
1992 linked_chunk_id,
1993 vec![
1994 Update::NewItemsChunk {
1995 previous: None,
1996 new: ChunkIdentifier::new(42),
1997 next: None,
1998 },
1999 Update::PushItems {
2000 at: Position::new(ChunkIdentifier::new(42), 0),
2001 items: vec![
2002 make_test_event(room_id, "hello"),
2003 make_test_event(room_id, "world"),
2004 make_test_event(room_id, "howdy"),
2005 ],
2006 },
2007 Update::StartReattachItems,
2008 Update::EndReattachItems,
2009 ],
2010 )
2011 .await
2012 .unwrap();
2013
2014 let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2015
2016 assert_eq!(chunks.len(), 1);
2017
2018 let c = chunks.remove(0);
2019 assert_eq!(c.identifier, ChunkIdentifier::new(42));
2020 assert_eq!(c.previous, None);
2021 assert_eq!(c.next, None);
2022 assert_matches!(c.content, ChunkContent::Items(events) => {
2023 assert_eq!(events.len(), 3);
2024 check_test_event(&events[0], "hello");
2025 check_test_event(&events[1], "world");
2026 check_test_event(&events[2], "howdy");
2027 });
2028 }
2029
2030 #[async_test]
2031 async fn test_linked_chunk_clear() {
2032 let store = get_event_cache_store().await.expect("creating cache store failed");
2033
2034 let room_id = *DEFAULT_TEST_ROOM_ID;
2035 let linked_chunk_id = LinkedChunkId::Room(room_id);
2036 let event_0 = make_test_event(room_id, "hello");
2037 let event_1 = make_test_event(room_id, "world");
2038 let event_2 = make_test_event(room_id, "howdy");
2039
2040 store
2041 .handle_linked_chunk_updates(
2042 linked_chunk_id,
2043 vec![
2044 Update::NewItemsChunk {
2045 previous: None,
2046 new: ChunkIdentifier::new(42),
2047 next: None,
2048 },
2049 Update::NewGapChunk {
2050 previous: Some(ChunkIdentifier::new(42)),
2051 new: ChunkIdentifier::new(54),
2052 next: None,
2053 gap: Gap { prev_token: "fondue".to_owned() },
2054 },
2055 Update::PushItems {
2056 at: Position::new(ChunkIdentifier::new(42), 0),
2057 items: vec![event_0.clone(), event_1, event_2],
2058 },
2059 Update::Clear,
2060 ],
2061 )
2062 .await
2063 .unwrap();
2064
2065 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2066 assert!(chunks.is_empty());
2067
2068 store
2070 .read()
2071 .await
2072 .unwrap()
2073 .with_transaction(|txn| -> rusqlite::Result<_> {
2074 let num_gaps = txn
2075 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
2076 .query_row((), |row| row.get::<_, u64>(0))?;
2077 assert_eq!(num_gaps, 0);
2078
2079 let num_events = txn
2080 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
2081 .query_row((), |row| row.get::<_, u64>(0))?;
2082 assert_eq!(num_events, 0);
2083
2084 Ok(())
2085 })
2086 .await
2087 .unwrap();
2088
2089 store
2091 .handle_linked_chunk_updates(
2092 linked_chunk_id,
2093 vec![
2094 Update::NewItemsChunk {
2095 previous: None,
2096 new: ChunkIdentifier::new(42),
2097 next: None,
2098 },
2099 Update::PushItems {
2100 at: Position::new(ChunkIdentifier::new(42), 0),
2101 items: vec![event_0],
2102 },
2103 ],
2104 )
2105 .await
2106 .unwrap();
2107 }
2108
2109 #[async_test]
2110 async fn test_linked_chunk_multiple_rooms() {
2111 let store = get_event_cache_store().await.expect("creating cache store failed");
2112
2113 let room1 = room_id!("!realcheeselovers:raclette.fr");
2114 let linked_chunk_id1 = LinkedChunkId::Room(room1);
2115 let room2 = room_id!("!realcheeselovers:fondue.ch");
2116 let linked_chunk_id2 = LinkedChunkId::Room(room2);
2117
2118 store
2122 .handle_linked_chunk_updates(
2123 linked_chunk_id1,
2124 vec![
2125 Update::NewItemsChunk {
2126 previous: None,
2127 new: ChunkIdentifier::new(42),
2128 next: None,
2129 },
2130 Update::PushItems {
2131 at: Position::new(ChunkIdentifier::new(42), 0),
2132 items: vec![
2133 make_test_event(room1, "best cheese is raclette"),
2134 make_test_event(room1, "obviously"),
2135 ],
2136 },
2137 ],
2138 )
2139 .await
2140 .unwrap();
2141
2142 store
2143 .handle_linked_chunk_updates(
2144 linked_chunk_id2,
2145 vec![
2146 Update::NewItemsChunk {
2147 previous: None,
2148 new: ChunkIdentifier::new(42),
2149 next: None,
2150 },
2151 Update::PushItems {
2152 at: Position::new(ChunkIdentifier::new(42), 0),
2153 items: vec![make_test_event(room1, "beaufort is the best")],
2154 },
2155 ],
2156 )
2157 .await
2158 .unwrap();
2159
2160 let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
2162 assert_eq!(chunks_room1.len(), 1);
2163
2164 let c = chunks_room1.remove(0);
2165 assert_matches!(c.content, ChunkContent::Items(events) => {
2166 assert_eq!(events.len(), 2);
2167 check_test_event(&events[0], "best cheese is raclette");
2168 check_test_event(&events[1], "obviously");
2169 });
2170
2171 let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
2173 assert_eq!(chunks_room2.len(), 1);
2174
2175 let c = chunks_room2.remove(0);
2176 assert_matches!(c.content, ChunkContent::Items(events) => {
2177 assert_eq!(events.len(), 1);
2178 check_test_event(&events[0], "beaufort is the best");
2179 });
2180 }
2181
2182 #[async_test]
2183 async fn test_linked_chunk_update_is_a_transaction() {
2184 let store = get_event_cache_store().await.expect("creating cache store failed");
2185
2186 let room_id = *DEFAULT_TEST_ROOM_ID;
2187 let linked_chunk_id = LinkedChunkId::Room(room_id);
2188
2189 let err = store
2192 .handle_linked_chunk_updates(
2193 linked_chunk_id,
2194 vec![
2195 Update::NewItemsChunk {
2196 previous: None,
2197 new: ChunkIdentifier::new(42),
2198 next: None,
2199 },
2200 Update::NewItemsChunk {
2201 previous: None,
2202 new: ChunkIdentifier::new(42),
2203 next: None,
2204 },
2205 ],
2206 )
2207 .await
2208 .unwrap_err();
2209
2210 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2212 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2213 });
2214
2215 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
2219 assert!(chunks.is_empty());
2220 }
2221
2222 #[async_test]
2223 async fn test_filter_duplicate_events_no_events() {
2224 let store = get_event_cache_store().await.expect("creating cache store failed");
2225
2226 let room_id = *DEFAULT_TEST_ROOM_ID;
2227 let linked_chunk_id = LinkedChunkId::Room(room_id);
2228 let duplicates = store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
2229 assert!(duplicates.is_empty());
2230 }
2231
2232 #[async_test]
2233 async fn test_load_last_chunk() {
2234 let room_id = room_id!("!r0:matrix.org");
2235 let linked_chunk_id = LinkedChunkId::Room(room_id);
2236 let event = |msg: &str| make_test_event(room_id, msg);
2237 let store = get_event_cache_store().await.expect("creating cache store failed");
2238
2239 {
2241 let (last_chunk, chunk_identifier_generator) =
2242 store.load_last_chunk(linked_chunk_id).await.unwrap();
2243
2244 assert!(last_chunk.is_none());
2245 assert_eq!(chunk_identifier_generator.current(), 0);
2246 }
2247
2248 {
2250 store
2251 .handle_linked_chunk_updates(
2252 linked_chunk_id,
2253 vec![
2254 Update::NewItemsChunk {
2255 previous: None,
2256 new: ChunkIdentifier::new(42),
2257 next: None,
2258 },
2259 Update::PushItems {
2260 at: Position::new(ChunkIdentifier::new(42), 0),
2261 items: vec![event("saucisse de morteau"), event("comté")],
2262 },
2263 ],
2264 )
2265 .await
2266 .unwrap();
2267
2268 let (last_chunk, chunk_identifier_generator) =
2269 store.load_last_chunk(linked_chunk_id).await.unwrap();
2270
2271 assert_matches!(last_chunk, Some(last_chunk) => {
2272 assert_eq!(last_chunk.identifier, 42);
2273 assert!(last_chunk.previous.is_none());
2274 assert!(last_chunk.next.is_none());
2275 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2276 assert_eq!(items.len(), 2);
2277 check_test_event(&items[0], "saucisse de morteau");
2278 check_test_event(&items[1], "comté");
2279 });
2280 });
2281 assert_eq!(chunk_identifier_generator.current(), 42);
2282 }
2283
2284 {
2286 store
2287 .handle_linked_chunk_updates(
2288 linked_chunk_id,
2289 vec![
2290 Update::NewItemsChunk {
2291 previous: Some(ChunkIdentifier::new(42)),
2292 new: ChunkIdentifier::new(7),
2293 next: None,
2294 },
2295 Update::PushItems {
2296 at: Position::new(ChunkIdentifier::new(7), 0),
2297 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2298 },
2299 ],
2300 )
2301 .await
2302 .unwrap();
2303
2304 let (last_chunk, chunk_identifier_generator) =
2305 store.load_last_chunk(linked_chunk_id).await.unwrap();
2306
2307 assert_matches!(last_chunk, Some(last_chunk) => {
2308 assert_eq!(last_chunk.identifier, 7);
2309 assert_matches!(last_chunk.previous, Some(previous) => {
2310 assert_eq!(previous, 42);
2311 });
2312 assert!(last_chunk.next.is_none());
2313 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2314 assert_eq!(items.len(), 3);
2315 check_test_event(&items[0], "fondue");
2316 check_test_event(&items[1], "gruyère");
2317 check_test_event(&items[2], "mont d'or");
2318 });
2319 });
2320 assert_eq!(chunk_identifier_generator.current(), 42);
2321 }
2322 }
2323
2324 #[async_test]
2325 async fn test_load_last_chunk_with_a_cycle() {
2326 let room_id = room_id!("!r0:matrix.org");
2327 let linked_chunk_id = LinkedChunkId::Room(room_id);
2328 let store = get_event_cache_store().await.expect("creating cache store failed");
2329
2330 store
2331 .handle_linked_chunk_updates(
2332 linked_chunk_id,
2333 vec![
2334 Update::NewItemsChunk {
2335 previous: None,
2336 new: ChunkIdentifier::new(0),
2337 next: None,
2338 },
2339 Update::NewItemsChunk {
2340 previous: Some(ChunkIdentifier::new(0)),
2344 new: ChunkIdentifier::new(1),
2345 next: Some(ChunkIdentifier::new(0)),
2346 },
2347 ],
2348 )
2349 .await
2350 .unwrap();
2351
2352 store.load_last_chunk(linked_chunk_id).await.unwrap_err();
2353 }
2354
2355 #[async_test]
2356 async fn test_load_previous_chunk() {
2357 let room_id = room_id!("!r0:matrix.org");
2358 let linked_chunk_id = LinkedChunkId::Room(room_id);
2359 let event = |msg: &str| make_test_event(room_id, msg);
2360 let store = get_event_cache_store().await.expect("creating cache store failed");
2361
2362 {
2365 let previous_chunk = store
2366 .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
2367 .await
2368 .unwrap();
2369
2370 assert!(previous_chunk.is_none());
2371 }
2372
2373 {
2376 store
2377 .handle_linked_chunk_updates(
2378 linked_chunk_id,
2379 vec![Update::NewItemsChunk {
2380 previous: None,
2381 new: ChunkIdentifier::new(42),
2382 next: None,
2383 }],
2384 )
2385 .await
2386 .unwrap();
2387
2388 let previous_chunk =
2389 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2390
2391 assert!(previous_chunk.is_none());
2392 }
2393
2394 {
2396 store
2397 .handle_linked_chunk_updates(
2398 linked_chunk_id,
2399 vec![
2400 Update::NewItemsChunk {
2402 previous: None,
2403 new: ChunkIdentifier::new(7),
2404 next: Some(ChunkIdentifier::new(42)),
2405 },
2406 Update::PushItems {
2407 at: Position::new(ChunkIdentifier::new(7), 0),
2408 items: vec![event("brigand du jorat"), event("morbier")],
2409 },
2410 ],
2411 )
2412 .await
2413 .unwrap();
2414
2415 let previous_chunk =
2416 store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();
2417
2418 assert_matches!(previous_chunk, Some(previous_chunk) => {
2419 assert_eq!(previous_chunk.identifier, 7);
2420 assert!(previous_chunk.previous.is_none());
2421 assert_matches!(previous_chunk.next, Some(next) => {
2422 assert_eq!(next, 42);
2423 });
2424 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2425 assert_eq!(items.len(), 2);
2426 check_test_event(&items[0], "brigand du jorat");
2427 check_test_event(&items[1], "morbier");
2428 });
2429 });
2430 }
2431 }
2432}
2433
2434#[cfg(test)]
2435mod encrypted_tests {
2436 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2437
2438 use matrix_sdk_base::{
2439 event_cache::store::{EventCacheStore, EventCacheStoreError},
2440 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
2441 };
2442 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2443 use once_cell::sync::Lazy;
2444 use ruma::{
2445 event_id,
2446 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2447 room_id, user_id,
2448 };
2449 use tempfile::{tempdir, TempDir};
2450
2451 use super::SqliteEventCacheStore;
2452
2453 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2454 static NUM: AtomicU32 = AtomicU32::new(0);
2455
2456 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2457 let name = NUM.fetch_add(1, SeqCst).to_string();
2458 let tmpdir_path = TMP_DIR.path().join(name);
2459
2460 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2461
2462 Ok(SqliteEventCacheStore::open(
2463 tmpdir_path.to_str().unwrap(),
2464 Some("default_test_password"),
2465 )
2466 .await
2467 .unwrap())
2468 }
2469
2470 event_cache_store_integration_tests!();
2471 event_cache_store_integration_tests_time!();
2472
2473 #[async_test]
2474 async fn test_no_sqlite_injection_in_find_event_relations() {
2475 let room_id = room_id!("!test:localhost");
2476 let another_room_id = room_id!("!r1:matrix.org");
2477 let sender = user_id!("@alice:localhost");
2478
2479 let store = get_event_cache_store()
2480 .await
2481 .expect("We should be able to create a new, empty, event cache store");
2482
2483 let f = EventFactory::new().room(room_id).sender(sender);
2484
2485 let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
2487 let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
2488
2489 let edit_id = event_id!("$find_me:matrix.org");
2491 let edit = f
2492 .text_msg("Find me")
2493 .event_id(edit_id)
2494 .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
2495 .into_event();
2496
2497 let f = f.room(another_room_id);
2499
2500 let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
2501 let another_event =
2502 f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
2503
2504 store.save_event(room_id, event).await.unwrap();
2506 store.save_event(room_id, edit).await.unwrap();
2507 store.save_event(another_room_id, another_event).await.unwrap();
2508
2509 let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
2513
2514 let results = store
2516 .find_event_relations(room_id, event_id, filter.as_deref())
2517 .await
2518 .expect("We should be able to attempt to find event relations");
2519
2520 similar_asserts::assert_eq!(
2522 results.len(),
2523 1,
2524 "We should only have loaded events for the first room {results:#?}"
2525 );
2526
2527 let (found_event, _) = &results[0];
2529 assert_eq!(
2530 found_event.event_id().as_deref(),
2531 Some(edit_id),
2532 "The single event we found should be the edit event"
2533 );
2534 }
2535}