1use std::{
18 collections::HashMap,
19 fmt,
20 iter::once,
21 path::{Path, PathBuf},
22 sync::Arc,
23};
24
25use async_trait::async_trait;
26use deadpool::managed::PoolConfig;
27use matrix_sdk_base::{
28 cross_process_lock::CrossProcessLockGeneration,
29 deserialized_responses::TimelineEvent,
30 event_cache::{
31 Event, Gap,
32 store::{EventCacheStore, extract_event_relation},
33 },
34 linked_chunk::{
35 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
36 Position, RawChunk, Update,
37 },
38 timer,
39};
40use matrix_sdk_store_encryption::StoreCipher;
41use ruma::{
42 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
43};
44use rusqlite::{
45 OptionalExtension, ToSql, Transaction, TransactionBehavior, params, params_from_iter,
46};
47use tokio::{
48 fs,
49 sync::{Mutex, OwnedMutexGuard},
50};
51use tracing::{debug, error, instrument, trace};
52
53use crate::{
54 OpenStoreError, RuntimeConfig, Secret, SqliteStoreConfig,
55 connection::{self, Connection as SqliteAsyncConn, Pool as SqlitePool, SqliteConnections},
56 error::{Error, Result},
57 utils::{
58 EncryptableStore, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
59 SqliteKeyValueStoreConnExt, SqliteTransactionExt, repeat_vars,
60 },
61};
62
63mod keys {
64 pub const LINKED_CHUNKS: &str = "linked_chunks";
66 pub const EVENTS: &str = "events";
67}
68
69const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
71
72const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
75const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
78
79#[derive(Clone)]
81pub struct SqliteEventCacheStore {
82 store_cipher: Option<Arc<StoreCipher>>,
83
84 connections: Arc<Mutex<Option<SqliteConnections>>>,
86
87 db_path: PathBuf,
89
90 pool_config: PoolConfig,
92
93 runtime_config: RuntimeConfig,
95}
96
97#[cfg(not(tarpaulin_include))]
98impl fmt::Debug for SqliteEventCacheStore {
99 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
100 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
101 }
102}
103
104impl EncryptableStore for SqliteEventCacheStore {
105 fn get_cypher(&self) -> Option<&StoreCipher> {
106 self.store_cipher.as_deref()
107 }
108}
109
110impl SqliteEventCacheStore {
111 pub async fn open(
114 path: impl AsRef<Path>,
115 passphrase: Option<&str>,
116 ) -> Result<Self, OpenStoreError> {
117 Self::open_with_config(&SqliteStoreConfig::new(path).passphrase(passphrase)).await
118 }
119
120 pub async fn open_with_key(
123 path: impl AsRef<Path>,
124 key: Option<&[u8; 32]>,
125 ) -> Result<Self, OpenStoreError> {
126 Self::open_with_config(&SqliteStoreConfig::new(path).key(key)).await
127 }
128
129 #[instrument(skip(config), fields(path = ?config.path))]
131 pub async fn open_with_config(config: &SqliteStoreConfig) -> Result<Self, OpenStoreError> {
132 debug!(?config);
133
134 let _timer = timer!("open_with_config");
135
136 fs::create_dir_all(&config.path).await.map_err(OpenStoreError::CreateDir)?;
137
138 let db_path = config.path.join(DATABASE_NAME);
139 let pool_config = config.pool_config();
140 let runtime_config = config.runtime_config();
141
142 let pool = config.build_pool_of_connections(DATABASE_NAME)?;
143
144 let this =
145 Self::open_with_pool(pool, db_path, pool_config, runtime_config, config.secret.clone())
146 .await?;
147
148 this.write().await?.apply_runtime_config(runtime_config).await?;
150
151 Ok(this)
152 }
153
154 async fn open_with_pool(
157 pool: SqlitePool,
158 db_path: PathBuf,
159 pool_config: PoolConfig,
160 runtime_config: RuntimeConfig,
161 secret: Option<Secret>,
162 ) -> Result<Self, OpenStoreError> {
163 let conn = pool.get().await?;
164
165 let version = conn.db_version().await?;
166
167 run_migrations(&conn, version).await?;
168
169 conn.wal_checkpoint().await;
170
171 let store_cipher = match secret {
172 Some(s) => Some(Arc::new(conn.get_or_create_store_cipher(s).await?)),
173 None => None,
174 };
175
176 let connections = SqliteConnections {
177 pool,
178 write_connection: Arc::new(Mutex::new(conn)),
180 };
181
182 Ok(Self {
183 store_cipher,
184 connections: Arc::new(Mutex::new(Some(connections))),
185 db_path,
186 pool_config,
187 runtime_config,
188 })
189 }
190
191 #[instrument(skip_all)]
193 async fn read(&self) -> Result<SqliteAsyncConn> {
194 let pool = {
195 let guard = self.connections.lock().await;
196 let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
197 conns.pool.clone()
198 };
199
200 let connection = pool.get().await?;
201
202 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
207
208 Ok(connection)
209 }
210
211 #[instrument(skip_all)]
213 async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
214 let write_connection = {
215 let guard = self.connections.lock().await;
216 let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
217 conns.write_connection.clone()
218 };
219
220 let connection = write_connection.lock_owned().await;
221
222 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
227
228 Ok(connection)
229 }
230
231 fn map_row_to_chunk(
232 row: &rusqlite::Row<'_>,
233 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
234 Ok((
235 row.get::<_, u64>(0)?,
236 row.get::<_, Option<u64>>(1)?,
237 row.get::<_, Option<u64>>(2)?,
238 row.get::<_, String>(3)?,
239 ))
240 }
241
242 fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
243 let serialized = serde_json::to_vec(event)?;
244
245 let raw_event = event.raw();
247 let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();
248
249 let content = self.encode_value(serialized)?;
251
252 Ok(EncodedEvent {
253 content,
254 rel_type,
255 relates_to: relates_to.map(|relates_to| relates_to.to_string()),
256 })
257 }
258
259 pub async fn vacuum(&self) -> Result<()> {
260 let write_connection = {
261 let guard = self.connections.lock().await;
262 let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
263 conns.write_connection.clone()
264 };
265 write_connection.lock().await.vacuum().await
266 }
267
268 async fn get_db_size(&self) -> Result<Option<usize>> {
269 let pool = {
270 let guard = self.connections.lock().await;
271 let conns = guard.as_ref().ok_or(Error::StoreClosed)?;
272 conns.pool.clone()
273 };
274 Ok(Some(pool.get().await?.get_db_size().await?))
275 }
276
277 pub async fn close(&self) -> Result<()> {
278 connection::close_connections(&self.connections, "Event cache store").await;
279 Ok(())
280 }
281
282 pub async fn reopen(&self) -> Result<()> {
283 connection::reopen_connections(
284 &self.connections,
285 self.db_path.clone(),
286 self.pool_config,
287 self.runtime_config,
288 )
289 .await?;
290 Ok(())
291 }
292
293 #[cfg(test)]
295 async fn pool_max_size(&self) -> Option<usize> {
296 let guard = self.connections.lock().await;
297 guard.as_ref().map(|conns| conns.pool.status().max_size)
298 }
299}
300
301struct EncodedEvent {
302 content: Vec<u8>,
303 rel_type: Option<String>,
304 relates_to: Option<String>,
305}
306
307trait TransactionExtForLinkedChunks {
308 fn rebuild_chunk(
309 &self,
310 store: &SqliteEventCacheStore,
311 linked_chunk_id: &Key,
312 previous: Option<u64>,
313 index: u64,
314 next: Option<u64>,
315 chunk_type: &str,
316 ) -> Result<RawChunk<Event, Gap>>;
317
318 fn load_gap_content(
319 &self,
320 store: &SqliteEventCacheStore,
321 linked_chunk_id: &Key,
322 chunk_id: ChunkIdentifier,
323 ) -> Result<Gap>;
324
325 fn load_events_content(
326 &self,
327 store: &SqliteEventCacheStore,
328 linked_chunk_id: &Key,
329 chunk_id: ChunkIdentifier,
330 ) -> Result<Vec<Event>>;
331}
332
333impl TransactionExtForLinkedChunks for Transaction<'_> {
334 fn rebuild_chunk(
335 &self,
336 store: &SqliteEventCacheStore,
337 linked_chunk_id: &Key,
338 previous: Option<u64>,
339 id: u64,
340 next: Option<u64>,
341 chunk_type: &str,
342 ) -> Result<RawChunk<Event, Gap>> {
343 let previous = previous.map(ChunkIdentifier::new);
344 let next = next.map(ChunkIdentifier::new);
345 let id = ChunkIdentifier::new(id);
346
347 match chunk_type {
348 CHUNK_TYPE_GAP_TYPE_STRING => {
349 let gap = self.load_gap_content(store, linked_chunk_id, id)?;
351 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
352 }
353
354 CHUNK_TYPE_EVENT_TYPE_STRING => {
355 let events = self.load_events_content(store, linked_chunk_id, id)?;
357 Ok(RawChunk {
358 content: ChunkContent::Items(events),
359 previous,
360 identifier: id,
361 next,
362 })
363 }
364
365 other => {
366 Err(Error::InvalidData {
368 details: format!("a linked chunk has an unknown type {other}"),
369 })
370 }
371 }
372 }
373
374 fn load_gap_content(
375 &self,
376 store: &SqliteEventCacheStore,
377 linked_chunk_id: &Key,
378 chunk_id: ChunkIdentifier,
379 ) -> Result<Gap> {
380 let encoded_prev_token: Vec<u8> = self.query_row(
383 "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
384 (chunk_id.index(), &linked_chunk_id),
385 |row| row.get(0),
386 )?;
387 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
388 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
389 Ok(Gap { token: prev_token })
390 }
391
392 fn load_events_content(
393 &self,
394 store: &SqliteEventCacheStore,
395 linked_chunk_id: &Key,
396 chunk_id: ChunkIdentifier,
397 ) -> Result<Vec<Event>> {
398 let mut events = Vec::new();
400
401 for event_data in self
402 .prepare(
403 r#"
404 SELECT events.content
405 FROM event_chunks ec, events
406 WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
407 ORDER BY ec.position ASC
408 "#,
409 )?
410 .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
411 {
412 let encoded_content = event_data?;
413 let serialized_content = store.decode_value(&encoded_content)?;
414 let event = serde_json::from_slice(&serialized_content)?;
415
416 events.push(event);
417 }
418
419 Ok(events)
420 }
421}
422
423async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
425 conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
427
428 if version < 1 {
429 debug!("Creating database");
430 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
433 conn.with_transaction(|txn| {
434 txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
435 txn.set_db_version(1)
436 })
437 .await?;
438 }
439
440 if version < 2 {
441 debug!("Upgrading database to version 2");
442 conn.with_transaction(|txn| {
443 txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
444 txn.set_db_version(2)
445 })
446 .await?;
447 }
448
449 if version < 3 {
450 debug!("Upgrading database to version 3");
451 conn.with_transaction(|txn| {
452 txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
453 txn.set_db_version(3)
454 })
455 .await?;
456 }
457
458 if version < 4 {
459 debug!("Upgrading database to version 4");
460 conn.with_transaction(|txn| {
461 txn.execute_batch(include_str!(
462 "../migrations/event_cache_store/004_ignore_policy.sql"
463 ))?;
464 txn.set_db_version(4)
465 })
466 .await?;
467 }
468
469 if version < 5 {
470 debug!("Upgrading database to version 5");
471 conn.with_transaction(|txn| {
472 txn.execute_batch(include_str!(
473 "../migrations/event_cache_store/005_events_index_on_event_id.sql"
474 ))?;
475 txn.set_db_version(5)
476 })
477 .await?;
478 }
479
480 if version < 6 {
481 debug!("Upgrading database to version 6");
482 conn.with_transaction(|txn| {
483 txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
484 txn.set_db_version(6)
485 })
486 .await?;
487 }
488
489 if version < 7 {
490 debug!("Upgrading database to version 7");
491 conn.with_transaction(|txn| {
492 txn.execute_batch(include_str!(
493 "../migrations/event_cache_store/007_event_chunks.sql"
494 ))?;
495 txn.set_db_version(7)
496 })
497 .await?;
498 }
499
500 if version < 8 {
501 debug!("Upgrading database to version 8");
502 conn.with_transaction(|txn| {
503 txn.execute_batch(include_str!(
504 "../migrations/event_cache_store/008_linked_chunk_id.sql"
505 ))?;
506 txn.set_db_version(8)
507 })
508 .await?;
509 }
510
511 if version < 9 {
512 debug!("Upgrading database to version 9");
513 conn.with_transaction(|txn| {
514 txn.execute_batch(include_str!(
515 "../migrations/event_cache_store/009_related_event_index.sql"
516 ))?;
517 txn.set_db_version(9)
518 })
519 .await?;
520 }
521
522 if version < 10 {
523 debug!("Upgrading database to version 10");
524 conn.with_transaction(|txn| {
525 txn.execute_batch(include_str!("../migrations/event_cache_store/010_drop_media.sql"))?;
526 txn.set_db_version(10)
527 })
528 .await?;
529
530 if version >= 1 {
531 conn.vacuum().await?;
534 }
535 }
536
537 if version < 11 {
538 debug!("Upgrading database to version 11");
539 conn.with_transaction(|txn| {
540 txn.execute_batch(include_str!(
541 "../migrations/event_cache_store/011_empty_event_cache.sql"
542 ))?;
543 txn.set_db_version(11)
544 })
545 .await?;
546 }
547
548 if version < 12 {
549 debug!("Upgrading database to version 12");
550 conn.with_transaction(|txn| {
551 txn.execute_batch(include_str!(
552 "../migrations/event_cache_store/012_store_event_type.sql"
553 ))?;
554 txn.set_db_version(12)
555 })
556 .await?;
557 }
558
559 if version < 13 {
560 debug!("Upgrading database to version 13");
561 conn.with_transaction(|txn| {
562 txn.execute_batch(include_str!(
563 "../migrations/event_cache_store/013_lease_locks_with_generation.sql"
564 ))?;
565 txn.set_db_version(13)
566 })
567 .await?;
568 }
569
570 if version < 14 {
571 debug!("Upgrading database to version 14");
572 conn.with_transaction(|txn| {
573 txn.execute_batch(include_str!(
574 "../migrations/event_cache_store/014_event_chunks_event_id_index.sql"
575 ))?;
576 txn.set_db_version(14)
577 })
578 .await?;
579 }
580
581 Ok(())
582}
583
584#[async_trait]
585impl EventCacheStore for SqliteEventCacheStore {
586 type Error = Error;
587
588 #[instrument(skip(self))]
589 async fn try_take_leased_lock(
590 &self,
591 lease_duration_ms: u32,
592 key: &str,
593 holder: &str,
594 ) -> Result<Option<CrossProcessLockGeneration>> {
595 let key = key.to_owned();
596 let holder = holder.to_owned();
597
598 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
599 let expiration = now + lease_duration_ms as u64;
600
601 let generation = self
603 .write()
604 .await?
605 .with_transaction(move |txn| {
606 txn.query_row(
607 "INSERT INTO lease_locks (key, holder, expiration)
608 VALUES (?1, ?2, ?3)
609 ON CONFLICT (key)
610 DO
611 UPDATE SET
612 holder = excluded.holder,
613 expiration = excluded.expiration,
614 generation =
615 CASE holder
616 WHEN excluded.holder THEN generation
617 ELSE generation + 1
618 END
619 WHERE
620 holder = excluded.holder
621 OR expiration < ?4
622 RETURNING generation
623 ",
624 (key, holder, expiration, now),
625 |row| row.get(0),
626 )
627 .optional()
628 })
629 .await?;
630
631 Ok(generation)
632 }
633
634 #[instrument(skip(self, updates))]
635 async fn handle_linked_chunk_updates(
636 &self,
637 linked_chunk_id: LinkedChunkId<'_>,
638 updates: Vec<Update<Event, Gap>>,
639 ) -> Result<(), Self::Error> {
640 let _timer = timer!("method");
641
642 let hashed_linked_chunk_id =
645 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
646 let linked_chunk_id = linked_chunk_id.to_owned();
647 let this = self.clone();
648
649 with_immediate_transaction(self, move |txn| {
650 for up in updates {
651 match up {
652 Update::NewItemsChunk { previous, new, next } => {
653 let previous = previous.as_ref().map(ChunkIdentifier::index);
654 let new = new.index();
655 let next = next.as_ref().map(ChunkIdentifier::index);
656
657 trace!(
658 %linked_chunk_id,
659 "new events chunk (prev={previous:?}, i={new}, next={next:?})",
660 );
661
662 insert_chunk(
663 txn,
664 &hashed_linked_chunk_id,
665 previous,
666 new,
667 next,
668 CHUNK_TYPE_EVENT_TYPE_STRING,
669 )?;
670 }
671
672 Update::NewGapChunk { previous, new, next, gap } => {
673 let serialized = serde_json::to_vec(&gap.token)?;
674 let prev_token = this.encode_value(serialized)?;
675
676 let previous = previous.as_ref().map(ChunkIdentifier::index);
677 let new = new.index();
678 let next = next.as_ref().map(ChunkIdentifier::index);
679
680 trace!(
681 %linked_chunk_id,
682 "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
683 );
684
685 insert_chunk(
687 txn,
688 &hashed_linked_chunk_id,
689 previous,
690 new,
691 next,
692 CHUNK_TYPE_GAP_TYPE_STRING,
693 )?;
694
695 txn.execute(
697 r#"
698 INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
699 VALUES (?, ?, ?)
700 "#,
701 (new, &hashed_linked_chunk_id, prev_token),
702 )?;
703 }
704
705 Update::RemoveChunk(chunk_identifier) => {
706 let chunk_id = chunk_identifier.index();
707
708 trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");
709
710 let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
712 "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
713 (chunk_id, &hashed_linked_chunk_id),
714 |row| Ok((row.get(0)?, row.get(1)?))
715 )?;
716
717 if let Some(previous) = previous {
719 txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
720 }
721
722 if let Some(next) = next {
724 txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
725 }
726
727 txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
730 }
731
732 Update::PushItems { at, items } => {
733 if items.is_empty() {
734 continue;
736 }
737
738 let chunk_id = at.chunk_identifier().index();
739
740 trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());
741
742 let mut chunk_statement = txn.prepare(
743 "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
744 )?;
745
746 let mut content_statement = txn.prepare(
751 "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)"
752 )?;
753
754 let invalid_event = |event: TimelineEvent| {
755 let Some(event_id) = event.event_id() else {
756 error!(%linked_chunk_id, "Trying to push an event with no ID");
757 return None;
758 };
759
760 let Some(event_type) = event.kind.event_type() else {
761 error!(%event_id, "Trying to save an event with no event type");
762 return None;
763 };
764
765 Some((event_id.to_string(), event_type, event))
766 };
767
768 let room_id = linked_chunk_id.room_id();
769 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
770
771 for (i, (event_id, event_type, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
772 let index = at.index() + i;
774 chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;
775
776 let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
777 let event_type = this.encode_key(keys::EVENTS, event_type);
778
779 let encoded_event = this.encode_event(&event)?;
781 content_statement.execute((&hashed_room_id, event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
782 }
783 }
784
785 Update::ReplaceItem { at, item: event } => {
786 let chunk_id = at.chunk_identifier().index();
787
788 let index = at.index();
789
790 trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
791
792 let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
794 error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
795 continue;
796 };
797
798 let Some(event_type) = event.kind.event_type() else {
799 error!(%event_id, "Trying to save an event with no event type");
800 continue;
801 };
802
803 let session_id = event.kind.session_id().map(|s| this.encode_key(keys::EVENTS, s));
804 let event_type = this.encode_key(keys::EVENTS, event_type);
805
806 let encoded_event = this.encode_event(&event)?;
810 let room_id = linked_chunk_id.room_id();
811 let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
812 txn.execute(
813 "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
814 (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
815
816 txn.execute(
818 r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
819 (event_id, &hashed_linked_chunk_id, chunk_id, index)
820 )?;
821 }
822
823 Update::RemoveItem { at } => {
824 let chunk_id = at.chunk_identifier().index();
825 let index = at.index();
826
827 trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
828
829 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;
831
832 txn.execute(
931 r#"
932 UPDATE event_chunks
933 SET position = -(position - 1)
934 WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
935 "#,
936 (&hashed_linked_chunk_id, chunk_id, index)
937 )?;
938 txn.execute(
939 r#"
940 UPDATE event_chunks
941 SET position = -position
942 WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
943 "#,
944 (&hashed_linked_chunk_id, chunk_id)
945 )?;
946 }
947
948 Update::DetachLastItems { at } => {
949 let chunk_id = at.chunk_identifier().index();
950 let index = at.index();
951
952 trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");
953
954 txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
956 }
957
958 Update::Clear => {
959 trace!(%linked_chunk_id, "clearing items");
960
961 txn.execute(
963 "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
964 (&hashed_linked_chunk_id,),
965 )?;
966 }
967
968 Update::StartReattachItems | Update::EndReattachItems => {
969 }
971 }
972 }
973
974 Ok(())
975 })
976 .await?;
977
978 Ok(())
979 }
980
981 #[instrument(skip(self))]
982 async fn load_all_chunks(
983 &self,
984 linked_chunk_id: LinkedChunkId<'_>,
985 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
986 let _timer = timer!("method");
987
988 let hashed_linked_chunk_id =
989 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
990
991 let this = self.clone();
992
993 let result = self
994 .read()
995 .await?
996 .with_transaction(move |txn| -> Result<_> {
997 let mut items = Vec::new();
998
999 for data in txn
1001 .prepare(
1002 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
1003 )?
1004 .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
1005 {
1006 let (id, previous, next, chunk_type) = data?;
1007 let new = txn.rebuild_chunk(
1008 &this,
1009 &hashed_linked_chunk_id,
1010 previous,
1011 id,
1012 next,
1013 chunk_type.as_str(),
1014 )?;
1015 items.push(new);
1016 }
1017
1018 Ok(items)
1019 })
1020 .await?;
1021
1022 Ok(result)
1023 }
1024
1025 #[instrument(skip(self))]
1026 async fn load_all_chunks_metadata(
1027 &self,
1028 linked_chunk_id: LinkedChunkId<'_>,
1029 ) -> Result<Vec<ChunkMetadata>, Self::Error> {
1030 let _timer = timer!("method");
1031
1032 let hashed_linked_chunk_id =
1033 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1034
1035 self.read()
1036 .await?
1037 .with_transaction(move |txn| -> Result<_> {
1038 let num_events_by_chunk_ids = txn
1065 .prepare(
1066 r#"
1067 SELECT ec.chunk_id, COUNT(ec.event_id)
1068 FROM event_chunks as ec
1069 WHERE ec.linked_chunk_id = ?
1070 GROUP BY ec.chunk_id
1071 "#,
1072 )?
1073 .query_map((&hashed_linked_chunk_id,), |row| {
1074 Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
1075 })?
1076 .collect::<Result<HashMap<_, _>, _>>()?;
1077
1078 txn.prepare(
1079 r#"
1080 SELECT
1081 lc.id,
1082 lc.previous,
1083 lc.next,
1084 lc.type
1085 FROM linked_chunks as lc
1086 WHERE lc.linked_chunk_id = ?
1087 ORDER BY lc.id"#,
1088 )?
1089 .query_map((&hashed_linked_chunk_id,), |row| {
1090 Ok((
1091 row.get::<_, u64>(0)?,
1092 row.get::<_, Option<u64>>(1)?,
1093 row.get::<_, Option<u64>>(2)?,
1094 row.get::<_, String>(3)?,
1095 ))
1096 })?
1097 .map(|data| -> Result<_> {
1098 let (id, previous, next, chunk_type) = data?;
1099
1100 let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
1106 0
1107 } else {
1108 num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
1109 };
1110
1111 Ok(ChunkMetadata {
1112 identifier: ChunkIdentifier::new(id),
1113 previous: previous.map(ChunkIdentifier::new),
1114 next: next.map(ChunkIdentifier::new),
1115 num_items,
1116 })
1117 })
1118 .collect::<Result<Vec<_>, _>>()
1119 })
1120 .await
1121 }
1122
1123 #[instrument(skip(self))]
1124 async fn load_last_chunk(
1125 &self,
1126 linked_chunk_id: LinkedChunkId<'_>,
1127 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
1128 let _timer = timer!("method");
1129
1130 let hashed_linked_chunk_id =
1131 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1132
1133 let this = self.clone();
1134
1135 self
1136 .read()
1137 .await?
1138 .with_transaction(move |txn| -> Result<_> {
1139 let (observed_max_identifier, number_of_chunks) = txn
1141 .prepare(
1142 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
1143 )?
1144 .query_row(
1145 (&hashed_linked_chunk_id,),
1146 |row| {
1147 Ok((
1148 row.get::<_, Option<u64>>(0)?,
1153 row.get::<_, u64>(1)?,
1154 ))
1155 }
1156 )?;
1157
1158 let chunk_identifier_generator = match observed_max_identifier {
1159 Some(max_observed_identifier) => {
1160 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
1161 ChunkIdentifier::new(max_observed_identifier)
1162 )
1163 },
1164 None => ChunkIdentifierGenerator::new_from_scratch(),
1165 };
1166
1167 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
1169 .prepare(
1170 "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
1171 )?
1172 .query_row(
1173 (&hashed_linked_chunk_id,),
1174 |row| {
1175 Ok((
1176 row.get::<_, u64>(0)?,
1177 row.get::<_, Option<u64>>(1)?,
1178 row.get::<_, String>(2)?,
1179 ))
1180 }
1181 )
1182 .optional()?
1183 else {
1184 if number_of_chunks == 0 {
1187 return Ok((None, chunk_identifier_generator));
1188 }
1189 else {
1194 return Err(Error::InvalidData {
1195 details:
1196 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
1197 .to_owned()
1198 }
1199 )
1200 }
1201 };
1202
1203 let last_chunk = txn.rebuild_chunk(
1205 &this,
1206 &hashed_linked_chunk_id,
1207 previous_chunk,
1208 chunk_identifier,
1209 None,
1210 &chunk_type
1211 )?;
1212
1213 Ok((Some(last_chunk), chunk_identifier_generator))
1214 })
1215 .await
1216 }
1217
1218 #[instrument(skip(self))]
1219 async fn load_previous_chunk(
1220 &self,
1221 linked_chunk_id: LinkedChunkId<'_>,
1222 before_chunk_identifier: ChunkIdentifier,
1223 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
1224 let _timer = timer!("method");
1225
1226 let hashed_linked_chunk_id =
1227 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1228
1229 let this = self.clone();
1230
1231 self
1232 .read()
1233 .await?
1234 .with_transaction(move |txn| -> Result<_> {
1235 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
1237 .prepare(
1238 "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
1239 )?
1240 .query_row(
1241 (&hashed_linked_chunk_id, before_chunk_identifier.index()),
1242 |row| {
1243 Ok((
1244 row.get::<_, u64>(0)?,
1245 row.get::<_, Option<u64>>(1)?,
1246 row.get::<_, Option<u64>>(2)?,
1247 row.get::<_, String>(3)?,
1248 ))
1249 }
1250 )
1251 .optional()?
1252 else {
1253 return Ok(None);
1255 };
1256
1257 let last_chunk = txn.rebuild_chunk(
1259 &this,
1260 &hashed_linked_chunk_id,
1261 previous_chunk,
1262 chunk_identifier,
1263 next_chunk,
1264 &chunk_type
1265 )?;
1266
1267 Ok(Some(last_chunk))
1268 })
1269 .await
1270 }
1271
1272 #[instrument(skip(self))]
1273 async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
1274 let _timer = timer!("method");
1275
1276 self.write()
1277 .await?
1278 .with_transaction(move |txn| {
1279 txn.execute("DELETE FROM linked_chunks", ())?;
1281 txn.execute("DELETE FROM events", ())
1283 })
1284 .await?;
1285
1286 Ok(())
1287 }
1288
1289 #[instrument(skip(self, events))]
1290 async fn filter_duplicated_events(
1291 &self,
1292 linked_chunk_id: LinkedChunkId<'_>,
1293 events: Vec<OwnedEventId>,
1294 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
1295 let _timer = timer!("method");
1296
1297 if events.is_empty() {
1301 return Ok(Vec::new());
1302 }
1303
1304 let hashed_linked_chunk_id =
1306 self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
1307 let linked_chunk_id = linked_chunk_id.to_owned();
1308
1309 self.read()
1310 .await?
1311 .with_transaction(move |txn| -> Result<_> {
1312 txn.chunk_large_query_over(events, None, move |txn, events| {
1313 let query = format!(
1314 r#"
1315 SELECT event_id, chunk_id, position
1316 FROM event_chunks
1317 WHERE linked_chunk_id = ? AND event_id IN ({})
1318 ORDER BY chunk_id ASC, position ASC
1319 "#,
1320 repeat_vars(events.len()),
1321 );
1322
1323 let parameters = params_from_iter(
1324 once(
1326 hashed_linked_chunk_id
1327 .to_sql()
1328 .unwrap(),
1330 )
1331 .chain(events.iter().map(|event| {
1333 event
1334 .as_str()
1335 .to_sql()
1336 .unwrap()
1338 })),
1339 );
1340
1341 let mut duplicated_events = Vec::new();
1342
1343 for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
1344 Ok((
1345 row.get::<_, String>(0)?,
1346 row.get::<_, u64>(1)?,
1347 row.get::<_, usize>(2)?,
1348 ))
1349 })? {
1350 let (duplicated_event, chunk_identifier, index) = duplicated_event?;
1351
1352 let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
1353 error!(%duplicated_event, %linked_chunk_id, "Reading an malformed event ID");
1356 continue;
1357 };
1358
1359 duplicated_events.push((
1360 duplicated_event,
1361 Position::new(ChunkIdentifier::new(chunk_identifier), index),
1362 ));
1363 }
1364
1365 Ok(duplicated_events)
1366 })
1367 })
1368 .await
1369 }
1370
1371 #[instrument(skip(self, event_id))]
1372 async fn find_event(
1373 &self,
1374 room_id: &RoomId,
1375 event_id: &EventId,
1376 ) -> Result<Option<Event>, Self::Error> {
1377 let _timer = timer!("method");
1378
1379 let event_id = event_id.to_owned();
1380 let this = self.clone();
1381
1382 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1383
1384 self.read()
1385 .await?
1386 .with_transaction(move |txn| -> Result<_> {
1387 let Some(event) = txn
1388 .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
1389 .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
1390 .optional()?
1391 else {
1392 return Ok(None);
1394 };
1395
1396 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
1397
1398 Ok(Some(event))
1399 })
1400 .await
1401 }
1402
1403 #[instrument(skip(self, event_id, filters))]
1404 async fn find_event_relations(
1405 &self,
1406 room_id: &RoomId,
1407 event_id: &EventId,
1408 filters: Option<&[RelationType]>,
1409 ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
1410 let _timer = timer!("method");
1411
1412 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1413
1414 let hashed_linked_chunk_id =
1415 self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());
1416
1417 let event_id = event_id.to_owned();
1418 let filters = filters.map(ToOwned::to_owned);
1419 let store = self.clone();
1420
1421 self.read()
1422 .await?
1423 .with_transaction(move |txn| -> Result<_> {
1424 find_event_relations_transaction(
1425 store,
1426 hashed_room_id,
1427 hashed_linked_chunk_id,
1428 event_id,
1429 filters,
1430 txn,
1431 )
1432 })
1433 .await
1434 }
1435
1436 #[instrument(skip(self))]
1437 async fn get_room_events(
1438 &self,
1439 room_id: &RoomId,
1440 event_type: Option<&str>,
1441 session_id: Option<&str>,
1442 ) -> Result<Vec<Event>, Self::Error> {
1443 let _timer = timer!("method");
1444
1445 let this = self.clone();
1446
1447 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1448 let hashed_event_type = event_type.map(|e| self.encode_key(keys::EVENTS, e));
1449 let hashed_session_id = session_id.map(|s| self.encode_key(keys::EVENTS, s));
1450
1451 self.read()
1452 .await?
1453 .with_transaction(move |txn| -> Result<_> {
1454 #[allow(clippy::redundant_clone)]
1458 let (query, keys) = match (hashed_event_type, hashed_session_id) {
1459 (None, None) => {
1460 ("SELECT content FROM events WHERE room_id = ?", params![hashed_room_id])
1461 }
1462 (None, Some(session_id)) => (
1463 "SELECT content FROM events WHERE room_id = ?1 AND session_id = ?2",
1464 params![hashed_room_id, session_id.to_owned()],
1465 ),
1466 (Some(event_type), None) => (
1467 "SELECT content FROM events WHERE room_id = ? AND event_type = ?",
1468 params![hashed_room_id, event_type.to_owned()]
1469 ),
1470 (Some(event_type), Some(session_id)) => (
1471 "SELECT content FROM events WHERE room_id = ?1 AND event_type = ?2 AND session_id = ?3",
1472 params![hashed_room_id, event_type.to_owned(), session_id.to_owned()],
1473 ),
1474 };
1475
1476 let mut statement = txn.prepare(query)?;
1477 let maybe_events = statement.query_map(keys, |row| row.get::<_, Vec<u8>>(0))?;
1478
1479 let mut events = Vec::new();
1480 for ev in maybe_events {
1481 let event = serde_json::from_slice(&this.decode_value(&ev?)?)?;
1482 events.push(event);
1483 }
1484
1485 Ok(events)
1486 })
1487 .await
1488 }
1489
1490 #[instrument(skip(self, event))]
1491 async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
1492 let _timer = timer!("method");
1493
1494 let Some(event_id) = event.event_id() else {
1495 error!("Trying to save an event with no ID");
1496 return Ok(());
1497 };
1498
1499 let Some(event_type) = event.kind.event_type() else {
1500 error!(%event_id, "Trying to save an event with no event type");
1501 return Ok(());
1502 };
1503
1504 let event_type = self.encode_key(keys::EVENTS, event_type);
1505 let session_id = event.kind.session_id().map(|s| self.encode_key(keys::EVENTS, s));
1506
1507 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
1508 let event_id = event_id.to_string();
1509 let encoded_event = self.encode_event(&event)?;
1510
1511 self.write()
1512 .await?
1513 .with_transaction(move |txn| -> Result<_> {
1514 txn.execute(
1515 "INSERT OR REPLACE INTO events(room_id, event_id, event_type, session_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?, ?, ?)",
1516 (&hashed_room_id, &event_id, event_type, session_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
1517
1518 Ok(())
1519 })
1520 .await
1521 }
1522
1523 async fn close(&self) -> Result<(), Self::Error> {
1524 SqliteEventCacheStore::close(self).await
1525 }
1526
1527 async fn reopen(&self) -> Result<(), Self::Error> {
1528 SqliteEventCacheStore::reopen(self).await
1529 }
1530
1531 async fn optimize(&self) -> Result<(), Self::Error> {
1532 Ok(self.vacuum().await?)
1533 }
1534
1535 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1536 self.get_db_size().await
1537 }
1538}
1539
1540fn find_event_relations_transaction(
1541 store: SqliteEventCacheStore,
1542 hashed_room_id: Key,
1543 hashed_linked_chunk_id: Key,
1544 event_id: OwnedEventId,
1545 filters: Option<Vec<RelationType>>,
1546 txn: &Transaction<'_>,
1547) -> Result<Vec<(Event, Option<Position>)>> {
1548 let get_rows = |row: &rusqlite::Row<'_>| {
1549 Ok((
1550 row.get::<_, Vec<u8>>(0)?,
1551 row.get::<_, Option<u64>>(1)?,
1552 row.get::<_, Option<usize>>(2)?,
1553 ))
1554 };
1555
1556 let collect_results = |transaction| {
1558 let mut related = Vec::new();
1559
1560 for result in transaction {
1561 let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;
1562
1563 let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;
1564
1565 let pos = chunk_id
1568 .zip(index)
1569 .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));
1570
1571 related.push((event, pos));
1572 }
1573
1574 Ok(related)
1575 };
1576
1577 if let Some(filters) = filters {
1578 let question_marks = repeat_vars(filters.len());
1579 let query = format!(
1580 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1581 FROM events
1582 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1583 WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
1584 );
1585
1586 let filter_strings: Vec<_> = filters.iter().map(|f| f.to_string()).collect();
1591 let filters_params: Vec<_> = filter_strings
1592 .iter()
1593 .map(|f| f.to_sql().expect("converting a string to SQL should work"))
1594 .collect();
1595
1596 let parameters = params_from_iter(
1597 [
1598 hashed_linked_chunk_id.to_sql().expect(
1599 "We should be able to convert a hashed linked chunk ID to a SQLite value",
1600 ),
1601 event_id
1602 .as_str()
1603 .to_sql()
1604 .expect("We should be able to convert an event ID to a SQLite value"),
1605 hashed_room_id
1606 .to_sql()
1607 .expect("We should be able to convert a room ID to a SQLite value"),
1608 ]
1609 .into_iter()
1610 .chain(filters_params),
1611 );
1612
1613 let mut transaction = txn.prepare(&query)?;
1614 let transaction = transaction.query_map(parameters, get_rows)?;
1615
1616 collect_results(transaction)
1617 } else {
1618 let query =
1619 "SELECT events.content, event_chunks.chunk_id, event_chunks.position
1620 FROM events
1621 LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
1622 WHERE relates_to = ? AND room_id = ?";
1623 let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);
1624
1625 let mut transaction = txn.prepare(query)?;
1626 let transaction = transaction.query_map(parameters, get_rows)?;
1627
1628 collect_results(transaction)
1629 }
1630}
1631
1632async fn with_immediate_transaction<
1637 T: Send + 'static,
1638 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1639>(
1640 this: &SqliteEventCacheStore,
1641 f: F,
1642) -> Result<T, Error> {
1643 this.write()
1644 .await?
1645 .interact(move |conn| -> Result<T, Error> {
1646 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1650
1651 let code = || -> Result<T, Error> {
1652 let txn = conn.transaction()?;
1653 let res = f(&txn)?;
1654 txn.commit()?;
1655 Ok(res)
1656 };
1657
1658 let res = code();
1659
1660 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1663
1664 res
1665 })
1666 .await
1667 .unwrap()
1669}
1670
1671fn insert_chunk(
1672 txn: &Transaction<'_>,
1673 linked_chunk_id: &Key,
1674 previous: Option<u64>,
1675 new: u64,
1676 next: Option<u64>,
1677 type_str: &str,
1678) -> rusqlite::Result<()> {
1679 txn.execute(
1681 r#"
1682 INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
1683 VALUES (?, ?, ?, ?, ?)
1684 "#,
1685 (new, linked_chunk_id, previous, next, type_str),
1686 )?;
1687
1688 if let Some(previous) = previous {
1690 let updated = txn.execute(
1691 r#"
1692 UPDATE linked_chunks
1693 SET next = ?
1694 WHERE id = ? AND linked_chunk_id = ?
1695 "#,
1696 (new, previous, linked_chunk_id),
1697 )?;
1698 if updated < 1 {
1699 return Err(rusqlite::Error::QueryReturnedNoRows);
1700 }
1701 if updated > 1 {
1702 return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1703 }
1704 }
1705
1706 if let Some(next) = next {
1708 let updated = txn.execute(
1709 r#"
1710 UPDATE linked_chunks
1711 SET previous = ?
1712 WHERE id = ? AND linked_chunk_id = ?
1713 "#,
1714 (new, next, linked_chunk_id),
1715 )?;
1716 if updated < 1 {
1717 return Err(rusqlite::Error::QueryReturnedNoRows);
1718 }
1719 if updated > 1 {
1720 return Err(rusqlite::Error::QueryReturnedMoreThanOneRow);
1721 }
1722 }
1723
1724 Ok(())
1725}
1726
1727#[cfg(test)]
1728mod tests {
1729 use std::{
1730 path::PathBuf,
1731 sync::{
1732 LazyLock,
1733 atomic::{AtomicU32, Ordering::SeqCst},
1734 },
1735 };
1736
1737 use assert_matches::assert_matches;
1738 use matrix_sdk_base::{
1739 event_cache::store::{
1740 EventCacheStore, EventCacheStoreError, IntoEventCacheStore,
1741 integration_tests::EventCacheStoreIntegrationTests,
1742 },
1743 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1744 linked_chunk::{ChunkIdentifier, LinkedChunkId, Update},
1745 };
1746 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
1747 use tempfile::{TempDir, tempdir};
1748
1749 use super::SqliteEventCacheStore;
1750 use crate::{
1751 SqliteStoreConfig,
1752 event_cache_store::keys,
1753 utils::{EncryptableStore as _, SqliteAsyncConnExt},
1754 };
1755
1756 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1757 static NUM: AtomicU32 = AtomicU32::new(0);
1758
1759 fn new_event_cache_store_workspace() -> PathBuf {
1760 let name = NUM.fetch_add(1, SeqCst).to_string();
1761 TMP_DIR.path().join(name)
1762 }
1763
1764 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1765 let tmpdir_path = new_event_cache_store_workspace();
1766
1767 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1768
1769 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1770 }
1771
1772 event_cache_store_integration_tests!();
1773 event_cache_store_integration_tests_time!();
1774
1775 #[async_test]
1776 async fn test_pool_size() {
1777 let tmpdir_path = new_event_cache_store_workspace();
1778 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1779
1780 let store = SqliteEventCacheStore::open_with_config(&store_open_config).await.unwrap();
1781
1782 assert_eq!(store.pool_max_size().await, Some(42));
1783 }
1784
1785 #[async_test]
1786 async fn test_linked_chunk_remove_chunk() {
1787 let store = get_event_cache_store().await.expect("creating cache store failed");
1788
1789 store.clone().into_event_cache_store().test_linked_chunk_remove_chunk().await;
1791
1792 let gaps = store
1794 .read()
1795 .await
1796 .unwrap()
1797 .with_transaction(|txn| -> rusqlite::Result<_> {
1798 let mut gaps = Vec::new();
1799 for data in txn
1800 .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
1801 .query_map((), |row| row.get::<_, u64>(0))?
1802 {
1803 gaps.push(data?);
1804 }
1805 Ok(gaps)
1806 })
1807 .await
1808 .unwrap();
1809
1810 assert_eq!(gaps, vec![42, 44]);
1813 }
1814
1815 #[async_test]
1816 async fn test_linked_chunk_remove_item() {
1817 let store = get_event_cache_store().await.expect("creating cache store failed");
1818
1819 store.clone().into_event_cache_store().test_linked_chunk_remove_item().await;
1821
1822 let room_id = *DEFAULT_TEST_ROOM_ID;
1823 let linked_chunk_id = LinkedChunkId::Room(room_id);
1824
1825 let num_rows: u64 = store
1827 .read()
1828 .await
1829 .unwrap()
1830 .with_transaction(move |txn| {
1831 txn.query_row(
1832 "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
1833 (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
1834 |row| row.get(0),
1835 )
1836 })
1837 .await
1838 .unwrap();
1839 assert_eq!(num_rows, 3);
1840 }
1841
1842 #[async_test]
1843 async fn test_linked_chunk_clear() {
1844 let store = get_event_cache_store().await.expect("creating cache store failed");
1845
1846 store.clone().into_event_cache_store().test_linked_chunk_clear().await;
1848
1849 store
1851 .read()
1852 .await
1853 .unwrap()
1854 .with_transaction(|txn| -> rusqlite::Result<_> {
1855 let num_gaps = txn
1856 .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
1857 .query_row((), |row| row.get::<_, u64>(0))?;
1858 assert_eq!(num_gaps, 0);
1859
1860 let num_events = txn
1861 .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
1862 .query_row((), |row| row.get::<_, u64>(0))?;
1863 assert_eq!(num_events, 0);
1864
1865 Ok(())
1866 })
1867 .await
1868 .unwrap();
1869 }
1870
1871 #[async_test]
1872 async fn test_linked_chunk_update_is_a_transaction() {
1873 let store = get_event_cache_store().await.expect("creating cache store failed");
1874
1875 let room_id = *DEFAULT_TEST_ROOM_ID;
1876 let linked_chunk_id = LinkedChunkId::Room(room_id);
1877
1878 let err = store
1881 .handle_linked_chunk_updates(
1882 linked_chunk_id,
1883 vec![
1884 Update::NewItemsChunk {
1885 previous: None,
1886 new: ChunkIdentifier::new(42),
1887 next: None,
1888 },
1889 Update::NewItemsChunk {
1890 previous: None,
1891 new: ChunkIdentifier::new(42),
1892 next: None,
1893 },
1894 ],
1895 )
1896 .await
1897 .unwrap_err();
1898
1899 assert_matches!(err, crate::error::Error::Sqlite(err) => {
1901 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1902 });
1903
1904 let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
1908 assert!(chunks.is_empty());
1909 }
1910}
1911
1912#[cfg(test)]
1913mod encrypted_tests {
1914 use std::sync::{
1915 LazyLock,
1916 atomic::{AtomicU32, Ordering::SeqCst},
1917 };
1918
1919 use matrix_sdk_base::{
1920 event_cache::store::{EventCacheStore, EventCacheStoreError},
1921 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1922 };
1923 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1924 use ruma::{
1925 event_id,
1926 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
1927 room_id, user_id,
1928 };
1929 use tempfile::{TempDir, tempdir};
1930
1931 use super::SqliteEventCacheStore;
1932
1933 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
1934 static NUM: AtomicU32 = AtomicU32::new(0);
1935
1936 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1937 let name = NUM.fetch_add(1, SeqCst).to_string();
1938 let tmpdir_path = TMP_DIR.path().join(name);
1939
1940 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1941
1942 Ok(SqliteEventCacheStore::open(
1943 tmpdir_path.to_str().unwrap(),
1944 Some("default_test_password"),
1945 )
1946 .await
1947 .unwrap())
1948 }
1949
1950 event_cache_store_integration_tests!();
1951 event_cache_store_integration_tests_time!();
1952
1953 #[async_test]
1954 async fn test_no_sqlite_injection_in_find_event_relations() {
1955 let room_id = room_id!("!test:localhost");
1956 let another_room_id = room_id!("!r1:matrix.org");
1957 let sender = user_id!("@alice:localhost");
1958
1959 let store = get_event_cache_store()
1960 .await
1961 .expect("We should be able to create a new, empty, event cache store");
1962
1963 let f = EventFactory::new().room(room_id).sender(sender);
1964
1965 let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
1967 let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();
1968
1969 let edit_id = event_id!("$find_me:matrix.org");
1971 let edit = f
1972 .text_msg("Find me")
1973 .event_id(edit_id)
1974 .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
1975 .into_event();
1976
1977 let f = f.room(another_room_id);
1979
1980 let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
1981 let another_event =
1982 f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();
1983
1984 store.save_event(room_id, event).await.unwrap();
1986 store.save_event(room_id, edit).await.unwrap();
1987 store.save_event(another_room_id, another_event).await.unwrap();
1988
1989 let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);
1993
1994 let results = store
1996 .find_event_relations(room_id, event_id, filter.as_deref())
1997 .await
1998 .expect("We should be able to attempt to find event relations");
1999
2000 similar_asserts::assert_eq!(
2002 results.len(),
2003 1,
2004 "We should only have loaded events for the first room {results:#?}"
2005 );
2006
2007 let (found_event, _) = &results[0];
2009 assert_eq!(
2010 found_event.event_id().as_deref(),
2011 Some(edit_id),
2012 "The single event we found should be the edit event"
2013 );
2014 }
2015}
2016
2017#[cfg(test)]
2018mod close_reopen_tests {
2019 use std::sync::{
2020 LazyLock,
2021 atomic::{AtomicU32, Ordering::SeqCst},
2022 };
2023
2024 use matrix_sdk_base::{event_cache::store::EventCacheStore, linked_chunk::LinkedChunkId};
2025 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, async_test};
2026 use tempfile::{TempDir, tempdir};
2027
2028 use super::SqliteEventCacheStore;
2029
2030 static TMP_DIR: LazyLock<TempDir> = LazyLock::new(|| tempdir().unwrap());
2031 static NUM: AtomicU32 = AtomicU32::new(0);
2032
2033 async fn new_store() -> SqliteEventCacheStore {
2034 let name = NUM.fetch_add(1, SeqCst).to_string();
2035 let tmpdir_path = TMP_DIR.path().join(name);
2036 SqliteEventCacheStore::open(tmpdir_path, None).await.unwrap()
2037 }
2038
2039 #[async_test]
2040 async fn test_close_completes_without_timeout() {
2041 let store = new_store().await;
2042
2043 let start = std::time::Instant::now();
2045 store.close().await.unwrap();
2046 let elapsed = start.elapsed();
2047
2048 assert!(
2049 elapsed < std::time::Duration::from_secs(2),
2050 "close() took {elapsed:?}, expected < 2s (no timeout)"
2051 );
2052
2053 let guard = store.connections.lock().await;
2055 assert!(guard.is_none(), "connections should be None after close");
2056 }
2057
2058 #[async_test]
2059 async fn test_reopen_restores_connections() {
2060 let store = new_store().await;
2061
2062 store.close().await.unwrap();
2063
2064 {
2065 let guard = store.connections.lock().await;
2066 assert!(guard.is_none());
2067 }
2068
2069 store.reopen().await.unwrap();
2070
2071 {
2072 let guard = store.connections.lock().await;
2073 assert!(guard.is_some(), "connections should be Some after reopen");
2074 }
2075 }
2076
2077 #[async_test]
2078 async fn test_close_is_idempotent() {
2079 let store = new_store().await;
2080
2081 store.close().await.unwrap();
2082 store.close().await.unwrap();
2084
2085 let guard = store.connections.lock().await;
2086 assert!(guard.is_none());
2087 }
2088
2089 #[async_test]
2090 async fn test_reopen_is_idempotent() {
2091 let store = new_store().await;
2092
2093 store.reopen().await.unwrap();
2095
2096 let guard = store.connections.lock().await;
2097 assert!(guard.is_some());
2098 }
2099
2100 #[async_test]
2101 async fn test_read_fails_when_closed() {
2102 let store = new_store().await;
2103 store.close().await.unwrap();
2104
2105 let err = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2106 assert!(err.is_err(), "read should fail when closed");
2107
2108 let err_msg = err.unwrap_err().to_string();
2109 assert!(err_msg.contains("closed"), "error should mention 'closed', got: {err_msg}");
2110 }
2111
2112 #[async_test]
2113 async fn test_write_fails_when_closed() {
2114 let store = new_store().await;
2115 store.close().await.unwrap();
2116
2117 let err = store.try_take_leased_lock(1000, "test_lock", "holder").await;
2118 assert!(err.is_err(), "write should fail when closed");
2119
2120 let err_msg = err.unwrap_err().to_string();
2121 assert!(err_msg.contains("closed"), "error should mention 'closed', got: {err_msg}");
2122 }
2123
2124 #[async_test]
2125 async fn test_data_persists_across_close_reopen() {
2126 let store = new_store().await;
2127
2128 let result = store.try_take_leased_lock(60_000, "test_lock", "holder").await.unwrap();
2130 assert!(result.is_some(), "should have acquired the lock");
2131
2132 store.close().await.unwrap();
2134 store.reopen().await.unwrap();
2135
2136 let result = store.try_take_leased_lock(60_000, "test_lock", "other_holder").await.unwrap();
2138 assert!(result.is_none(), "lock should still be held by the original holder after reopen");
2139 }
2140
2141 #[async_test]
2142 async fn test_multiple_close_reopen_cycles() {
2143 let store = new_store().await;
2144
2145 for _ in 0..5 {
2146 store.close().await.unwrap();
2147 store.reopen().await.unwrap();
2148
2149 let result = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2151 assert!(result.is_ok(), "store should work after close/reopen cycle");
2152 }
2153 }
2154
2155 #[async_test]
2156 async fn test_pool_is_fully_drained_after_close() {
2157 let store = new_store().await;
2158
2159 let _ = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2161 let _ = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2162
2163 store.close().await.unwrap();
2164
2165 let guard = store.connections.lock().await;
2168 assert!(guard.is_none(), "all connections should be released after close");
2169 }
2170
2171 #[async_test]
2172 async fn test_operations_work_immediately_after_reopen() {
2173 let store = new_store().await;
2174
2175 store.close().await.unwrap();
2176 store.reopen().await.unwrap();
2177
2178 let result = store.load_all_chunks(LinkedChunkId::Room(*DEFAULT_TEST_ROOM_ID)).await;
2180 assert!(result.is_ok(), "read should succeed immediately after reopen");
2181
2182 let result = store.try_take_leased_lock(1000, "test_lock", "holder").await;
2184 assert!(result.is_ok(), "write should succeed immediately after reopen");
2185 }
2186
2187 #[async_test]
2188 async fn test_close_waits_for_held_read_connection_to_drain() {
2189 let store = new_store().await;
2190
2191 let held_conn = store.read().await.unwrap();
2193
2194 let store_clone = store.clone();
2197 let close_handle = tokio::spawn(async move {
2198 store_clone.close().await.unwrap();
2199 });
2200
2201 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
2203
2204 assert!(!close_handle.is_finished(), "close should be waiting for the held connection");
2206
2207 drop(held_conn);
2209
2210 let timeout = tokio::time::timeout(std::time::Duration::from_secs(3), close_handle).await;
2212 assert!(timeout.is_ok(), "close should complete after the held connection is released");
2213 timeout.unwrap().unwrap();
2214
2215 let guard = store.connections.lock().await;
2217 assert!(guard.is_none(), "connections should be None after close");
2218 }
2219}