1use std::{borrow::Cow, fmt, iter::once, path::Path, sync::Arc};
18
19use async_trait::async_trait;
20use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
21use matrix_sdk_base::{
22 event_cache::{
23 store::{
24 media::{
25 EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
26 MediaService,
27 },
28 EventCacheStore,
29 },
30 Event, Gap,
31 },
32 linked_chunk::{ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, RawChunk, Update},
33 media::{MediaRequestParameters, UniqueKey},
34};
35use matrix_sdk_store_encryption::StoreCipher;
36use ruma::{time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri, OwnedEventId, RoomId};
37use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
38use tokio::fs;
39use tracing::{debug, trace};
40
41use crate::{
42 error::{Error, Result},
43 utils::{
44 repeat_vars, time_to_timestamp, Key, SqliteAsyncConnExt, SqliteKeyValueStoreAsyncConnExt,
45 SqliteKeyValueStoreConnExt, SqliteTransactionExt,
46 },
47 OpenStoreError,
48};
49
/// String identifiers used by this store: key-value entry names and the
/// table names passed to `encode_key`.
mod keys {
    // Entries in the key-value store.

    /// Entry holding the serialized media retention policy.
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    /// Entry holding the serialized time of the last media cache cleanup.
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    // Table names, used as the `table_name` argument when encoding keys.

    /// Table name used when encoding room ids for the linked chunk tables.
    pub const LINKED_CHUNKS: &str = "linked_chunks";
    /// Table name used when encoding media URIs and formats.
    pub const MEDIA: &str = "media";
}
59
/// Schema version targeted by this code. `run_migrations` does nothing when
/// the database is already at (or past) this version.
const DATABASE_VERSION: u8 = 5;

/// Value stored in the `type` column of `linked_chunks` for a chunk that
/// contains events.
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
/// Value stored in the `type` column of `linked_chunks` for a chunk that
/// represents a gap.
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";
73
/// An event cache store backed by an SQLite database.
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// Cipher used to encrypt values and hash keys. `None` when the store
    /// was opened without a passphrase, in which case values and keys are
    /// stored as-is.
    store_cipher: Option<Arc<StoreCipher>>,
    /// Pool of connections to the SQLite database.
    pool: SqlitePool,
    /// Service that the media-related `EventCacheStore` methods delegate to.
    media_service: MediaService,
}
81
82#[cfg(not(tarpaulin_include))]
83impl fmt::Debug for SqliteEventCacheStore {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
86 }
87}
88
89impl SqliteEventCacheStore {
90 pub async fn open(
93 path: impl AsRef<Path>,
94 passphrase: Option<&str>,
95 ) -> Result<Self, OpenStoreError> {
96 let pool = create_pool(path.as_ref()).await?;
97
98 Self::open_with_pool(pool, passphrase).await
99 }
100
101 pub async fn open_with_pool(
104 pool: SqlitePool,
105 passphrase: Option<&str>,
106 ) -> Result<Self, OpenStoreError> {
107 let conn = pool.get().await?;
108 conn.set_journal_size_limit().await?;
109
110 let version = conn.db_version().await?;
111 run_migrations(&conn, version).await?;
112 conn.optimize().await?;
113
114 let store_cipher = match passphrase {
115 Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
116 None => None,
117 };
118
119 let media_service = MediaService::new();
120 let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
121 let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
122 media_service.restore(media_retention_policy, last_media_cleanup_time);
123
124 Ok(Self { store_cipher, pool, media_service })
125 }
126
127 fn encode_value(&self, value: Vec<u8>) -> Result<Vec<u8>> {
128 if let Some(key) = &self.store_cipher {
129 let encrypted = key.encrypt_value_data(value)?;
130 Ok(rmp_serde::to_vec_named(&encrypted)?)
131 } else {
132 Ok(value)
133 }
134 }
135
136 fn decode_value<'a>(&self, value: &'a [u8]) -> Result<Cow<'a, [u8]>> {
137 if let Some(key) = &self.store_cipher {
138 let encrypted = rmp_serde::from_slice(value)?;
139 let decrypted = key.decrypt_value_data(encrypted)?;
140 Ok(Cow::Owned(decrypted))
141 } else {
142 Ok(Cow::Borrowed(value))
143 }
144 }
145
146 fn encode_key(&self, table_name: &str, key: impl AsRef<[u8]>) -> Key {
147 let bytes = key.as_ref();
148 if let Some(store_cipher) = &self.store_cipher {
149 Key::Hashed(store_cipher.hash_key(table_name, bytes))
150 } else {
151 Key::Plain(bytes.to_owned())
152 }
153 }
154
155 async fn acquire(&self) -> Result<SqliteAsyncConn> {
156 Ok(self.pool.get().await?)
157 }
158
159 fn map_row_to_chunk(
160 row: &rusqlite::Row<'_>,
161 ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
162 Ok((
163 row.get::<_, u64>(0)?,
164 row.get::<_, Option<u64>>(1)?,
165 row.get::<_, Option<u64>>(2)?,
166 row.get::<_, String>(3)?,
167 ))
168 }
169}
170
/// Extension methods used to read linked chunks back from the database
/// within a transaction.
trait TransactionExtForLinkedChunks {
    /// Rebuilds a raw chunk from its identifiers and its type string,
    /// loading the chunk content from the database.
    ///
    /// `index` is the identifier of the chunk itself; `previous` and `next`
    /// are the identifiers of its neighbours, if any. `room_id` is the
    /// already-encoded room key.
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    /// Loads the content of the gap chunk `chunk_id` in the given room.
    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    /// Loads the events of the events chunk `chunk_id` in the given room.
    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        room_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}
196
197impl TransactionExtForLinkedChunks for Transaction<'_> {
198 fn rebuild_chunk(
199 &self,
200 store: &SqliteEventCacheStore,
201 room_id: &Key,
202 previous: Option<u64>,
203 id: u64,
204 next: Option<u64>,
205 chunk_type: &str,
206 ) -> Result<RawChunk<Event, Gap>> {
207 let previous = previous.map(ChunkIdentifier::new);
208 let next = next.map(ChunkIdentifier::new);
209 let id = ChunkIdentifier::new(id);
210
211 match chunk_type {
212 CHUNK_TYPE_GAP_TYPE_STRING => {
213 let gap = self.load_gap_content(store, room_id, id)?;
215 Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
216 }
217
218 CHUNK_TYPE_EVENT_TYPE_STRING => {
219 let events = self.load_events_content(store, room_id, id)?;
221 Ok(RawChunk {
222 content: ChunkContent::Items(events),
223 previous,
224 identifier: id,
225 next,
226 })
227 }
228
229 other => {
230 Err(Error::InvalidData {
232 details: format!("a linked chunk has an unknown type {other}"),
233 })
234 }
235 }
236 }
237
238 fn load_gap_content(
239 &self,
240 store: &SqliteEventCacheStore,
241 room_id: &Key,
242 chunk_id: ChunkIdentifier,
243 ) -> Result<Gap> {
244 let encoded_prev_token: Vec<u8> = self.query_row(
247 "SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
248 (chunk_id.index(), &room_id),
249 |row| row.get(0),
250 )?;
251 let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
252 let prev_token = serde_json::from_slice(&prev_token_bytes)?;
253 Ok(Gap { prev_token })
254 }
255
256 fn load_events_content(
257 &self,
258 store: &SqliteEventCacheStore,
259 room_id: &Key,
260 chunk_id: ChunkIdentifier,
261 ) -> Result<Vec<Event>> {
262 let mut events = Vec::new();
264
265 for event_data in self
266 .prepare(
267 r#"
268 SELECT content FROM events
269 WHERE chunk_id = ? AND room_id = ?
270 ORDER BY position ASC
271 "#,
272 )?
273 .query_map((chunk_id.index(), &room_id), |row| row.get::<_, Vec<u8>>(0))?
274 {
275 let encoded_content = event_data?;
276 let serialized_content = store.decode_value(&encoded_content)?;
277 let event = serde_json::from_slice(&serialized_content)?;
278
279 events.push(event);
280 }
281
282 Ok(events)
283 }
284}
285
286async fn create_pool(path: &Path) -> Result<SqlitePool, OpenStoreError> {
287 fs::create_dir_all(path).await.map_err(OpenStoreError::CreateDir)?;
288 let cfg = deadpool_sqlite::Config::new(path.join("matrix-sdk-event-cache.sqlite3"));
289 Ok(cfg.create_pool(Runtime::Tokio1)?)
290}
291
/// Runs every migration needed to bring a database at schema `version` up to
/// [`DATABASE_VERSION`].
///
/// A `version` of 0 is logged as a database creation. When `version` is
/// already at or past [`DATABASE_VERSION`], nothing is done. Each migration
/// step runs in its own transaction, which also records the new version.
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        // Nothing to migrate.
        return Ok(());
    }

    if version < 1 {
        // The journal mode is switched to WAL before the migration
        // transaction is opened.
        // NOTE(review): presumably because SQLite cannot change the journal
        // mode inside a transaction — confirm against the SQLite docs.
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        // Foreign keys are enabled on this connection before the migration
        // transaction is opened.
        conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    Ok(())
}
354
355#[async_trait]
356impl EventCacheStore for SqliteEventCacheStore {
357 type Error = Error;
358
359 async fn try_take_leased_lock(
360 &self,
361 lease_duration_ms: u32,
362 key: &str,
363 holder: &str,
364 ) -> Result<bool> {
365 let key = key.to_owned();
366 let holder = holder.to_owned();
367
368 let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
369 let expiration = now + lease_duration_ms as u64;
370
371 let num_touched = self
372 .acquire()
373 .await?
374 .with_transaction(move |txn| {
375 txn.execute(
376 "INSERT INTO lease_locks (key, holder, expiration)
377 VALUES (?1, ?2, ?3)
378 ON CONFLICT (key)
379 DO
380 UPDATE SET holder = ?2, expiration = ?3
381 WHERE holder = ?2
382 OR expiration < ?4
383 ",
384 (key, holder, expiration, now),
385 )
386 })
387 .await?;
388
389 Ok(num_touched == 1)
390 }
391
    /// Applies `updates` to the persisted linked chunk of `room_id`.
    ///
    /// All updates are applied, in order, inside a single immediate
    /// transaction: if any of them fails, the error is returned and the
    /// transaction is not committed.
    async fn handle_linked_chunk_updates(
        &self,
        room_id: &RoomId,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        // The encoded room id is what is stored in the database; the plain
        // room id is only kept for logging.
        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let room_id = room_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self.acquire().await?, move |txn| {
            for up in updates {
                match up {
                    // A new, empty events chunk is inserted between `previous` and `next`.
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    // A new gap chunk is inserted, along with its encoded `prev_token`.
                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %room_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        // Insert the chunk row first, then the gap content that refers to it.
                        insert_chunk(
                            txn,
                            &hashed_room_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                            INSERT INTO gaps(chunk_id, room_id, prev_token)
                            VALUES (?, ?, ?)
                        "#,
                            (new, &hashed_room_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%room_id, "removing chunk @ {chunk_id}");

                        // Find the neighbours of the chunk to delete.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
                            (chunk_id, &hashed_room_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        // Make the previous chunk point to the next one.
                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
                        }

                        // Make the next chunk point back to the previous one.
                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
                        }

                        // Now delete the chunk itself.
                        // NOTE(review): its rows in `events`/`gaps` are presumably removed by
                        // cascading deletes declared in the migrations — confirm.
                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, &hashed_room_id))?;
                    }

                    Update::PushItems { at, items } => {
                        let chunk_id = at.chunk_identifier().index();

                        trace!(%room_id, "pushing {} items @ {chunk_id}", items.len());

                        // Items are stored at consecutive positions, starting at `at.index()`.
                        for (i, event) in items.into_iter().enumerate() {
                            let serialized = serde_json::to_vec(&event)?;
                            let content = this.encode_value(serialized)?;

                            let event_id = event.event_id().map(|event_id| event_id.to_string());
                            let index = at.index() + i;

                            txn.execute(
                                r#"
                                INSERT INTO events(chunk_id, room_id, event_id, content, position)
                                VALUES (?, ?, ?, ?, ?)
                            "#,
                                (chunk_id, &hashed_room_id, event_id, content, index),
                            )?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "replacing item @ {chunk_id}:{index}");

                        let serialized = serde_json::to_vec(&event)?;
                        let content = this.encode_value(serialized)?;

                        // The event id is rewritten too, as the replacing event may carry
                        // a different one.
                        let event_id = event.event_id().map(|event_id| event_id.to_string());

                        txn.execute(
                            r#"
                            UPDATE events
                            SET content = ?, event_id = ?
                            WHERE room_id = ? AND chunk_id = ? AND position = ?
                        "#,
                            (content, event_id, &hashed_room_id, chunk_id, index,)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "removing item @ {chunk_id}:{index}");

                        // Remove the item itself.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;

                        // Shift the following items of the same chunk down by one position.
                        txn.execute(
                            r#"
                                UPDATE events
                                SET position = position - 1
                                WHERE room_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_room_id, chunk_id, index)
                        )?;

                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%room_id, "truncating items >= {chunk_id}:{index}");

                        // Remove every item of the chunk from position `index` onwards.
                        txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%room_id, "clearing items");

                        // Remove all the chunks of the room.
                        // NOTE(review): the chunk contents are presumably removed by
                        // cascading deletes declared in the migrations — confirm.
                        txn.execute(
                            "DELETE FROM linked_chunks WHERE room_id = ?",
                            (&hashed_room_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing to persist for these updates.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }
583
584 async fn load_all_chunks(
585 &self,
586 room_id: &RoomId,
587 ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
588 let room_id = room_id.to_owned();
589 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
590
591 let this = self.clone();
592
593 let result = self
594 .acquire()
595 .await?
596 .with_transaction(move |txn| -> Result<_> {
597 let mut items = Vec::new();
598
599 for data in txn
601 .prepare(
602 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? ORDER BY id",
603 )?
604 .query_map((&hashed_room_id,), Self::map_row_to_chunk)?
605 {
606 let (id, previous, next, chunk_type) = data?;
607 let new = txn.rebuild_chunk(
608 &this,
609 &hashed_room_id,
610 previous,
611 id,
612 next,
613 chunk_type.as_str(),
614 )?;
615 items.push(new);
616 }
617
618 Ok(items)
619 })
620 .await?;
621
622 Ok(result)
623 }
624
625 async fn load_last_chunk(
626 &self,
627 room_id: &RoomId,
628 ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
629 let room_id = room_id.to_owned();
630 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
631
632 let this = self.clone();
633
634 self
635 .acquire()
636 .await?
637 .with_transaction(move |txn| -> Result<_> {
638 let (chunk_identifier_generator, number_of_chunks) = txn
640 .prepare(
641 "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE room_id = ?"
642 )?
643 .query_row(
644 (&hashed_room_id,),
645 |row| {
646 Ok((
647 row.get::<_, Option<u64>>(0)?,
652 row.get::<_, u64>(1)?,
653 ))
654 }
655 )?;
656
657 let chunk_identifier_generator = match chunk_identifier_generator {
658 Some(last_chunk_identifier) => {
659 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
660 ChunkIdentifier::new(last_chunk_identifier)
661 )
662 },
663 None => ChunkIdentifierGenerator::new_from_scratch(),
664 };
665
666 let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
668 .prepare(
669 "SELECT id, previous, type FROM linked_chunks WHERE room_id = ? AND next IS NULL"
670 )?
671 .query_row(
672 (&hashed_room_id,),
673 |row| {
674 Ok((
675 row.get::<_, u64>(0)?,
676 row.get::<_, Option<u64>>(1)?,
677 row.get::<_, String>(2)?,
678 ))
679 }
680 )
681 .optional()?
682 else {
683 if number_of_chunks == 0 {
686 return Ok((None, chunk_identifier_generator));
687 }
688 else {
693 return Err(Error::InvalidData {
694 details:
695 "last chunk is not found but chunks exist: the linked chunk contains a cycle"
696 .to_owned()
697 }
698 )
699 }
700 };
701
702 let last_chunk = txn.rebuild_chunk(
704 &this,
705 &hashed_room_id,
706 previous_chunk,
707 chunk_identifier,
708 None,
709 &chunk_type
710 )?;
711
712 Ok((Some(last_chunk), chunk_identifier_generator))
713 })
714 .await
715 }
716
717 async fn load_previous_chunk(
718 &self,
719 room_id: &RoomId,
720 before_chunk_identifier: ChunkIdentifier,
721 ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
722 let room_id = room_id.to_owned();
723 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
724
725 let this = self.clone();
726
727 self
728 .acquire()
729 .await?
730 .with_transaction(move |txn| -> Result<_> {
731 let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
733 .prepare(
734 "SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ? AND next = ?"
735 )?
736 .query_row(
737 (&hashed_room_id, before_chunk_identifier.index()),
738 |row| {
739 Ok((
740 row.get::<_, u64>(0)?,
741 row.get::<_, Option<u64>>(1)?,
742 row.get::<_, Option<u64>>(2)?,
743 row.get::<_, String>(3)?,
744 ))
745 }
746 )
747 .optional()?
748 else {
749 return Ok(None);
751 };
752
753 let last_chunk = txn.rebuild_chunk(
755 &this,
756 &hashed_room_id,
757 previous_chunk,
758 chunk_identifier,
759 next_chunk,
760 &chunk_type
761 )?;
762
763 Ok(Some(last_chunk))
764 })
765 .await
766 }
767
768 async fn clear_all_rooms_chunks(&self) -> Result<(), Self::Error> {
769 self.acquire()
770 .await?
771 .with_transaction(move |txn| {
772 txn.execute("DELETE FROM linked_chunks", ())
774 })
775 .await?;
776 Ok(())
777 }
778
779 async fn filter_duplicated_events(
780 &self,
781 room_id: &RoomId,
782 events: Vec<OwnedEventId>,
783 ) -> Result<Vec<OwnedEventId>, Self::Error> {
784 if events.is_empty() {
788 return Ok(Vec::new());
789 }
790
791 let room_id = room_id.to_owned();
793 let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
794
795 self.acquire()
796 .await?
797 .with_transaction(move |txn| -> Result<_> {
798 txn.chunk_large_query_over(events, None, move |txn, events| {
799 let query = format!(
800 "SELECT event_id FROM events WHERE room_id = ? AND event_id IN ({}) ORDER BY chunk_id ASC, position ASC",
801 repeat_vars(events.len()),
802 );
803 let parameters = params_from_iter(
804 once(
806 hashed_room_id
807 .to_sql()
808 .unwrap(),
810 )
811 .chain(events.iter().map(|event| {
813 event
814 .as_str()
815 .to_sql()
816 .unwrap()
818 })),
819 );
820
821 let mut duplicated_events = Vec::new();
822
823 for duplicated_event in txn
824 .prepare(&query)?
825 .query_map(parameters, |row| row.get::<_, Option<String>>(0))?
826 {
827 let duplicated_event = duplicated_event?;
828
829 let Some(duplicated_event) = duplicated_event else {
830 continue;
832 };
833
834 let Ok(duplicated_event) = EventId::parse(duplicated_event) else {
835 continue;
838 };
839
840 duplicated_events.push(duplicated_event);
841 }
842
843 Ok(duplicated_events)
844 })
845 })
846 .await
847 }
848
    /// Adds media content to the store.
    ///
    /// Delegates to the media service, passing this store as the backend
    /// that performs the actual storage.
    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }
857
858 async fn replace_media_key(
859 &self,
860 from: &MediaRequestParameters,
861 to: &MediaRequestParameters,
862 ) -> Result<(), Self::Error> {
863 let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
864 let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());
865
866 let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
867 let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());
868
869 let conn = self.acquire().await?;
870 conn.execute(
871 r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
872 (new_uri, new_format, prev_uri, prev_format),
873 )
874 .await?;
875
876 Ok(())
877 }
878
    /// Gets the media content matching `request`, if any.
    ///
    /// Delegates to the media service, passing this store as the backend.
    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        self.media_service.get_media_content(self, request).await
    }
882
883 async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
884 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
885 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
886
887 let conn = self.acquire().await?;
888 conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;
889
890 Ok(())
891 }
892
    /// Gets media content stored for `uri`, if any.
    ///
    /// Delegates to the media service, passing this store as the backend.
    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        self.media_service.get_media_content_for_uri(self, uri).await
    }
899
900 async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
901 let uri = self.encode_key(keys::MEDIA, uri);
902
903 let conn = self.acquire().await?;
904 conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;
905
906 Ok(())
907 }
908
    /// Sets the media retention policy.
    ///
    /// Delegates to the media service, passing this store as the backend.
    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_media_retention_policy(self, policy).await
    }
915
    /// Returns the media retention policy currently held by the media
    /// service; this does not query the database.
    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        self.media_service.media_retention_policy()
    }
919
    /// Sets whether the media matching `request` should ignore the media
    /// retention policy.
    ///
    /// Delegates to the media service, passing this store as the backend.
    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }
927
    /// Cleans up the media cache.
    ///
    /// Delegates to the media service, passing this store as the backend.
    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        self.media_service.clean_up_media_cache(self).await
    }
931}
932
933#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
934#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
935impl EventCacheStoreMedia for SqliteEventCacheStore {
936 type Error = Error;
937
938 async fn media_retention_policy_inner(
939 &self,
940 ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
941 let conn = self.acquire().await?;
942 conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
943 }
944
945 async fn set_media_retention_policy_inner(
946 &self,
947 policy: MediaRetentionPolicy,
948 ) -> Result<(), Self::Error> {
949 let conn = self.acquire().await?;
950 conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
951 Ok(())
952 }
953
954 async fn add_media_content_inner(
955 &self,
956 request: &MediaRequestParameters,
957 data: Vec<u8>,
958 last_access: SystemTime,
959 policy: MediaRetentionPolicy,
960 ignore_policy: IgnoreMediaRetentionPolicy,
961 ) -> Result<(), Self::Error> {
962 let ignore_policy = ignore_policy.is_yes();
963 let data = self.encode_value(data)?;
964
965 if !ignore_policy && policy.exceeds_max_file_size(data.len()) {
966 return Ok(());
967 }
968
969 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
970 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
971 let timestamp = time_to_timestamp(last_access);
972
973 let conn = self.acquire().await?;
974 conn.execute(
975 "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
976 (uri, format, data, timestamp, ignore_policy),
977 )
978 .await?;
979
980 Ok(())
981 }
982
983 async fn set_ignore_media_retention_policy_inner(
984 &self,
985 request: &MediaRequestParameters,
986 ignore_policy: IgnoreMediaRetentionPolicy,
987 ) -> Result<(), Self::Error> {
988 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
989 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
990 let ignore_policy = ignore_policy.is_yes();
991
992 let conn = self.acquire().await?;
993 conn.execute(
994 r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
995 (ignore_policy, uri, format),
996 )
997 .await?;
998
999 Ok(())
1000 }
1001
1002 async fn get_media_content_inner(
1003 &self,
1004 request: &MediaRequestParameters,
1005 current_time: SystemTime,
1006 ) -> Result<Option<Vec<u8>>, Self::Error> {
1007 let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
1008 let format = self.encode_key(keys::MEDIA, request.format.unique_key());
1009 let timestamp = time_to_timestamp(current_time);
1010
1011 let conn = self.acquire().await?;
1012 let data = conn
1013 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1014 txn.execute(
1018 "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
1019 (timestamp, &uri, &format),
1020 )?;
1021
1022 txn.query_row::<Vec<u8>, _, _>(
1023 "SELECT data FROM media WHERE uri = ? AND format = ?",
1024 (&uri, &format),
1025 |row| row.get(0),
1026 )
1027 .optional()
1028 })
1029 .await?;
1030
1031 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1032 }
1033
1034 async fn get_media_content_for_uri_inner(
1035 &self,
1036 uri: &MxcUri,
1037 current_time: SystemTime,
1038 ) -> Result<Option<Vec<u8>>, Self::Error> {
1039 let uri = self.encode_key(keys::MEDIA, uri);
1040 let timestamp = time_to_timestamp(current_time);
1041
1042 let conn = self.acquire().await?;
1043 let data = conn
1044 .with_transaction::<_, rusqlite::Error, _>(move |txn| {
1045 txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;
1049
1050 txn.query_row::<Vec<u8>, _, _>(
1051 "SELECT data FROM media WHERE uri = ?",
1052 (&uri,),
1053 |row| row.get(0),
1054 )
1055 .optional()
1056 })
1057 .await?;
1058
1059 data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
1060 }
1061
1062 async fn clean_up_media_cache_inner(
1063 &self,
1064 policy: MediaRetentionPolicy,
1065 current_time: SystemTime,
1066 ) -> Result<(), Self::Error> {
1067 if !policy.has_limitations() {
1068 return Ok(());
1070 }
1071
1072 let conn = self.acquire().await?;
1073 let removed = conn
1074 .with_transaction::<_, Error, _>(move |txn| {
1075 let mut removed = false;
1076
1077 if let Some(max_file_size) = policy.computed_max_file_size() {
1079 let count = txn.execute(
1080 "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
1081 (max_file_size,),
1082 )?;
1083
1084 if count > 0 {
1085 removed = true;
1086 }
1087 }
1088
1089 if let Some(last_access_expiry) = policy.last_access_expiry {
1091 let current_timestamp = time_to_timestamp(current_time);
1092 let expiry_secs = last_access_expiry.as_secs();
1093 let count = txn.execute(
1094 "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
1095 (current_timestamp, expiry_secs),
1096 )?;
1097
1098 if count > 0 {
1099 removed = true;
1100 }
1101 }
1102
1103 if let Some(max_cache_size) = policy.max_cache_size {
1105 let cache_size_int = txn
1108 .query_row(
1109 "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
1110 (),
1111 |row| {
1112 row.get::<_, Option<i64>>(0)
1114 },
1115 )?
1116 .unwrap_or_default();
1117 let cache_size_usize = usize::try_from(cache_size_int);
1118
1119 if cache_size_usize.is_err()
1121 || cache_size_usize.is_ok_and(|cache_size| cache_size > max_cache_size)
1122 {
1123 let mut cached_stmt = txn.prepare_cached(
1125 "SELECT rowid, length(data) FROM media \
1126 WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
1127 )?;
1128 let content_sizes = cached_stmt
1129 .query(())?
1130 .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, usize>(1)?)));
1131
1132 let mut accumulated_items_size = 0usize;
1133 let mut limit_reached = false;
1134 let mut rows_to_remove = Vec::new();
1135
1136 for result in content_sizes {
1137 let (row_id, size) = match result {
1138 Ok(content_size) => content_size,
1139 Err(error) => {
1140 return Err(error.into());
1141 }
1142 };
1143
1144 if limit_reached {
1145 rows_to_remove.push(row_id);
1146 continue;
1147 }
1148
1149 match accumulated_items_size.checked_add(size) {
1150 Some(acc) if acc > max_cache_size => {
1151 limit_reached = true;
1153 rows_to_remove.push(row_id);
1154 }
1155 Some(acc) => accumulated_items_size = acc,
1156 None => {
1157 limit_reached = true;
1160 rows_to_remove.push(row_id);
1161 }
1162 };
1163 }
1164
1165 if !rows_to_remove.is_empty() {
1166 removed = true;
1167 }
1168
1169 txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
1170 let sql_params = repeat_vars(row_ids.len());
1171 let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
1172 txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
1173 Ok(Vec::<()>::new())
1174 })?;
1175 }
1176 }
1177
1178 txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;
1179
1180 Ok(removed)
1181 })
1182 .await?;
1183
1184 if removed {
1187 conn.vacuum().await?;
1188 }
1189
1190 Ok(())
1191 }
1192
1193 async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
1194 let conn = self.acquire().await?;
1195 conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
1196 }
1197}
1198
/// Runs `f` inside a transaction opened with the `Immediate` behavior, and
/// commits it when `f` succeeds.
///
/// The connection's transaction behavior is set back to `Deferred`
/// afterwards, whether `f` (or the commit) succeeded or not.
///
/// # Panics
///
/// Panics if the `interact` call on the connection itself fails.
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    conn: SqliteAsyncConn,
    f: F,
) -> Result<T, Error> {
    conn.interact(move |conn| -> Result<T, Error> {
        // NOTE(review): presumably so the write lock is taken when the
        // transaction starts rather than on its first write — confirm.
        conn.set_transaction_behavior(TransactionBehavior::Immediate);

        // Wrapped in a closure so that an early return via `?` does not skip
        // the reset below.
        let code = || -> Result<T, Error> {
            let txn = conn.transaction()?;
            let res = f(&txn)?;
            txn.commit()?;
            Ok(res)
        };

        let res = code();

        // Restore the behavior in all cases, so later transactions on this
        // connection are not affected.
        conn.set_transaction_behavior(TransactionBehavior::Deferred);

        res
    })
    .await
    // NOTE(review): an `interact` failure is turned into a panic here —
    // confirm this is intended.
    .unwrap()
}
1235
1236fn insert_chunk(
1237 txn: &Transaction<'_>,
1238 room_id: &Key,
1239 previous: Option<u64>,
1240 new: u64,
1241 next: Option<u64>,
1242 type_str: &str,
1243) -> rusqlite::Result<()> {
1244 txn.execute(
1246 r#"
1247 INSERT INTO linked_chunks(id, room_id, previous, next, type)
1248 VALUES (?, ?, ?, ?, ?)
1249 "#,
1250 (new, room_id, previous, next, type_str),
1251 )?;
1252
1253 if let Some(previous) = previous {
1255 txn.execute(
1256 r#"
1257 UPDATE linked_chunks
1258 SET next = ?
1259 WHERE id = ? AND room_id = ?
1260 "#,
1261 (new, previous, room_id),
1262 )?;
1263 }
1264
1265 if let Some(next) = next {
1267 txn.execute(
1268 r#"
1269 UPDATE linked_chunks
1270 SET previous = ?
1271 WHERE id = ? AND room_id = ?
1272 "#,
1273 (new, next, room_id),
1274 )?;
1275 }
1276
1277 Ok(())
1278}
1279
1280#[cfg(test)]
1281mod tests {
1282 use std::{
1283 sync::atomic::{AtomicU32, Ordering::SeqCst},
1284 time::Duration,
1285 };
1286
1287 use assert_matches::assert_matches;
1288 use matrix_sdk_base::{
1289 event_cache::{
1290 store::{
1291 integration_tests::{check_test_event, make_test_event},
1292 media::IgnoreMediaRetentionPolicy,
1293 EventCacheStore, EventCacheStoreError,
1294 },
1295 Gap,
1296 },
1297 event_cache_store_integration_tests, event_cache_store_integration_tests_time,
1298 event_cache_store_media_integration_tests,
1299 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
1300 media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
1301 };
1302 use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
1303 use once_cell::sync::Lazy;
1304 use ruma::{events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
1305 use tempfile::{tempdir, TempDir};
1306
1307 use super::SqliteEventCacheStore;
1308 use crate::utils::SqliteAsyncConnExt;
1309
    /// Temporary directory shared by the tests of this module; it is created
    /// lazily on first access.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Monotonic counter used by `get_event_cache_store` to give every store
    /// its own sub-directory of `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);
1312
1313 async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
1314 let name = NUM.fetch_add(1, SeqCst).to_string();
1315 let tmpdir_path = TMP_DIR.path().join(name);
1316
1317 tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());
1318
1319 Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
1320 }
1321
    // Instantiate the shared test suites exported by `matrix_sdk_base` inside
    // this module.
    // NOTE(review): these macros presumably obtain the store under test by
    // calling the local `get_event_cache_store()` helper — confirm against
    // their definitions in `matrix_sdk_base`.
    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);
1325
1326 async fn get_event_cache_store_content_sorted_by_last_access(
1327 event_cache_store: &SqliteEventCacheStore,
1328 ) -> Vec<Vec<u8>> {
1329 let sqlite_db = event_cache_store.acquire().await.expect("accessing sqlite db failed");
1330 sqlite_db
1331 .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
1332 stmt.query(())?.mapped(|row| row.get(0)).collect()
1333 })
1334 .await
1335 .expect("querying media cache content by last access failed")
1336 }
1337
1338 #[async_test]
1339 async fn test_last_access() {
1340 let event_cache_store = get_event_cache_store().await.expect("creating media cache failed");
1341 let uri = mxc_uri!("mxc://localhost/media");
1342 let file_request = MediaRequestParameters {
1343 source: MediaSource::Plain(uri.to_owned()),
1344 format: MediaFormat::File,
1345 };
1346 let thumbnail_request = MediaRequestParameters {
1347 source: MediaSource::Plain(uri.to_owned()),
1348 format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
1349 Method::Crop,
1350 uint!(100),
1351 uint!(100),
1352 )),
1353 };
1354
1355 let content: Vec<u8> = "hello world".into();
1356 let thumbnail_content: Vec<u8> = "hello…".into();
1357
1358 event_cache_store
1360 .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
1361 .await
1362 .expect("adding file failed");
1363
1364 tokio::time::sleep(Duration::from_secs(3)).await;
1367
1368 event_cache_store
1369 .add_media_content(
1370 &thumbnail_request,
1371 thumbnail_content.clone(),
1372 IgnoreMediaRetentionPolicy::No,
1373 )
1374 .await
1375 .expect("adding thumbnail failed");
1376
1377 let contents =
1379 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1380
1381 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1382 assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
1383 assert_eq!(contents[1], content, "file is not second-to-last access");
1384
1385 tokio::time::sleep(Duration::from_secs(3)).await;
1388
1389 let _ = event_cache_store
1391 .get_media_content(&file_request)
1392 .await
1393 .expect("getting file failed")
1394 .expect("file is missing");
1395
1396 let contents =
1398 get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;
1399
1400 assert_eq!(contents.len(), 2, "media cache contents length is wrong");
1401 assert_eq!(contents[0], content, "file is not last access");
1402 assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
1403 }
1404
1405 #[async_test]
1406 async fn test_linked_chunk_new_items_chunk() {
1407 let store = get_event_cache_store().await.expect("creating cache store failed");
1408
1409 let room_id = &DEFAULT_TEST_ROOM_ID;
1410
1411 store
1412 .handle_linked_chunk_updates(
1413 room_id,
1414 vec![
1415 Update::NewItemsChunk {
1416 previous: None,
1417 new: ChunkIdentifier::new(42),
1418 next: None, },
1420 Update::NewItemsChunk {
1421 previous: Some(ChunkIdentifier::new(42)),
1422 new: ChunkIdentifier::new(13),
1423 next: Some(ChunkIdentifier::new(37)), },
1426 Update::NewItemsChunk {
1427 previous: Some(ChunkIdentifier::new(13)),
1428 new: ChunkIdentifier::new(37),
1429 next: None,
1430 },
1431 ],
1432 )
1433 .await
1434 .unwrap();
1435
1436 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1437
1438 assert_eq!(chunks.len(), 3);
1439
1440 {
1441 let c = chunks.remove(0);
1443 assert_eq!(c.identifier, ChunkIdentifier::new(13));
1444 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1445 assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
1446 assert_matches!(c.content, ChunkContent::Items(events) => {
1447 assert!(events.is_empty());
1448 });
1449
1450 let c = chunks.remove(0);
1451 assert_eq!(c.identifier, ChunkIdentifier::new(37));
1452 assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
1453 assert_eq!(c.next, None);
1454 assert_matches!(c.content, ChunkContent::Items(events) => {
1455 assert!(events.is_empty());
1456 });
1457
1458 let c = chunks.remove(0);
1459 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1460 assert_eq!(c.previous, None);
1461 assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
1462 assert_matches!(c.content, ChunkContent::Items(events) => {
1463 assert!(events.is_empty());
1464 });
1465 }
1466 }
1467
1468 #[async_test]
1469 async fn test_linked_chunk_new_gap_chunk() {
1470 let store = get_event_cache_store().await.expect("creating cache store failed");
1471
1472 let room_id = &DEFAULT_TEST_ROOM_ID;
1473
1474 store
1475 .handle_linked_chunk_updates(
1476 room_id,
1477 vec![Update::NewGapChunk {
1478 previous: None,
1479 new: ChunkIdentifier::new(42),
1480 next: None,
1481 gap: Gap { prev_token: "raclette".to_owned() },
1482 }],
1483 )
1484 .await
1485 .unwrap();
1486
1487 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1488
1489 assert_eq!(chunks.len(), 1);
1490
1491 let c = chunks.remove(0);
1493 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1494 assert_eq!(c.previous, None);
1495 assert_eq!(c.next, None);
1496 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1497 assert_eq!(gap.prev_token, "raclette");
1498 });
1499 }
1500
1501 #[async_test]
1502 async fn test_linked_chunk_replace_item() {
1503 let store = get_event_cache_store().await.expect("creating cache store failed");
1504
1505 let room_id = &DEFAULT_TEST_ROOM_ID;
1506
1507 store
1508 .handle_linked_chunk_updates(
1509 room_id,
1510 vec![
1511 Update::NewItemsChunk {
1512 previous: None,
1513 new: ChunkIdentifier::new(42),
1514 next: None,
1515 },
1516 Update::PushItems {
1517 at: Position::new(ChunkIdentifier::new(42), 0),
1518 items: vec![
1519 make_test_event(room_id, "hello"),
1520 make_test_event(room_id, "world"),
1521 ],
1522 },
1523 Update::ReplaceItem {
1524 at: Position::new(ChunkIdentifier::new(42), 1),
1525 item: make_test_event(room_id, "yolo"),
1526 },
1527 ],
1528 )
1529 .await
1530 .unwrap();
1531
1532 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1533
1534 assert_eq!(chunks.len(), 1);
1535
1536 let c = chunks.remove(0);
1537 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1538 assert_eq!(c.previous, None);
1539 assert_eq!(c.next, None);
1540 assert_matches!(c.content, ChunkContent::Items(events) => {
1541 assert_eq!(events.len(), 2);
1542 check_test_event(&events[0], "hello");
1543 check_test_event(&events[1], "yolo");
1544 });
1545 }
1546
1547 #[async_test]
1548 async fn test_linked_chunk_remove_chunk() {
1549 let store = get_event_cache_store().await.expect("creating cache store failed");
1550
1551 let room_id = &DEFAULT_TEST_ROOM_ID;
1552
1553 store
1554 .handle_linked_chunk_updates(
1555 room_id,
1556 vec![
1557 Update::NewGapChunk {
1558 previous: None,
1559 new: ChunkIdentifier::new(42),
1560 next: None,
1561 gap: Gap { prev_token: "raclette".to_owned() },
1562 },
1563 Update::NewGapChunk {
1564 previous: Some(ChunkIdentifier::new(42)),
1565 new: ChunkIdentifier::new(43),
1566 next: None,
1567 gap: Gap { prev_token: "fondue".to_owned() },
1568 },
1569 Update::NewGapChunk {
1570 previous: Some(ChunkIdentifier::new(43)),
1571 new: ChunkIdentifier::new(44),
1572 next: None,
1573 gap: Gap { prev_token: "tartiflette".to_owned() },
1574 },
1575 Update::RemoveChunk(ChunkIdentifier::new(43)),
1576 ],
1577 )
1578 .await
1579 .unwrap();
1580
1581 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1582
1583 assert_eq!(chunks.len(), 2);
1584
1585 let c = chunks.remove(0);
1587 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1588 assert_eq!(c.previous, None);
1589 assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
1590 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1591 assert_eq!(gap.prev_token, "raclette");
1592 });
1593
1594 let c = chunks.remove(0);
1595 assert_eq!(c.identifier, ChunkIdentifier::new(44));
1596 assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
1597 assert_eq!(c.next, None);
1598 assert_matches!(c.content, ChunkContent::Gap(gap) => {
1599 assert_eq!(gap.prev_token, "tartiflette");
1600 });
1601
1602 let gaps = store
1604 .acquire()
1605 .await
1606 .unwrap()
1607 .with_transaction(|txn| -> rusqlite::Result<_> {
1608 let mut gaps = Vec::new();
1609 for data in txn
1610 .prepare("SELECT chunk_id FROM gaps ORDER BY chunk_id")?
1611 .query_map((), |row| row.get::<_, u64>(0))?
1612 {
1613 gaps.push(data?);
1614 }
1615 Ok(gaps)
1616 })
1617 .await
1618 .unwrap();
1619
1620 assert_eq!(gaps, vec![42, 44]);
1621 }
1622
1623 #[async_test]
1624 async fn test_linked_chunk_push_items() {
1625 let store = get_event_cache_store().await.expect("creating cache store failed");
1626
1627 let room_id = &DEFAULT_TEST_ROOM_ID;
1628
1629 store
1630 .handle_linked_chunk_updates(
1631 room_id,
1632 vec![
1633 Update::NewItemsChunk {
1634 previous: None,
1635 new: ChunkIdentifier::new(42),
1636 next: None,
1637 },
1638 Update::PushItems {
1639 at: Position::new(ChunkIdentifier::new(42), 0),
1640 items: vec![
1641 make_test_event(room_id, "hello"),
1642 make_test_event(room_id, "world"),
1643 ],
1644 },
1645 Update::PushItems {
1646 at: Position::new(ChunkIdentifier::new(42), 2),
1647 items: vec![make_test_event(room_id, "who?")],
1648 },
1649 ],
1650 )
1651 .await
1652 .unwrap();
1653
1654 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1655
1656 assert_eq!(chunks.len(), 1);
1657
1658 let c = chunks.remove(0);
1659 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1660 assert_eq!(c.previous, None);
1661 assert_eq!(c.next, None);
1662 assert_matches!(c.content, ChunkContent::Items(events) => {
1663 assert_eq!(events.len(), 3);
1664
1665 check_test_event(&events[0], "hello");
1666 check_test_event(&events[1], "world");
1667 check_test_event(&events[2], "who?");
1668 });
1669 }
1670
1671 #[async_test]
1672 async fn test_linked_chunk_remove_item() {
1673 let store = get_event_cache_store().await.expect("creating cache store failed");
1674
1675 let room_id = *DEFAULT_TEST_ROOM_ID;
1676
1677 store
1678 .handle_linked_chunk_updates(
1679 room_id,
1680 vec![
1681 Update::NewItemsChunk {
1682 previous: None,
1683 new: ChunkIdentifier::new(42),
1684 next: None,
1685 },
1686 Update::PushItems {
1687 at: Position::new(ChunkIdentifier::new(42), 0),
1688 items: vec![
1689 make_test_event(room_id, "hello"),
1690 make_test_event(room_id, "world"),
1691 ],
1692 },
1693 Update::RemoveItem { at: Position::new(ChunkIdentifier::new(42), 0) },
1694 ],
1695 )
1696 .await
1697 .unwrap();
1698
1699 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1700
1701 assert_eq!(chunks.len(), 1);
1702
1703 let c = chunks.remove(0);
1704 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1705 assert_eq!(c.previous, None);
1706 assert_eq!(c.next, None);
1707 assert_matches!(c.content, ChunkContent::Items(events) => {
1708 assert_eq!(events.len(), 1);
1709 check_test_event(&events[0], "world");
1710 });
1711
1712 let num_rows: u64 = store
1714 .acquire()
1715 .await
1716 .unwrap()
1717 .with_transaction(move |txn| {
1718 txn.query_row(
1719 "SELECT COUNT(*) FROM events WHERE chunk_id = 42 AND room_id = ? AND position = 0",
1720 (room_id.as_bytes(),),
1721 |row| row.get(0),
1722 )
1723 })
1724 .await
1725 .unwrap();
1726 assert_eq!(num_rows, 1);
1727 }
1728
1729 #[async_test]
1730 async fn test_linked_chunk_detach_last_items() {
1731 let store = get_event_cache_store().await.expect("creating cache store failed");
1732
1733 let room_id = *DEFAULT_TEST_ROOM_ID;
1734
1735 store
1736 .handle_linked_chunk_updates(
1737 room_id,
1738 vec![
1739 Update::NewItemsChunk {
1740 previous: None,
1741 new: ChunkIdentifier::new(42),
1742 next: None,
1743 },
1744 Update::PushItems {
1745 at: Position::new(ChunkIdentifier::new(42), 0),
1746 items: vec![
1747 make_test_event(room_id, "hello"),
1748 make_test_event(room_id, "world"),
1749 make_test_event(room_id, "howdy"),
1750 ],
1751 },
1752 Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
1753 ],
1754 )
1755 .await
1756 .unwrap();
1757
1758 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1759
1760 assert_eq!(chunks.len(), 1);
1761
1762 let c = chunks.remove(0);
1763 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1764 assert_eq!(c.previous, None);
1765 assert_eq!(c.next, None);
1766 assert_matches!(c.content, ChunkContent::Items(events) => {
1767 assert_eq!(events.len(), 1);
1768 check_test_event(&events[0], "hello");
1769 });
1770 }
1771
1772 #[async_test]
1773 async fn test_linked_chunk_start_end_reattach_items() {
1774 let store = get_event_cache_store().await.expect("creating cache store failed");
1775
1776 let room_id = *DEFAULT_TEST_ROOM_ID;
1777
1778 store
1782 .handle_linked_chunk_updates(
1783 room_id,
1784 vec![
1785 Update::NewItemsChunk {
1786 previous: None,
1787 new: ChunkIdentifier::new(42),
1788 next: None,
1789 },
1790 Update::PushItems {
1791 at: Position::new(ChunkIdentifier::new(42), 0),
1792 items: vec![
1793 make_test_event(room_id, "hello"),
1794 make_test_event(room_id, "world"),
1795 make_test_event(room_id, "howdy"),
1796 ],
1797 },
1798 Update::StartReattachItems,
1799 Update::EndReattachItems,
1800 ],
1801 )
1802 .await
1803 .unwrap();
1804
1805 let mut chunks = store.load_all_chunks(room_id).await.unwrap();
1806
1807 assert_eq!(chunks.len(), 1);
1808
1809 let c = chunks.remove(0);
1810 assert_eq!(c.identifier, ChunkIdentifier::new(42));
1811 assert_eq!(c.previous, None);
1812 assert_eq!(c.next, None);
1813 assert_matches!(c.content, ChunkContent::Items(events) => {
1814 assert_eq!(events.len(), 3);
1815 check_test_event(&events[0], "hello");
1816 check_test_event(&events[1], "world");
1817 check_test_event(&events[2], "howdy");
1818 });
1819 }
1820
1821 #[async_test]
1822 async fn test_linked_chunk_clear() {
1823 let store = get_event_cache_store().await.expect("creating cache store failed");
1824
1825 let room_id = *DEFAULT_TEST_ROOM_ID;
1826
1827 store
1828 .handle_linked_chunk_updates(
1829 room_id,
1830 vec![
1831 Update::NewItemsChunk {
1832 previous: None,
1833 new: ChunkIdentifier::new(42),
1834 next: None,
1835 },
1836 Update::NewGapChunk {
1837 previous: Some(ChunkIdentifier::new(42)),
1838 new: ChunkIdentifier::new(54),
1839 next: None,
1840 gap: Gap { prev_token: "fondue".to_owned() },
1841 },
1842 Update::PushItems {
1843 at: Position::new(ChunkIdentifier::new(42), 0),
1844 items: vec![
1845 make_test_event(room_id, "hello"),
1846 make_test_event(room_id, "world"),
1847 make_test_event(room_id, "howdy"),
1848 ],
1849 },
1850 Update::Clear,
1851 ],
1852 )
1853 .await
1854 .unwrap();
1855
1856 let chunks = store.load_all_chunks(room_id).await.unwrap();
1857 assert!(chunks.is_empty());
1858 }
1859
1860 #[async_test]
1861 async fn test_linked_chunk_multiple_rooms() {
1862 let store = get_event_cache_store().await.expect("creating cache store failed");
1863
1864 let room1 = room_id!("!realcheeselovers:raclette.fr");
1865 let room2 = room_id!("!realcheeselovers:fondue.ch");
1866
1867 store
1871 .handle_linked_chunk_updates(
1872 room1,
1873 vec![
1874 Update::NewItemsChunk {
1875 previous: None,
1876 new: ChunkIdentifier::new(42),
1877 next: None,
1878 },
1879 Update::PushItems {
1880 at: Position::new(ChunkIdentifier::new(42), 0),
1881 items: vec![
1882 make_test_event(room1, "best cheese is raclette"),
1883 make_test_event(room1, "obviously"),
1884 ],
1885 },
1886 ],
1887 )
1888 .await
1889 .unwrap();
1890
1891 store
1892 .handle_linked_chunk_updates(
1893 room2,
1894 vec![
1895 Update::NewItemsChunk {
1896 previous: None,
1897 new: ChunkIdentifier::new(42),
1898 next: None,
1899 },
1900 Update::PushItems {
1901 at: Position::new(ChunkIdentifier::new(42), 0),
1902 items: vec![make_test_event(room1, "beaufort is the best")],
1903 },
1904 ],
1905 )
1906 .await
1907 .unwrap();
1908
1909 let mut chunks_room1 = store.load_all_chunks(room1).await.unwrap();
1911 assert_eq!(chunks_room1.len(), 1);
1912
1913 let c = chunks_room1.remove(0);
1914 assert_matches!(c.content, ChunkContent::Items(events) => {
1915 assert_eq!(events.len(), 2);
1916 check_test_event(&events[0], "best cheese is raclette");
1917 check_test_event(&events[1], "obviously");
1918 });
1919
1920 let mut chunks_room2 = store.load_all_chunks(room2).await.unwrap();
1922 assert_eq!(chunks_room2.len(), 1);
1923
1924 let c = chunks_room2.remove(0);
1925 assert_matches!(c.content, ChunkContent::Items(events) => {
1926 assert_eq!(events.len(), 1);
1927 check_test_event(&events[0], "beaufort is the best");
1928 });
1929 }
1930
1931 #[async_test]
1932 async fn test_linked_chunk_update_is_a_transaction() {
1933 let store = get_event_cache_store().await.expect("creating cache store failed");
1934
1935 let room_id = *DEFAULT_TEST_ROOM_ID;
1936
1937 let err = store
1940 .handle_linked_chunk_updates(
1941 room_id,
1942 vec![
1943 Update::NewItemsChunk {
1944 previous: None,
1945 new: ChunkIdentifier::new(42),
1946 next: None,
1947 },
1948 Update::NewItemsChunk {
1949 previous: None,
1950 new: ChunkIdentifier::new(42),
1951 next: None,
1952 },
1953 ],
1954 )
1955 .await
1956 .unwrap_err();
1957
1958 assert_matches!(err, crate::error::Error::Sqlite(err) => {
1960 assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
1961 });
1962
1963 let chunks = store.load_all_chunks(room_id).await.unwrap();
1967 assert!(chunks.is_empty());
1968 }
1969
1970 #[async_test]
1971 async fn test_filter_duplicate_events_no_events() {
1972 let store = get_event_cache_store().await.expect("creating cache store failed");
1973
1974 let room_id = *DEFAULT_TEST_ROOM_ID;
1975 let duplicates = store.filter_duplicated_events(room_id, Vec::new()).await.unwrap();
1976 assert!(duplicates.is_empty());
1977 }
1978
1979 #[async_test]
1980 async fn test_load_last_chunk() {
1981 let room_id = room_id!("!r0:matrix.org");
1982 let event = |msg: &str| make_test_event(room_id, msg);
1983 let store = get_event_cache_store().await.expect("creating cache store failed");
1984
1985 {
1987 let (last_chunk, chunk_identifier_generator) =
1988 store.load_last_chunk(room_id).await.unwrap();
1989
1990 assert!(last_chunk.is_none());
1991 assert_eq!(chunk_identifier_generator.current(), 0);
1992 }
1993
1994 {
1996 store
1997 .handle_linked_chunk_updates(
1998 room_id,
1999 vec![
2000 Update::NewItemsChunk {
2001 previous: None,
2002 new: ChunkIdentifier::new(42),
2003 next: None,
2004 },
2005 Update::PushItems {
2006 at: Position::new(ChunkIdentifier::new(42), 0),
2007 items: vec![event("saucisse de morteau"), event("comté")],
2008 },
2009 ],
2010 )
2011 .await
2012 .unwrap();
2013
2014 let (last_chunk, chunk_identifier_generator) =
2015 store.load_last_chunk(room_id).await.unwrap();
2016
2017 assert_matches!(last_chunk, Some(last_chunk) => {
2018 assert_eq!(last_chunk.identifier, 42);
2019 assert!(last_chunk.previous.is_none());
2020 assert!(last_chunk.next.is_none());
2021 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2022 assert_eq!(items.len(), 2);
2023 check_test_event(&items[0], "saucisse de morteau");
2024 check_test_event(&items[1], "comté");
2025 });
2026 });
2027 assert_eq!(chunk_identifier_generator.current(), 42);
2028 }
2029
2030 {
2032 store
2033 .handle_linked_chunk_updates(
2034 room_id,
2035 vec![
2036 Update::NewItemsChunk {
2037 previous: Some(ChunkIdentifier::new(42)),
2038 new: ChunkIdentifier::new(7),
2039 next: None,
2040 },
2041 Update::PushItems {
2042 at: Position::new(ChunkIdentifier::new(7), 0),
2043 items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
2044 },
2045 ],
2046 )
2047 .await
2048 .unwrap();
2049
2050 let (last_chunk, chunk_identifier_generator) =
2051 store.load_last_chunk(room_id).await.unwrap();
2052
2053 assert_matches!(last_chunk, Some(last_chunk) => {
2054 assert_eq!(last_chunk.identifier, 7);
2055 assert_matches!(last_chunk.previous, Some(previous) => {
2056 assert_eq!(previous, 42);
2057 });
2058 assert!(last_chunk.next.is_none());
2059 assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
2060 assert_eq!(items.len(), 3);
2061 check_test_event(&items[0], "fondue");
2062 check_test_event(&items[1], "gruyère");
2063 check_test_event(&items[2], "mont d'or");
2064 });
2065 });
2066 assert_eq!(chunk_identifier_generator.current(), 42);
2067 }
2068 }
2069
2070 #[async_test]
2071 async fn test_load_last_chunk_with_a_cycle() {
2072 let room_id = room_id!("!r0:matrix.org");
2073 let store = get_event_cache_store().await.expect("creating cache store failed");
2074
2075 store
2076 .handle_linked_chunk_updates(
2077 room_id,
2078 vec![
2079 Update::NewItemsChunk {
2080 previous: None,
2081 new: ChunkIdentifier::new(0),
2082 next: None,
2083 },
2084 Update::NewItemsChunk {
2085 previous: Some(ChunkIdentifier::new(0)),
2089 new: ChunkIdentifier::new(1),
2090 next: Some(ChunkIdentifier::new(0)),
2091 },
2092 ],
2093 )
2094 .await
2095 .unwrap();
2096
2097 store.load_last_chunk(room_id).await.unwrap_err();
2098 }
2099
2100 #[async_test]
2101 async fn test_load_previous_chunk() {
2102 let room_id = room_id!("!r0:matrix.org");
2103 let event = |msg: &str| make_test_event(room_id, msg);
2104 let store = get_event_cache_store().await.expect("creating cache store failed");
2105
2106 {
2109 let previous_chunk =
2110 store.load_previous_chunk(room_id, ChunkIdentifier::new(153)).await.unwrap();
2111
2112 assert!(previous_chunk.is_none());
2113 }
2114
2115 {
2118 store
2119 .handle_linked_chunk_updates(
2120 room_id,
2121 vec![Update::NewItemsChunk {
2122 previous: None,
2123 new: ChunkIdentifier::new(42),
2124 next: None,
2125 }],
2126 )
2127 .await
2128 .unwrap();
2129
2130 let previous_chunk =
2131 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2132
2133 assert!(previous_chunk.is_none());
2134 }
2135
2136 {
2138 store
2139 .handle_linked_chunk_updates(
2140 room_id,
2141 vec![
2142 Update::NewItemsChunk {
2144 previous: None,
2145 new: ChunkIdentifier::new(7),
2146 next: Some(ChunkIdentifier::new(42)),
2147 },
2148 Update::PushItems {
2149 at: Position::new(ChunkIdentifier::new(7), 0),
2150 items: vec![event("brigand du jorat"), event("morbier")],
2151 },
2152 ],
2153 )
2154 .await
2155 .unwrap();
2156
2157 let previous_chunk =
2158 store.load_previous_chunk(room_id, ChunkIdentifier::new(42)).await.unwrap();
2159
2160 assert_matches!(previous_chunk, Some(previous_chunk) => {
2161 assert_eq!(previous_chunk.identifier, 7);
2162 assert!(previous_chunk.previous.is_none());
2163 assert_matches!(previous_chunk.next, Some(next) => {
2164 assert_eq!(next, 42);
2165 });
2166 assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
2167 assert_eq!(items.len(), 2);
2168 check_test_event(&items[0], "brigand du jorat");
2169 check_test_event(&items[1], "morbier");
2170 });
2171 });
2172 }
2173 }
2174}
2175
/// Runs the shared integration test suites against a store that is opened
/// with a passphrase.
#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time, event_cache_store_media_integration_tests,
    };
    use once_cell::sync::Lazy;
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    /// Temporary directory shared by the tests of this module; it is created
    /// lazily on first access.
    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    /// Monotonic counter giving every store its own sub-directory of
    /// `TMP_DIR`.
    static NUM: AtomicU32 = AtomicU32::new(0);

    /// Opens a fresh [`SqliteEventCacheStore`] with a fixed test passphrase,
    /// located in its own sub-directory of the shared temporary directory.
    ///
    /// # Panics
    ///
    /// Panics if the store cannot be opened.
    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        // `display()` avoids panicking on a temporary directory whose path is
        // not valid UTF-8.
        tracing::info!("using event cache store @ {}", tmpdir_path.display());

        // `open` accepts any `impl AsRef<Path>`, so the path is passed as is
        // instead of going through a fallible `&str` conversion.
        Ok(SqliteEventCacheStore::open(&tmpdir_path, Some("default_test_password"))
            .await
            .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();
}