1use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 deserialized_responses::TimelineEvent,
23 event_cache::{
24 store::{
25 media::{
26 EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
27 MediaService,
28 },
29 EventCacheStore,
30 },
31 Event, Gap,
32 },
33 linked_chunk::{
34 ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Position, RawChunk, Update,
35 },
36 media::{MediaRequestParameters, UniqueKey},
37};
38use matrix_sdk_store_encryption::StoreCipher;
39use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
40use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
41use tokio::fs;
42use tracing::{debug, error, trace};
43
44use crate::{
45 error::{Error, Result},
46 utils::{
47 repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
48 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
49 },
50 OpenStoreError, SqliteStoreConfig,
51};
52
53mod keys {
54 pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
56 pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";
57
58 pub const LINKED_CHUNKS: &str = "linked_chunks";
60 pub const MEDIA: &str = "media";
61}
62
63const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";
65
66const DATABASE_VERSION: u8 = 6;
72
73const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
76const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
79
80#[derive(Clone)]
82pub struct SqliteEventCacheStore {
83 store_cipher: Option<Arc<StoreCipher>>,
84 pool: SqlitePool,
85 media_service: MediaService,
86}
87
88#[cfg(not(tarpaulin_include))]
89impl fmt::Debug for SqliteEventCacheStore {
90 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
92 }
93}
94
95impl SqliteEventCacheStore {
96 pub async fn open(
99 path: impl AsRef<Path>,
100 passphrase: Option<&str>,
101 ) -> Result<Self, OpenStoreError> {
102 Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
103 }
104
105 pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
107 let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;
108
109 fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;
110
111 let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
112 config.pool = Some(pool_config);
113
114 let pool = config.create_pool(Runtime::Tokio1)?;
115
116 let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
117 this.pool.get().await?.apply_runtime_config(runtime_config).await?;
118
119 Ok(this)
120 }
121
122 async fn open_with_pool(
125 pool: SqlitePool,
126 passphrase: Option<&str>,
127 ) -> Result<Self, OpenStoreError> {
128 let conn = pool.get().await?;
129
130 let version = conn.db_version().await?;
131 run_migrations(&conn, version).await?;
132
133 let store_cipher = match passphrase {
134 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
135 None => None,
136 };
137
138 let media_service = MediaService::new();
139 let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
140 let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
141 media_service.restore(media_retention_policy, last_media_cleanup_time);
142
143 Ok(Self { store_cipher, pool, media_service })
144 }
145
146 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
147 if let Some(key) = &self.store_cipher {
148 let encrypted = key.encrypt_value_data(value)?;
149 Ok(rmp_serde::to_vec_named(&encrypted)?)
150 } else {
151 Ok(value)
152 }
153 }
154
155 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
156 if let Some(key) = &self.store_cipher {
157 let encrypted = rmp_serde::from_slice(value)?;
158 let decrypted = key.decrypt_value_data(encrypted)?;
159 Ok(Cow::Owned(decrypted))
160 } else {
161 Ok(Cow::Borrowed(value))
162 }
163 }
164
165 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
166 let bytes = key.as_ref();
167 if let Some(store_cipher) = &self.store_cipher {
168 Key::Hashed(store_cipher.hash_key(table_name, bytes))
169 } else {
170 Key::Plain(bytes.to_owned())
171 }
172 }
173
174 async fn acquire(&self) -> Result<SqliteAsyncConn> {
175 let connection = self.pool.get().await?;
176
177 connection.execute_batch("PRAGMA foreign_keys = ON;").await?;
182
183 Ok(connection)
184 }
185
186 fn map_row_to_chunk(
187 row: &rusqlite::Row<'_>,
188 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
189 Ok((
190 row.get::<_, u64>(0)?,
191 row.get::<_, Option<u64>>(1)?,
192 row.get::<_, Option<u64>>(2)?,
193 row.get::<_, String>(3)?,
194 ))
195 }
196}
197
198trait TransactionExtForLinkedChunks {
199 fn rebuild_chunk(
200 &self,
201 store: &SqliteEventCacheStore,
202 room_id: &Key,
203 previous: Option<u64>,
204 index: u64,
205 next: Option<u64>,
206 chunk_type: &str,
207 ) -> Result<RawChunk<Event, Gap>>;
208
209 fn load_gap_content(
210 &self,
211 store: &SqliteEventCacheStore,
212 room_id: &Key,
213 chunk_id: ChunkIdentifier,
214 ) -> Result<Gap>;
215
216 fn load_events_content(
217 &self,
218 store: &SqliteEventCacheStore,
219 room_id: &Key,
220 chunk_id: ChunkIdentifier,
221 ) -> Result<Vec<Event>>;
222}
223
224impl TransactionExtForLinkedChunks for Transaction<'_> {
225 fn rebuild_chunk(
226 &self,
227 store: &SqliteEventCacheStore,
228 room_id: &Key,
229 previous: Option<u64>,
230 id: u64,
231 next: Option<u64>,
232 chunk_type: &str,
233 ) -> Result<RawChunk<Event, Gap>> {
234 let previous = previous.map(ChunkIdentifier::new);
235 let next = next.map(ChunkIdentifier::new);
236 let id = ChunkIdentifier::new(id);
237
238 match chunk_type {
239 CHUNK_TYPE_GAP_TYPE_STRING => {
240 let gap = self.load_gap_content(store, room_id, id)?;
242 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
243 }
244
245 CHUNK_TYPE_EVENT_TYPE_STRING => {
246 let events = self.load_events_content(store, room_id, id)?;
248 Ok(RawChunk {
249 content: ChunkContent::Items(events),
250 previous,
251 identifier: id,
252 next,
253 })
254 }
255
256 other => {
257 Err(Error::InvalidData {
259 details: format!("a linked chunk has an unknown type {other}"),
260 })
261 }
262 }
263 }
264
265 fn load_gap_content(
266 &self,
267 store: &SqliteEventCacheStore,
268 room_id: &Key,
269 chunk_id: ChunkIdentifier,
270 ) -> Result<Gap> {
271 let encoded_prev_token: Vec<u8> = self.query_row(
274 "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
275 (chunk_id.index(), &room_id),
276 |row| row.get(0),
277 )?;
278 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
279 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
280 Ok(Gap { prev_token })
281 }
282
283 fn load_events_content(
284 &self,
285 store: &SqliteEventCacheStore,
286 room_id: &Key,
287 chunk_id: ChunkIdentifier,
288 ) -> Result<Vec<Event>> {
289 let mut events = Vec::new();
291
292 for event_data in self
293 .prepare(
294 r#"
295 SELECT content FROM events
296 WHERE chunk_id = ? AND room_id = ?
297 ORDER BY position ASC
298 "#,
299 )?
300 .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
301 {
302 let encoded_content = event_data?;
303 let serialized_content = store.decode_value(&encoded_content)?;
304 let event = serde_json::from_slice(&serialized_content)?;
305
306 events.push(event);
307 }
308
309 Ok(events)
310 }
311}
312
313async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
315 if version == 0 {
316 debug!("Creating database");
317 } else if version < DATABASE_VERSION {
318 debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
319 } else {
320 return Ok(());
321 }
322
323 conn.execute_batch("PRAGMA foreign_keys = ON;").await?;
325
326 if version < 1 {
327 conn.execute_batch("PRAGMA journal_mode = wal;").await?;
330 conn.with_transaction(|txn| {
331 txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
332 txn.set_db_version(1)
333 })
334 .await?;
335 }
336
337 if version < 2 {
338 conn.with_transaction(|txn| {
339 txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
340 txn.set_db_version(2)
341 })
342 .await?;
343 }
344
345 if version < 3 {
346 conn.with_transaction(|txn| {
347 txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
348 txn.set_db_version(3)
349 })
350 .await?;
351 }
352
353 if version < 4 {
354 conn.with_transaction(|txn| {
355 txn.execute_batch(include_str!(
356 "../migrations/event_cache_store/004_ignore_policy.sql"
357 ))?;
358 txn.set_db_version(4)
359 })
360 .await?;
361 }
362
363 if version < 5 {
364 conn.with_transaction(|txn| {
365 txn.execute_batch(include_str!(
366 "../migrations/event_cache_store/005_events_index_on_event_id.sql"
367 ))?;
368 txn.set_db_version(5)
369 })
370 .await?;
371 }
372
373 if version < 6 {
374 conn.with_transaction(|txn| {
375 txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
376 txn.set_db_version(6)
377 })
378 .await?;
379 }
380
381 Ok(())
382}
383
384#[async_trait]
385impl EventCacheStore for SqliteEventCacheStore {
386 type Error = Error;
387
388 async fn try_take_leased_lock(
389 &self,
390 lease_duration_ms: u32,
391 key: &str,
392 holder: &str,
393 ) -> Result<bool> {
394 let key = key.to_owned();
395 let holder = holder.to_owned();
396
397 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
398 let expiration = now + lease_duration_ms as u64;
399
400 let num_touched = self
401 .acquire()
402 .await?
403 .with_transaction(move |txn| {
404 txn.execute(
405 "INSERT INTO lease_locks (key, holder, expiration)
406 VALUES (?1, ?2, ?3)
407 ON CONFLICT (key)
408 DO
409 UPDATE SET holder = ?2, expiration = ?3
410 WHERE holder = ?2
411 OR expiration < ?4
412 ",
413 (key, holder, expiration, now),
414 )
415 })
416 .await?;
417
418 Ok(num_touched == 1)
419 }
420
421 async fn handle_linked_chunk_updates(
422 &self,
423 room_id: &RoomId,
424 updates: Vec<Update<Event, Gap>>,
425 ) -> Result<(), Self::Error> {
426 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
429 let room_id = room_id.to_owned();
430 let this = self.clone();
431
432 with_immediate_transaction(self.acquire().await?, move |txn| {
433 for up in updates {
434 match up {
435 Update::NewItemsChunk { previous, new, next } => {
436 let previous = previous.as_ref().map(ChunkIdentifier::index);
437 let new = new.index();
438 let next = next.as_ref().map(ChunkIdentifier::index);
439
440 trace!(
441 %room_id,
442 "new events chunk (prev={previous:?}, i={new}, next={next:?})",
443 );
444
445 insert_chunk(
446 txn,
447 &hashed_room_id,
448 previous,
449 new,
450 next,
451 CHUNK_TYPE_EVENT_TYPE_STRING,
452 )?;
453 }
454
455 Update::NewGapChunk { previous, new, next, gap } => {
456 let serialized = serde_json::to_vec(&gap.prev_token)?;
457 let prev_token = this.encode_value(serialized)?;
458
459 let previous = previous.as_ref().map(ChunkIdentifier::index);
460 let new = new.index();
461 let next = next.as_ref().map(ChunkIdentifier::index);
462
463 trace!(
464 %room_id,
465 "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
466 );
467
468 insert_chunk(
470 txn,
471 &hashed_room_id,
472 previous,
473 new,
474 next,
475 CHUNK_TYPE_GAP_TYPE_STRING,
476 )?;
477
478 txn.execute(
480 r#"
481 INSERT INTO gaps(chunk_id, room_id, prev_token)
482 VALUES (?, ?, ?)
483 "#,
484 (new, &hashed_room_id, prev_token),
485 )?;
486 }
487
488 Update::RemoveChunk(chunk_identifier) => {
489 let chunk_id = chunk_identifier.index();
490
491 trace!(%room_id, "removing chunk @ {chunk_id}");
492
493 let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
495 "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
496 (chunk_id, &hashed_room_id),
497 |row| Ok((row.get(0)?, row.get(1)?))
498 )?;
499
500 if let Some(previous) = previous {
502 txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
503 }
504
505 if let Some(next) = next {
507 txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
508 }
509
510 txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
513 }
514
515 Update::PushItems { at, items } => {
516 if items.is_empty() {
517 continue;
519 }
520
521 let chunk_id = at.chunk_identifier().index();
522
523 trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());
524
525 let mut statement = txn.prepare(
526 "INSERT INTO events(chunk_id, room_id, event_id, content, position) VALUES (?, ?, ?, ?, ?)"
527 )?;
528
529 let invalid_event = |event: TimelineEvent| {
530 let Some(event_id) = event.event_id() else {
531 error!(%room_id, "Trying to push an event with no ID");
532 return None;
533 };
534
535 Some((event_id.to_string(), event))
536 };
537
538 for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
539 let serialized = serde_json::to_vec(&event)?;
540 let content = this.encode_value(serialized)?;
541
542 let index = at.index() + i;
543
544 statement.execute((chunk_id, &hashed_room_id, event_id, content, index))?;
545 }
546 }
547
548 Update::ReplaceItem { at, item: event } => {
549 let chunk_id = at.chunk_identifier().index();
550 let index = at.index();
551
552 trace!(%room_id, "replacing item @ {chunk_id}:{index}");
553
554 let serialized = serde_json::to_vec(&event)?;
555 let content = this.encode_value(serialized)?;
556
557 let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
559 error!(%room_id, "Trying to replace an event with a new one that has no ID");
560 continue;
561 };
562
563 txn.execute(
564 r#"
565 UPDATE events
566 SET content = ?, event_id = ?
567 WHERE room_id = ? AND chunk_id = ? AND position = ?
568 "#,
569 (content, event_id, &hashed_room_id, chunk_id, index,)
570 )?;
571 }
572
573 Update::RemoveItem { at } => {
574 let chunk_id = at.chunk_identifier().index();
575 let index = at.index();
576
577 trace!(%room_id, "removing item @ {chunk_id}:{index}");
578
579 txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;
581
582 txn.execute(
584 r#"
585 UPDATE events
586 SET position = position - 1
587 WHERE room_id = ? AND chunk_id = ? AND position > ?
588 "#,
589 (&hashed_room_id, chunk_id, index)
590 )?;
591
592 }
593
594 Update::DetachLastItems { at } => {
595 let chunk_id = at.chunk_identifier().index();
596 let index = at.index();
597
598 trace!(%room_id, "truncating items >= {chunk_id}:{index}");
599
600 txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
602 }
603
604 Update::Clear => {
605 trace!(%room_id, "clearing items");
606
607 txn.execute(
609 "DELETE FROM linked_chunks WHERE room_id = ?",
610 (&hashed_room_id,),
611 )?;
612 }
613
614 Update::StartReattachItems | Update::EndReattachItems => {
615 }
617 }
618 }
619
620 Ok(())
621 })
622 .await?;
623
624 Ok(())
625 }
626
627 async fn load_all_chunks(
628 &self,
629 room_id: &RoomId,
630 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
631 let room_id = room_id.to_owned();
632 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
633
634 let this = self.clone();
635
636 let result = self
637 .acquire()
638 .await?
639 .with_transaction(move |txn| -> Result<_> {
640 let mut items = Vec::new();
641
642 for data in txn
644 .prepare(
645 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
646 )?
647 .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
648 {
649 let (id, previous, next, chunk_type) = data?;
650 let new = txn.rebuild_chunk(
651 &this,
652 &hashed_room_id,
653 previous,
654 id,
655 next,
656 chunk_type.as_str(),
657 )?;
658 items.push(new);
659 }
660
661 Ok(items)
662 })
663 .await?;
664
665 Ok(result)
666 }
667
668 async fn load_last_chunk(
669 &self,
670 room_id: &RoomId,
671 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
672 let room_id = room_id.to_owned();
673 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
674
675 let this = self.clone();
676
677 self
678 .acquire()
679 .await?
680 .with_transaction(move |txn| -> Result<_> {
681 let (chunk_identifier_generator, number_of_chunks) = txn
683 .prepare(
684 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
685 )?
686 .query_row(
687 (&hashed_room_id,),
688 |row| {
689 Ok((
690 row.get::<_, Option<u64>>(0)?,
695 row.get::<_, u64>(1)?,
696 ))
697 }
698 )?;
699
700 let chunk_identifier_generator = match chunk_identifier_generator {
701 Some(last_chunk_identifier) => {
702 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
703 ChunkIdentifier::new(last_chunk_identifier)
704 )
705 },
706 None => ChunkIdentifierGenerator::new_from_scratch(),
707 };
708
709 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
711 .prepare(
712 "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
713 )?
714 .query_row(
715 (&hashed_room_id,),
716 |row| {
717 Ok((
718 row.get::<_, u64>(0)?,
719 row.get::<_, Option<u64>>(1)?,
720 row.get::<_, String>(2)?,
721 ))
722 }
723 )
724 .optional()?
725 else {
726 if number_of_chunks == 0 {
729 return Ok((None, chunk_identifier_generator));
730 }
731 else {
736 return Err(Error::InvalidData {
737 details:
738 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
739 .to_owned()
740 }
741 )
742 }
743 };
744
745 let last_chunk = txn.rebuild_chunk(
747 &this,
748 &hashed_room_id,
749 previous_chunk,
750 chunk_identifier,
751 None,
752 &chunk_type
753 )?;
754
755 Ok((Some(last_chunk), chunk_identifier_generator))
756 })
757 .await
758 }
759
760 async fn load_previous_chunk(
761 &self,
762 room_id: &RoomId,
763 before_chunk_identifier: ChunkIdentifier,
764 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
765 let room_id = room_id.to_owned();
766 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
767
768 let this = self.clone();
769
770 self
771 .acquire()
772 .await?
773 .with_transaction(move |txn| -> Result<_> {
774 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
776 .prepare(
777 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
778 )?
779 .query_row(
780 (&hashed_room_id, before_chunk_identifier.index()),
781 |row| {
782 Ok((
783 row.get::<_, u64>(0)?,
784 row.get::<_, Option<u64>>(1)?,
785 row.get::<_, Option<u64>>(2)?,
786 row.get::<_, String>(3)?,
787 ))
788 }
789 )
790 .optional()?
791 else {
792 return Ok(None);
794 };
795
796 let last_chunk = txn.rebuild_chunk(
798 &this,
799 &hashed_room_id,
800 previous_chunk,
801 chunk_identifier,
802 next_chunk,
803 &chunk_type
804 )?;
805
806 Ok(Some(last_chunk))
807 })
808 .await
809 }
810
811 async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
812 self.acquire()
813 .await?
814 .with_transaction(move |txn| {
815 txn.execute("DELETE FROM linked_chunks", ())
817 })
818 .await?;
819 Ok(())
820 }
821
822 async fn filter_duplicated_events(
823 &self,
824 room_id: &RoomId,
825 events: Vec<OwnedEventId>,
826 ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
827 if events.is_empty() {
831 return Ok(Vec::new());
832 }
833
834 let room_id = room_id.to_owned();
836 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
837
838 self.acquire()
839 .await?
840 .with_transaction(move |txn| -> Result<_> {
841 txn.chunk_large_query_over(events, None, move |txn, events| {
842 let query = format!(
843 "SELECT event_id, chunk_id, position FROM events WHERE room_id = ? AND event_id IN ({}) ORDER BY chunk_id ASC, position ASC",
844 repeat_vars(events.len()),
845 );
846 let parameters = params_from_iter(
847 once(
849 hashed_room_id
850 .to_sql()
851 .unwrap(),
853 )
854 .chain(events.iter().map(|event| {
856 event
857 .as_str()
858 .to_sql()
859 .unwrap()
861 })),
862 );
863
864 let mut duplicated_events = Vec::new();
865
866 for duplicated_event in txn
867 .prepare(&query)?
868 .query_map(parameters, |row| {
869 Ok((
870 row.get::<_, String>(0)?,
871 row.get::<_, u64>(1)?,
872 row.get::<_, usize>(2)?
873 ))
874 })?
875 {
876 let (duplicated_event, chunk_identifier, index) = duplicated_event?;
877
878 let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
879 error!(%duplicated_event, %room_id, "Reading an malformed event ID");
882 continue;
883 };
884
885 duplicated_events.push((duplicated_event, Position::new(ChunkIdentifier::new(chunk_identifier), index)));
886 }
887
888 Ok(duplicated_events)
889 })
890 })
891 .await
892 }
893
894 async fn find_event(
895 &self,
896 room_id: &RoomId,
897 event_id: &EventId,
898 ) -> Result<Option<(Position, Event)>, Self::Error> {
899 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
900 let event_id = event_id.to_owned();
901 let this = self.clone();
902
903 self.acquire()
904 .await?
905 .with_transaction(move |txn| -> Result<_> {
906 let Some((chunk_identifier, index, event)) = txn
907 .prepare(
908 "SELECT chunk_id, position, content FROM events WHERE room_id = ? AND event_id = ?",
909 )?
910 .query_row((hashed_room_id, event_id.as_str(),), |row| {
911 Ok((
912 row.get::<_, u64>(0)?,
913 row.get::<_, usize>(1)?,
914 row.get::<_, Vec<u8>>(2)?,
915 ))
916 })
917 .optional()?
918 else {
919 return Ok(None);
921 };
922
923 let event = serde_json::from_slice(&this.decode_value(&event)?)?;
924
925 Ok(Some((Position::new(ChunkIdentifier::new(chunk_identifier), index), event)))
926 })
927 .await
928 }
929
930 async fn add_media_content(
931 &self,
932 request: &MediaRequestParameters,
933 content: Vec<u8>,
934 ignore_policy: IgnoreMediaRetentionPolicy,
935 ) -> Result<()> {
936 self.media_service.add_media_content(self, request, content, ignore_policy).await
937 }
938
939 async fn replace_media_key(
940 &self,
941 from: &MediaRequestParameters,
942 to: &MediaRequestParameters,
943 ) -> Result<(), Self::Error> {
944 let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
945 let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
946
947 let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
948 let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
949
950 let conn = self.acquire().await?;
951 conn.execute(
952 r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
953 (new_uri, new_format, prev_uri, prev_format),
954 )
955 .await?;
956
957 Ok(())
958 }
959
960 async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
961 self.media_service.get_media_content(self, request).await
962 }
963
964 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
965 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
966 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
967
968 let conn = self.acquire().await?;
969 conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
970
971 Ok(())
972 }
973
974 async fn get_media_content_for_uri(
975 &self,
976 uri: &MxcUri,
977 ) -> Result<Option<Vec<u8>>, Self::Error> {
978 self.media_service.get_media_content_for_uri(self, uri).await
979 }
980
981 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
982 let uri = self.encode_key(keys::MEDIA, uri);
983
984 let conn = self.acquire().await?;
985 conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
986
987 Ok(())
988 }
989
990 async fn set_media_retention_policy(
991 &self,
992 policy: MediaRetentionPolicy,
993 ) -> Result<(), Self::Error> {
994 self.media_service.set_media_retention_policy(self, policy).await
995 }
996
997 fn media_retention_policy(&self) -> MediaRetentionPolicy {
998 self.media_service.media_retention_policy()
999 }
1000
1001 async fn set_ignore_media_retention_policy(
1002 &self,
1003 request: &MediaRequestParameters,
1004 ignore_policy: IgnoreMediaRetentionPolicy,
1005 ) -> Result<(), Self::Error> {
1006 self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
1007 }
1008
1009 async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
1010 self.media_service.clean_up_media_cache(self).await
1011 }
1012}
1013
1014#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1015#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1016impl EventCacheStoreMedia for SqliteEventCacheStore {
1017 type Error = Error;
1018
1019 async fn media_retention_policy_inner(
1020 &self,
1021 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
1022 let conn = self.acquire().await?;
1023 conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
1024 }
1025
1026 async fn set_media_retention_policy_inner(
1027 &self,
1028 policy: MediaRetentionPolicy,
1029 ) -> Result<(), Self::Error> {
1030 let conn = self.acquire().await?;
1031 conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
1032 Ok(())
1033 }
1034
1035 async fn add_media_content_inner(
1036 &self,
1037 request: &MediaRequestParameters,
1038 data: Vec<u8>,
1039 last_access: SystemTime,
1040 policy: MediaRetentionPolicy,
1041 ignore_policy: IgnoreMediaRetentionPolicy,
1042 ) -> Result<(), Self::Error> {
1043 let ignore_policy = ignore_policy.is_yes();
1044 let data = self.encode_value(data)?;
1045
1046 if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
1047 return Ok(());
1048 }
1049
1050 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1051 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1052 let timestamp = time_to_timestamp(last_access);
1053
1054 let conn = self.acquire().await?;
1055 conn.execute(
1056 "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
1057 (uri, format, data, timestamp, ignore_policy),
1058 )
1059 .await?;
1060
1061 Ok(())
1062 }
1063
1064 async fn set_ignore_media_retention_policy_inner(
1065 &self,
1066 request: &MediaRequestParameters,
1067 ignore_policy: IgnoreMediaRetentionPolicy,
1068 ) -> Result<(), Self::Error> {
1069 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1070 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1071 let ignore_policy = ignore_policy.is_yes();
1072
1073 let conn = self.acquire().await?;
1074 conn.execute(
1075 r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
1076 (ignore_policy, uri, format),
1077 )
1078 .await?;
1079
1080 Ok(())
1081 }
1082
1083 async fn get_media_content_inner(
1084 &self,
1085 request: &MediaRequestParameters,
1086 current_time: SystemTime,
1087 ) -> Result<Option<Vec<u8>>, Self::Error> {
1088 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1089 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1090 let timestamp = time_to_timestamp(current_time);
1091
1092 let conn = self.acquire().await?;
1093 let data = conn
1094 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1095 txn.execute(
1099 "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1100 (timestamp, &uri, &format),
1101 )?;
1102
1103 txn.query_row::<Vec<u8>, _, _>(
1104 "SELECT data FROM media WHERE uri = ? AND format = ?",
1105 (&uri, &format),
1106 |row| row.get(0),
1107 )
1108 .optional()
1109 })
1110 .await?;
1111
1112 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1113 }
1114
1115 async fn get_media_content_for_uri_inner(
1116 &self,
1117 uri: &MxcUri,
1118 current_time: SystemTime,
1119 ) -> Result<Option<Vec<u8>>, Self::Error> {
1120 let uri = self.encode_key(keys::MEDIA, uri);
1121 let timestamp = time_to_timestamp(current_time);
1122
1123 let conn = self.acquire().await?;
1124 let data = conn
1125 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1126 txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1130
1131 txn.query_row::<Vec<u8>, _, _>(
1132 "SELECT data FROM media WHERE uri = ?",
1133 (&uri,),
1134 |row| row.get(0),
1135 )
1136 .optional()
1137 })
1138 .await?;
1139
1140 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1141 }
1142
1143 async fn clean_up_media_cache_inner(
1144 &self,
1145 policy: MediaRetentionPolicy,
1146 current_time: SystemTime,
1147 ) -> Result<(), Self::Error> {
1148 if !policy.has_limitations() {
1149 return Ok(());
1151 }
1152
1153 let conn = self.acquire().await?;
1154 let removed = conn
1155 .with_transaction::<_, Error, _>(move |txn| {
1156 let mut removed = false;
1157
1158 if let Some(max_file_size) = policy.computed_max_file_size() {
1160 let count = txn.execute(
1161 "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1162 (max_file_size,),
1163 )?;
1164
1165 if count > 0 {
1166 removed = true;
1167 }
1168 }
1169
1170 if let Some(last_access_expiry) = policy.last_access_expiry {
1172 let current_timestamp = time_to_timestamp(current_time);
1173 let expiry_secs = last_access_expiry.as_secs();
1174 let count = txn.execute(
1175 "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1176 (current_timestamp, expiry_secs),
1177 )?;
1178
1179 if count > 0 {
1180 removed = true;
1181 }
1182 }
1183
1184 if let Some(max_cache_size) = policy.max_cache_size {
1186 let cache_size = txn
1189 .query_row(
1190 "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1191 (),
1192 |row| {
1193 row.get::<_, Option<u64>>(0)
1195 },
1196 )?
1197 .unwrap_or_default();
1198
1199 if cache_size > max_cache_size {
1201 let mut cached_stmt = txn.prepare_cached(
1203 "SELECT rowid, length(data) FROM media \
1204 WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1205 )?;
1206 let content_sizes = cached_stmt
1207 .query(())?
1208 .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));
1209
1210 let mut accumulated_items_size = 0u64;
1211 let mut limit_reached = false;
1212 let mut rows_to_remove = Vec::new();
1213
1214 for result in content_sizes {
1215 let (row_id, size) = match result {
1216 Ok(content_size) => content_size,
1217 Err(error) => {
1218 return Err(error.into());
1219 }
1220 };
1221
1222 if limit_reached {
1223 rows_to_remove.push(row_id);
1224 continue;
1225 }
1226
1227 match accumulated_items_size.checked_add(size) {
1228 Some(acc) if acc > max_cache_size => {
1229 limit_reached = true;
1231 rows_to_remove.push(row_id);
1232 }
1233 Some(acc) => accumulated_items_size = acc,
1234 None => {
1235 limit_reached = true;
1238 rows_to_remove.push(row_id);
1239 }
1240 };
1241 }
1242
1243 if !rows_to_remove.is_empty() {
1244 removed = true;
1245 }
1246
1247 txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1248 let sql_params = repeat_vars(row_ids.len());
1249 let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1250 txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1251 Ok(Vec::<()>::new())
1252 })?;
1253 }
1254 }
1255
1256 txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1257
1258 Ok(removed)
1259 })
1260 .await?;
1261
1262 if removed {
1265 conn.vacuum().await?;
1266 }
1267
1268 Ok(())
1269 }
1270
1271 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1272 let conn = self.acquire().await?;
1273 conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1274 }
1275}
1276
1277async fn with_immediate_transaction<
1282 T: Send + 'static,
1283 F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
1284>(
1285 conn: SqliteAsyncConn,
1286 f: F,
1287) -> Result<T, Error> {
1288 conn.interact(move |conn| -> Result<T, Error> {
1289 conn.set_transaction_behavior(TransactionBehavior::Immediate);
1293
1294 let code = || -> Result<T, Error> {
1295 let txn = conn.transaction()?;
1296 let res = f(&txn)?;
1297 txn.commit()?;
1298 Ok(res)
1299 };
1300
1301 let res = code();
1302
1303 conn.set_transaction_behavior(TransactionBehavior::Deferred);
1306
1307 res
1308 })
1309 .await
1310 .unwrap()
1312}
1313
1314fn insert_chunk(
1315 txn: &Transaction<'_>,
1316 room_id: &Key,
1317 previous: Option<u64>,
1318 new: u64,
1319 next: Option<u64>,
1320 type_str: &str,
1321) -> rusqlite::Result<()> {
1322 txn.execute(
1324 r#"
1325 INSERT INTO linked_chunks(id, room_id, previous, next, type)
1326 VALUES (?, ?, ?, ?, ?)
1327 "#,
1328 (new, room_id, previous, next, type_str),
1329 )?;
1330
1331 if let Some(previous) = previous {
1333 txn.execute(
1334 r#"
1335 UPDATE linked_chunks
1336 SET next = ?
1337 WHERE id = ? AND room_id = ?
1338 "#,
1339 (new, previous, room_id),
1340 )?;
1341 }
1342
1343 if let Some(next) = next {
1345 txn.execute(
1346 r#"
1347 UPDATE linked_chunks
1348 SET previous = ?
1349 WHERE id = ? AND room_id = ?
1350 "#,
1351 (new, next, room_id),
1352 )?;
1353 }
1354
1355 Ok(())
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360 use std::{
1361 path::PathBuf,
1362 sync::atomic::{AtomicU32, Ordering::SeqCst},
1363 time::Duration,
1364 };
1365
1366 use assert_matches::assert_matches;
1367 use matrix_sdk_base::{
1368 event_cache::{
1369 store::{
1370 integration_tests::{check_test_event, make_test_event},
1371 media::IgnoreMediaRetentionPolicy,
1372 EventCacheStore, EventCacheStoreError,
1373 },
1374 Gap,
1375 },
1376 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1377 event_cache_store_media_integration_tests,
1378 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1379 media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1380 };
1381 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1382 use once_cell::sync::Lazy;
1383 use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1384 use tempfile::{tempdir, TempDir};
1385
1386 use super::SqliteEventCacheStore;
1387 use crate::{utils::SqliteAsyncConnExt, SqliteStoreConfig};
1388
1389 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
1390 static NUM: AtomicU32 = AtomicU32::new(0);
1391
1392 fn new_event_cache_store_workspace() -> PathBuf {
1393 let name = NUM.fetch_add(1, SeqCst).to_string();
1394 TMP_DIR.path().join(name)
1395 }
1396
1397 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1398 let tmpdir_path = new_event_cache_store_workspace();
1399
1400 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1401
1402 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1403 }
1404
1405 event_cache_store_integration_tests!();
1406 event_cache_store_integration_tests_time!();
1407 event_cache_store_media_integration_tests!(with_media_size_tests);
1408
1409 async fn get_event_cache_store_content_sorted_by_last_access(
1410 event_cache_store: &SqliteEventCacheStore,
1411 ) -> Vec<Vec<u8>> {
1412 let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1413 sqlite_db
1414 .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1415 stmt.query(())?.mapped(|row| row.get(0)).collect()
1416 })
1417 .await
1418 .expect("querying media cache content by last access failed")
1419 }
1420
1421 #[async_test]
1422 async fn test_pool_size() {
1423 let tmpdir_path = new_event_cache_store_workspace();
1424 let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);
1425
1426 let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();
1427
1428 assert_eq!(store.pool.status().max_size, 42);
1429 }
1430
1431 #[async_test]
1432 async fn test_last_access() {
1433 let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1434 let uri = mxc_uri!("mxc://localhost/media");
1435 let file_request = MediaRequestParameters {
1436 source: MediaSource::Plain(uri.to_owned()),
1437 format: MediaFormat::File,
1438 };
1439 let thumbnail_request = MediaRequestParameters {
1440 source: MediaSource::Plain(uri.to_owned()),
1441 format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1442 Method::Crop,
1443 uint!(100),
1444 uint!(100),
1445 )),
1446 };
1447
1448 let content: Vec<u8> = "hello world".into();
1449 let thumbnail_content: Vec<u8> = "hello…".into();
1450
1451 event_cache_store
1453 .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1454 .await
1455 .expect("adding file failed");
1456
1457 tokio::time::sleep(Duration::from_secs(3)).await;
1460
1461 event_cache_store
1462 .add_media_content(
1463 &thumbnail_request,
1464 thumbnail_content.clone(),
1465 IgnoreMediaRetentionPolicy::No,
1466 )
1467 .await
1468 .expect("adding thumbnail failed");
1469
1470 let contents =
1472 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1473
1474 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1475 assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1476 assert_eq!(contents[1], content, "file is not second-to-last access");
1477
1478 tokio::time::sleep(Duration::from_secs(3)).await;
1481
1482 let _ = event_cache_store
1484 .get_media_content(&file_request)
1485 .await
1486 .expect("getting file failed")
1487 .expect("file is missing");
1488
1489 let contents =
1491 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1492
1493 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1494 assert_eq!(contents[0], content, "file is not last access");
1495 assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1496 }
1497
1498 #[async_test]
1499 async fn test_linked_chunk_new_items_chunk() {
1500 let store = get_event_cache_store().await.expect("creating cache store failed");
1501
1502 let room_id = &DEFAULT_TEST_ROOM_ID;
1503
1504 store
1505 .handle_linked_chunk_updates(
1506 room_id,
1507 vec![
1508 Update::NewItemsChunk {
1509 previous: None,
1510 new: ChunkIdentifier::new(42),
1511 next: None, },
1513 Update::NewItemsChunk {
1514 previous: Some(ChunkIdentifier::new(42)),
1515 new: ChunkIdentifier::new(13),
1516 next: Some(ChunkIdentifier::new(37)), },
1519 Update::NewItemsChunk {
1520 previous: Some(ChunkIdentifier::new(13)),
1521 new: ChunkIdentifier::new(37),
1522 next: None,
1523 },
1524 ],
1525 )
1526 .await
1527 .unwrap();
1528
1529 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1530
1531 assert_eq!(chunks.len(), 3);
1532
1533 {
1534 let c = chunks.remove(0);
1536 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1537 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1538 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1539 assert_matches!(c.content, ChunkContent::Items(events) => {
1540 assert!(events.is_empty());
1541 });
1542
1543 let c = chunks.remove(0);
1544 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1545 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1546 assert_eq!(c.next, None);
1547 assert_matches!(c.content, ChunkContent::Items(events) => {
1548 assert!(events.is_empty());
1549 });
1550
1551 let c = chunks.remove(0);
1552 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1553 assert_eq!(c.previous, None);
1554 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1555 assert_matches!(c.content, ChunkContent::Items(events) => {
1556 assert!(events.is_empty());
1557 });
1558 }
1559 }
1560
1561 #[async_test]
1562 async fn test_linked_chunk_new_gap_chunk() {
1563 let store = get_event_cache_store().await.expect("creating cache store failed");
1564
1565 let room_id = &DEFAULT_TEST_ROOM_ID;
1566
1567 store
1568 .handle_linked_chunk_updates(
1569 room_id,
1570 vec![Update::NewGapChunk {
1571 previous: None,
1572 new: ChunkIdentifier::new(42),
1573 next: None,
1574 gap: Gap { prev_token: "raclette".to_owned() },
1575 }],
1576 )
1577 .await
1578 .unwrap();
1579
1580 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1581
1582 assert_eq!(chunks.len(), 1);
1583
1584 let c = chunks.remove(0);
1586 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1587 assert_eq!(c.previous, None);
1588 assert_eq!(c.next, None);
1589 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1590 assert_eq!(gap.prev_token, "raclette");
1591 });
1592 }
1593
1594 #[async_test]
1595 async fn test_linked_chunk_replace_item() {
1596 let store = get_event_cache_store().await.expect("creating cache store failed");
1597
1598 let room_id = &DEFAULT_TEST_ROOM_ID;
1599
1600 store
1601 .handle_linked_chunk_updates(
1602 room_id,
1603 vec![
1604 Update::NewItemsChunk {
1605 previous: None,
1606 new: ChunkIdentifier::new(42),
1607 next: None,
1608 },
1609 Update::PushItems {
1610 at: Position::new(ChunkIdentifier::new(42), 0),
1611 items: vec![
1612 make_test_event(room_id, "hello"),
1613 make_test_event(room_id, "world"),
1614 ],
1615 },
1616 Update::ReplaceItem {
1617 at: Position::new(ChunkIdentifier::new(42), 1),
1618 item: make_test_event(room_id, "yolo"),
1619 },
1620 ],
1621 )
1622 .await
1623 .unwrap();
1624
1625 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1626
1627 assert_eq!(chunks.len(), 1);
1628
1629 let c = chunks.remove(0);
1630 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1631 assert_eq!(c.previous, None);
1632 assert_eq!(c.next, None);
1633 assert_matches!(c.content, ChunkContent::Items(events) => {
1634 assert_eq!(events.len(), 2);
1635 check_test_event(&events[0], "hello");
1636 check_test_event(&events[1], "yolo");
1637 });
1638 }
1639
1640 #[async_test]
1641 async fn test_linked_chunk_remove_chunk() {
1642 let store = get_event_cache_store().await.expect("creating cache store failed");
1643
1644 let room_id = &DEFAULT_TEST_ROOM_ID;
1645
1646 store
1647 .handle_linked_chunk_updates(
1648 room_id,
1649 vec![
1650 Update::NewGapChunk {
1651 previous: None,
1652 new: ChunkIdentifier::new(42),
1653 next: None,
1654 gap: Gap { prev_token: "raclette".to_owned() },
1655 },
1656 Update::NewGapChunk {
1657 previous: Some(ChunkIdentifier::new(42)),
1658 new: ChunkIdentifier::new(43),
1659 next: None,
1660 gap: Gap { prev_token: "fondue".to_owned() },
1661 },
1662 Update::NewGapChunk {
1663 previous: Some(ChunkIdentifier::new(43)),
1664 new: ChunkIdentifier::new(44),
1665 next: None,
1666 gap: Gap { prev_token: "tartiflette".to_owned() },
1667 },
1668 Update::RemoveChunk(ChunkIdentifier::new(43)),
1669 ],
1670 )
1671 .await
1672 .unwrap();
1673
1674 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1675
1676 assert_eq!(chunks.len(), 2);
1677
1678 let c = chunks.remove(0);
1680 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1681 assert_eq!(c.previous, None);
1682 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1683 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1684 assert_eq!(gap.prev_token, "raclette");
1685 });
1686
1687 let c = chunks.remove(0);
1688 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1689 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1690 assert_eq!(c.next, None);
1691 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1692 assert_eq!(gap.prev_token, "tartiflette");
1693 });
1694
1695 let gaps = store
1697 .acquire()
1698 .await
1699 .unwrap()
1700 .with_transaction(|txn| -> rusqlite::Result<_> {
1701 let mut gaps = Vec::new();
1702 for data in txn
1703 .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
1704 .query_map((), |row| row.get::<_, u64>(0))?
1705 {
1706 gaps.push(data?);
1707 }
1708 Ok(gaps)
1709 })
1710 .await
1711 .unwrap();
1712
1713 assert_eq!(gaps, vec![42, 44]);
1714 }
1715
1716 #[async_test]
1717 async fn test_linked_chunk_push_items() {
1718 let store = get_event_cache_store().await.expect("creating cache store failed");
1719
1720 let room_id = &DEFAULT_TEST_ROOM_ID;
1721
1722 store
1723 .handle_linked_chunk_updates(
1724 room_id,
1725 vec![
1726 Update::NewItemsChunk {
1727 previous: None,
1728 new: ChunkIdentifier::new(42),
1729 next: None,
1730 },
1731 Update::PushItems {
1732 at: Position::new(ChunkIdentifier::new(42), 0),
1733 items: vec![
1734 make_test_event(room_id, "hello"),
1735 make_test_event(room_id, "world"),
1736 ],
1737 },
1738 Update::PushItems {
1739 at: Position::new(ChunkIdentifier::new(42), 2),
1740 items: vec![make_test_event(room_id, "who?")],
1741 },
1742 ],
1743 )
1744 .await
1745 .unwrap();
1746
1747 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1748
1749 assert_eq!(chunks.len(), 1);
1750
1751 let c = chunks.remove(0);
1752 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1753 assert_eq!(c.previous, None);
1754 assert_eq!(c.next, None);
1755 assert_matches!(c.content, ChunkContent::Items(events) => {
1756 assert_eq!(events.len(), 3);
1757
1758 check_test_event(&events[0], "hello");
1759 check_test_event(&events[1], "world");
1760 check_test_event(&events[2], "who?");
1761 });
1762 }
1763
1764 #[async_test]
1765 async fn test_linked_chunk_remove_item() {
1766 let store = get_event_cache_store().await.expect("creating cache store failed");
1767
1768 let room_id = *DEFAULT_TEST_ROOM_ID;
1769
1770 store
1771 .handle_linked_chunk_updates(
1772 room_id,
1773 vec![
1774 Update::NewItemsChunk {
1775 previous: None,
1776 new: ChunkIdentifier::new(42),
1777 next: None,
1778 },
1779 Update::PushItems {
1780 at: Position::new(ChunkIdentifier::new(42), 0),
1781 items: vec![
1782 make_test_event(room_id, "hello"),
1783 make_test_event(room_id, "world"),
1784 ],
1785 },
1786 Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1787 ],
1788 )
1789 .await
1790 .unwrap();
1791
1792 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1793
1794 assert_eq!(chunks.len(), 1);
1795
1796 let c = chunks.remove(0);
1797 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1798 assert_eq!(c.previous, None);
1799 assert_eq!(c.next, None);
1800 assert_matches!(c.content, ChunkContent::Items(events) => {
1801 assert_eq!(events.len(), 1);
1802 check_test_event(&events[0], "world");
1803 });
1804
1805 let num_rows: u64 = store
1807 .acquire()
1808 .await
1809 .unwrap()
1810 .with_transaction(move |txn| {
1811 txn.query_row(
1812 "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1813 (room_id.as_bytes(),),
1814 |row| row.get(0),
1815 )
1816 })
1817 .await
1818 .unwrap();
1819 assert_eq!(num_rows, 1);
1820 }
1821
1822 #[async_test]
1823 async fn test_linked_chunk_detach_last_items() {
1824 let store = get_event_cache_store().await.expect("creating cache store failed");
1825
1826 let room_id = *DEFAULT_TEST_ROOM_ID;
1827
1828 store
1829 .handle_linked_chunk_updates(
1830 room_id,
1831 vec![
1832 Update::NewItemsChunk {
1833 previous: None,
1834 new: ChunkIdentifier::new(42),
1835 next: None,
1836 },
1837 Update::PushItems {
1838 at: Position::new(ChunkIdentifier::new(42), 0),
1839 items: vec![
1840 make_test_event(room_id, "hello"),
1841 make_test_event(room_id, "world"),
1842 make_test_event(room_id, "howdy"),
1843 ],
1844 },
1845 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1846 ],
1847 )
1848 .await
1849 .unwrap();
1850
1851 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1852
1853 assert_eq!(chunks.len(), 1);
1854
1855 let c = chunks.remove(0);
1856 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1857 assert_eq!(c.previous, None);
1858 assert_eq!(c.next, None);
1859 assert_matches!(c.content, ChunkContent::Items(events) => {
1860 assert_eq!(events.len(), 1);
1861 check_test_event(&events[0], "hello");
1862 });
1863 }
1864
1865 #[async_test]
1866 async fn test_linked_chunk_start_end_reattach_items() {
1867 let store = get_event_cache_store().await.expect("creating cache store failed");
1868
1869 let room_id = *DEFAULT_TEST_ROOM_ID;
1870
1871 store
1875 .handle_linked_chunk_updates(
1876 room_id,
1877 vec![
1878 Update::NewItemsChunk {
1879 previous: None,
1880 new: ChunkIdentifier::new(42),
1881 next: None,
1882 },
1883 Update::PushItems {
1884 at: Position::new(ChunkIdentifier::new(42), 0),
1885 items: vec![
1886 make_test_event(room_id, "hello"),
1887 make_test_event(room_id, "world"),
1888 make_test_event(room_id, "howdy"),
1889 ],
1890 },
1891 Update::StartReattachItems,
1892 Update::EndReattachItems,
1893 ],
1894 )
1895 .await
1896 .unwrap();
1897
1898 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1899
1900 assert_eq!(chunks.len(), 1);
1901
1902 let c = chunks.remove(0);
1903 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1904 assert_eq!(c.previous, None);
1905 assert_eq!(c.next, None);
1906 assert_matches!(c.content, ChunkContent::Items(events) => {
1907 assert_eq!(events.len(), 3);
1908 check_test_event(&events[0], "hello");
1909 check_test_event(&events[1], "world");
1910 check_test_event(&events[2], "howdy");
1911 });
1912 }
1913
1914 #[async_test]
1915 async fn test_linked_chunk_clear() {
1916 let store = get_event_cache_store().await.expect("creating cache store failed");
1917
1918 let room_id = *DEFAULT_TEST_ROOM_ID;
1919 let event_0 = make_test_event(room_id, "hello");
1920 let event_1 = make_test_event(room_id, "world");
1921 let event_2 = make_test_event(room_id, "howdy");
1922
1923 store
1924 .handle_linked_chunk_updates(
1925 room_id,
1926 vec![
1927 Update::NewItemsChunk {
1928 previous: None,
1929 new: ChunkIdentifier::new(42),
1930 next: None,
1931 },
1932 Update::NewGapChunk {
1933 previous: Some(ChunkIdentifier::new(42)),
1934 new: ChunkIdentifier::new(54),
1935 next: None,
1936 gap: Gap { prev_token: "fondue".to_owned() },
1937 },
1938 Update::PushItems {
1939 at: Position::new(ChunkIdentifier::new(42), 0),
1940 items: vec![event_0.clone(), event_1, event_2],
1941 },
1942 Update::Clear,
1943 ],
1944 )
1945 .await
1946 .unwrap();
1947
1948 let chunks = store.load_all_chunks(room_id).await.unwrap();
1949 assert!(chunks.is_empty());
1950
1951 store
1953 .acquire()
1954 .await
1955 .unwrap()
1956 .with_transaction(|txn| -> rusqlite::Result<_> {
1957 let num_gaps = txn
1958 .prepare("SELECT COUNT(chunk_id) FROM gaps ORDER BY chunk_id")?
1959 .query_row((), |row| row.get::<_, u64>(0))?;
1960 assert_eq!(num_gaps, 0);
1961
1962 let num_events = txn
1963 .prepare("SELECT COUNT(event_id) FROM events ORDER BY chunk_id")?
1964 .query_row((), |row| row.get::<_, u64>(0))?;
1965 assert_eq!(num_events, 0);
1966
1967 Ok(())
1968 })
1969 .await
1970 .unwrap();
1971
1972 store
1974 .handle_linked_chunk_updates(
1975 room_id,
1976 vec![
1977 Update::NewItemsChunk {
1978 previous: None,
1979 new: ChunkIdentifier::new(42),
1980 next: None,
1981 },
1982 Update::PushItems {
1983 at: Position::new(ChunkIdentifier::new(42), 0),
1984 items: vec![event_0],
1985 },
1986 ],
1987 )
1988 .await
1989 .unwrap();
1990 }
1991
1992 #[async_test]
1993 async fn test_linked_chunk_multiple_rooms() {
1994 let store = get_event_cache_store().await.expect("creating cache store failed");
1995
1996 let room1 = room_id!("!realcheeselovers:raclette.fr");
1997 let room2 = room_id!("!realcheeselovers:fondue.ch");
1998
1999 store
2003 .handle_linked_chunk_updates(
2004 room1,
2005 vec![
2006 Update::NewItemsChunk {
2007 previous: None,
2008 new: ChunkIdentifier::new(42),
2009 next: None,
2010 },
2011 Update::PushItems {
2012 at: Position::new(ChunkIdentifier::new(42), 0),
2013 items: vec![
2014 make_test_event(room1, "best cheese is raclette"),
2015 make_test_event(room1, "obviously"),
2016 ],
2017 },
2018 ],
2019 )
2020 .await
2021 .unwrap();
2022
2023 store
2024 .handle_linked_chunk_updates(
2025 room2,
2026 vec![
2027 Update::NewItemsChunk {
2028 previous: None,
2029 new: ChunkIdentifier::new(42),
2030 next: None,
2031 },
2032 Update::PushItems {
2033 at: Position::new(ChunkIdentifier::new(42), 0),
2034 items: vec![make_test_event(room1, "beaufort is the best")],
2035 },
2036 ],
2037 )
2038 .await
2039 .unwrap();
2040
2041 let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
2043 assert_eq!(chunks_room1.len(), 1);
2044
2045 let c = chunks_room1.remove(0);
2046 assert_matches!(c.content, ChunkContent::Items(events) => {
2047 assert_eq!(events.len(), 2);
2048 check_test_event(&events[0], "best cheese is raclette");
2049 check_test_event(&events[1], "obviously");
2050 });
2051
2052 let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
2054 assert_eq!(chunks_room2.len(), 1);
2055
2056 let c = chunks_room2.remove(0);
2057 assert_matches!(c.content, ChunkContent::Items(events) => {
2058 assert_eq!(events.len(), 1);
2059 check_test_event(&events[0], "beaufort is the best");
2060 });
2061 }
2062
2063 #[async_test]
2064 async fn test_linked_chunk_update_is_a_transaction() {
2065 let store = get_event_cache_store().await.expect("creating cache store failed");
2066
2067 let room_id = *DEFAULT_TEST_ROOM_ID;
2068
2069 let err = store
2072 .handle_linked_chunk_updates(
2073 room_id,
2074 vec![
2075 Update::NewItemsChunk {
2076 previous: None,
2077 new: ChunkIdentifier::new(42),
2078 next: None,
2079 },
2080 Update::NewItemsChunk {
2081 previous: None,
2082 new: ChunkIdentifier::new(42),
2083 next: None,
2084 },
2085 ],
2086 )
2087 .await
2088 .unwrap_err();
2089
2090 assert_matches!(err, crate::error::Error::Sqlite(err) => {
2092 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
2093 });
2094
2095 let chunks = store.load_all_chunks(room_id).await.unwrap();
2099 assert!(chunks.is_empty());
2100 }
2101
2102 #[async_test]
2103 async fn test_filter_duplicate_events_no_events() {
2104 let store = get_event_cache_store().await.expect("creating cache store failed");
2105
2106 let room_id = *DEFAULT_TEST_ROOM_ID;
2107 let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
2108 assert!(duplicates.is_empty());
2109 }
2110
2111 #[async_test]
2112 async fn test_load_last_chunk() {
2113 let room_id = room_id!("!r0:matrix.org");
2114 let event = |msg: &str| make_test_event(room_id, msg);
2115 let store = get_event_cache_store().await.expect("creating cache store failed");
2116
2117 {
2119 let (last_chunk, chunk_identifier_generator) =
2120 store.load_last_chunk(room_id).await.unwrap();
2121
2122 assert!(last_chunk.is_none());
2123 assert_eq!(chunk_identifier_generator.current(), 0);
2124 }
2125
2126 {
2128 store
2129 .handle_linked_chunk_updates(
2130 room_id,
2131 vec![
2132 Update::NewItemsChunk {
2133 previous: None,
2134 new: ChunkIdentifier::new(42),
2135 next: None,
2136 },
2137 Update::PushItems {
2138 at: Position::new(ChunkIdentifier::new(42), 0),
2139 items: vec![event("saucisse de morteau"), event("comté")],
2140 },
2141 ],
2142 )
2143 .await
2144 .unwrap();
2145
2146 let (last_chunk, chunk_identifier_generator) =
2147 store.load_last_chunk(room_id).await.unwrap();
2148
2149 assert_matches!(last_chunk, Some(last_chunk) => {
2150 assert_eq!(last_chunk.identifier, 42);
2151 assert!(last_chunk.previous.is_none());
2152 assert!(last_chunk.next.is_none());
2153 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2154 assert_eq!(items.len(), 2);
2155 check_test_event(&items[0], "saucisse de morteau");
2156 check_test_event(&items[1], "comté");
2157 });
2158 });
2159 assert_eq!(chunk_identifier_generator.current(), 42);
2160 }
2161
2162 {
2164 store
2165 .handle_linked_chunk_updates(
2166 room_id,
2167 vec![
2168 Update::NewItemsChunk {
2169 previous: Some(ChunkIdentifier::new(42)),
2170 new: ChunkIdentifier::new(7),
2171 next: None,
2172 },
2173 Update::PushItems {
2174 at: Position::new(ChunkIdentifier::new(7), 0),
2175 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2176 },
2177 ],
2178 )
2179 .await
2180 .unwrap();
2181
2182 let (last_chunk, chunk_identifier_generator) =
2183 store.load_last_chunk(room_id).await.unwrap();
2184
2185 assert_matches!(last_chunk, Some(last_chunk) => {
2186 assert_eq!(last_chunk.identifier, 7);
2187 assert_matches!(last_chunk.previous, Some(previous) => {
2188 assert_eq!(previous, 42);
2189 });
2190 assert!(last_chunk.next.is_none());
2191 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2192 assert_eq!(items.len(), 3);
2193 check_test_event(&items[0], "fondue");
2194 check_test_event(&items[1], "gruyère");
2195 check_test_event(&items[2], "mont d'or");
2196 });
2197 });
2198 assert_eq!(chunk_identifier_generator.current(), 42);
2199 }
2200 }
2201
2202 #[async_test]
2203 async fn test_load_last_chunk_with_a_cycle() {
2204 let room_id = room_id!("!r0:matrix.org");
2205 let store = get_event_cache_store().await.expect("creating cache store failed");
2206
2207 store
2208 .handle_linked_chunk_updates(
2209 room_id,
2210 vec![
2211 Update::NewItemsChunk {
2212 previous: None,
2213 new: ChunkIdentifier::new(0),
2214 next: None,
2215 },
2216 Update::NewItemsChunk {
2217 previous: Some(ChunkIdentifier::new(0)),
2221 new: ChunkIdentifier::new(1),
2222 next: Some(ChunkIdentifier::new(0)),
2223 },
2224 ],
2225 )
2226 .await
2227 .unwrap();
2228
2229 store.load_last_chunk(room_id).await.unwrap_err();
2230 }
2231
2232 #[async_test]
2233 async fn test_load_previous_chunk() {
2234 let room_id = room_id!("!r0:matrix.org");
2235 let event = |msg: &str| make_test_event(room_id, msg);
2236 let store = get_event_cache_store().await.expect("creating cache store failed");
2237
2238 {
2241 let previous_chunk =
2242 store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();
2243
2244 assert!(previous_chunk.is_none());
2245 }
2246
2247 {
2250 store
2251 .handle_linked_chunk_updates(
2252 room_id,
2253 vec![Update::NewItemsChunk {
2254 previous: None,
2255 new: ChunkIdentifier::new(42),
2256 next: None,
2257 }],
2258 )
2259 .await
2260 .unwrap();
2261
2262 let previous_chunk =
2263 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2264
2265 assert!(previous_chunk.is_none());
2266 }
2267
2268 {
2270 store
2271 .handle_linked_chunk_updates(
2272 room_id,
2273 vec![
2274 Update::NewItemsChunk {
2276 previous: None,
2277 new: ChunkIdentifier::new(7),
2278 next: Some(ChunkIdentifier::new(42)),
2279 },
2280 Update::PushItems {
2281 at: Position::new(ChunkIdentifier::new(7), 0),
2282 items: vec![event("brigand du jorat"), event("morbier")],
2283 },
2284 ],
2285 )
2286 .await
2287 .unwrap();
2288
2289 let previous_chunk =
2290 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2291
2292 assert_matches!(previous_chunk, Some(previous_chunk) => {
2293 assert_eq!(previous_chunk.identifier, 7);
2294 assert!(previous_chunk.previous.is_none());
2295 assert_matches!(previous_chunk.next, Some(next) => {
2296 assert_eq!(next, 42);
2297 });
2298 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2299 assert_eq!(items.len(), 2);
2300 check_test_event(&items[0], "brigand du jorat");
2301 check_test_event(&items[1], "morbier");
2302 });
2303 });
2304 }
2305 }
2306}
2307
2308#[cfg(test)]
2309mod encrypted_tests {
2310 use std::sync::atomic::{AtomicU32, Ordering::SeqCst};
2311
2312 use matrix_sdk_base::{
2313 event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
2314 event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
2315 };
2316 use once_cell::sync::Lazy;
2317 use tempfile::{tempdir, TempDir};
2318
2319 use super::SqliteEventCacheStore;
2320
2321 static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
2322 static NUM: AtomicU32 = AtomicU32::new(0);
2323
2324 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
2325 let name = NUM.fetch_add(1, SeqCst).to_string();
2326 let tmpdir_path = TMP_DIR.path().join(name);
2327
2328 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
2329
2330 Ok(SqliteEventCacheStore::open(
2331 tmpdir_path.to_str().unwrap(),
2332 Some("default_test_password"),
2333 )
2334 .await
2335 .unwrap())
2336 }
2337
2338 event_cache_store_integration_tests!();
2339 event_cache_store_integration_tests_time!();
2340 event_cache_store_media_integration_tests!();
2341}