use std::{collections::HashMap, fmt, iter::once, path::Path, sync::Arc};

use async_trait::async_trait;
use deadpool_sqlite::{Object as SqliteAsyncConn, Pool as SqlitePool, Runtime};
use matrix_sdk_base::{
    deserialized_responses::TimelineEvent,
    event_cache::{
        store::{
            compute_filters_string, extract_event_relation,
            media::{
                EventCacheStoreMedia, IgnoreMediaRetentionPolicy, MediaRetentionPolicy,
                MediaService,
            },
            EventCacheStore,
        },
        Event, Gap,
    },
    linked_chunk::{
        ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
        Position, RawChunk, Update,
    },
    media::{MediaRequestParameters, UniqueKey},
    timer,
};
use matrix_sdk_store_encryption::StoreCipher;
use ruma::{
    events::relation::RelationType, time::SystemTime, EventId, MilliSecondsSinceUnixEpoch, MxcUri,
    OwnedEventId, RoomId,
};
use rusqlite::{params_from_iter, OptionalExtension, ToSql, Transaction, TransactionBehavior};
use tokio::{
    fs,
    sync::{Mutex, OwnedMutexGuard},
};
use tracing::{debug, error, instrument, trace};

use crate::{
    error::{Error, Result},
    utils::{
        repeat_vars, time_to_timestamp, EncryptableStore, Key, SqliteAsyncConnExt,
        SqliteKeyValueStoreAsyncConnExt, SqliteKeyValueStoreConnExt, SqliteTransactionExt,
    },
    OpenStoreError, SqliteStoreConfig,
};

mod keys {
    pub const MEDIA_RETENTION_POLICY: &str = "media_retention_policy";
    pub const LAST_MEDIA_CLEANUP_TIME: &str = "last_media_cleanup_time";

    pub const LINKED_CHUNKS: &str = "linked_chunks";
    pub const MEDIA: &str = "media";
}

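/// The name of the database file used by this store.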
const DATABASE_NAME: &str = "matrix-sdk-event-cache.sqlite3";

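/// The current database schema version. Bump this whenever a new migration is
/// added to [`run_migrations`] below.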
const DATABASE_VERSION: u8 = 9;

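/// Discriminators stored in the `type` column of the `linked_chunks` table, to
/// distinguish chunks holding events from chunks holding a gap.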
const CHUNK_TYPE_EVENT_TYPE_STRING: &str = "E";
const CHUNK_TYPE_GAP_TYPE_STRING: &str = "G";

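/// An SQLite-based event cache store.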
#[derive(Clone)]
pub struct SqliteEventCacheStore {
    /// The optional cipher used to encrypt values before they hit the
    /// database.
    store_cipher: Option<Arc<StoreCipher>>,

    /// The pool of connections used for read operations.
    pool: SqlitePool,

    /// The single connection used for write operations, serializing writers.
    write_connection: Arc<Mutex<SqliteAsyncConn>>,

    /// Handles the media retention policy and media cache cleanups.
    media_service: MediaService,
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for SqliteEventCacheStore {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("SqliteEventCacheStore").finish_non_exhaustive()
    }
}

impl EncryptableStore for SqliteEventCacheStore {
    fn get_cypher(&self) -> Option<&StoreCipher> {
        self.store_cipher.as_deref()
    }
}

impl SqliteEventCacheStore {
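    /// Open the SQLite-based event cache store at the given path, using the
    /// given passphrase to encrypt private data, if any.
    ///
    /// A minimal usage sketch, assuming the crate re-exports this type at its
    /// root:
    ///
    /// ```no_run
    /// # async fn example() -> Result<(), matrix_sdk_sqlite::OpenStoreError> {
    /// use matrix_sdk_sqlite::SqliteEventCacheStore;
    ///
    /// // Open (or create) an unencrypted store inside `./cache`.
    /// let store = SqliteEventCacheStore::open("./cache", None).await?;
    /// # Ok(())
    /// # }
    /// ```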
    pub async fn open(
        path: impl AsRef<Path>,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        Self::open_with_config(SqliteStoreConfig::new(path).passphrase(passphrase)).await
    }

    #[instrument(skip(config), fields(path = ?config.path))]
    pub async fn open_with_config(config: SqliteStoreConfig) -> Result<Self, OpenStoreError> {
        debug!(?config);

        let _timer = timer!("open_with_config");

        let SqliteStoreConfig { path, passphrase, pool_config, runtime_config } = config;

        fs::create_dir_all(&path).await.map_err(OpenStoreError::CreateDir)?;

        let mut config = deadpool_sqlite::Config::new(path.join(DATABASE_NAME));
        config.pool = Some(pool_config);

        let pool = config.create_pool(Runtime::Tokio1)?;

        let this = Self::open_with_pool(pool, passphrase.as_deref()).await?;
        this.write().await?.apply_runtime_config(runtime_config).await?;

        Ok(this)
    }

    async fn open_with_pool(
        pool: SqlitePool,
        passphrase: Option<&str>,
    ) -> Result<Self, OpenStoreError> {
        let conn = pool.get().await?;

        let version = conn.db_version().await?;
        run_migrations(&conn, version).await?;

        let store_cipher = match passphrase {
            Some(p) => Some(Arc::new(conn.get_or_create_store_cipher(p).await?)),
            None => None,
        };

        let media_service = MediaService::new();
        let media_retention_policy = conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await?;
        let last_media_cleanup_time = conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await?;
        media_service.restore(media_retention_policy, last_media_cleanup_time);

        Ok(Self {
            store_cipher,
            pool,
            write_connection: Arc::new(Mutex::new(conn)),
            media_service,
        })
    }

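    /// Acquire a connection from the pool for executing read operations.
    ///
    /// The `foreign_keys` pragma is enabled on every acquired connection,
    /// since it is a per-connection setting in SQLite.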
    #[instrument(skip_all)]
    async fn read(&self) -> Result<SqliteAsyncConn> {
        trace!("Taking a `read` connection");
        let _timer = timer!("connection");

        let connection = self.pool.get().await?;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

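    /// Acquire the single connection used for executing write operations,
    /// serializing writers behind a mutex.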
    #[instrument(skip_all)]
    async fn write(&self) -> Result<OwnedMutexGuard<SqliteAsyncConn>> {
        trace!("Taking a `write` connection");
        let _timer = timer!("connection");

        let connection = self.write_connection.clone().lock_owned().await;

        connection.execute_batch("PRAGMA foreign_keys = ON;").await?;

        Ok(connection)
    }

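    /// Map a `linked_chunks` row to its `(id, previous, next, type)` columns.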
    fn map_row_to_chunk(
        row: &rusqlite::Row<'_>,
    ) -> Result<(u64, Option<u64>, Option<u64>, String), rusqlite::Error> {
        Ok((
            row.get::<_, u64>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<u64>>(2)?,
            row.get::<_, String>(3)?,
        ))
    }

    fn encode_event(&self, event: &TimelineEvent) -> Result<EncodedEvent> {
        let serialized = serde_json::to_vec(event)?;

        let raw_event = event.raw();
        let (relates_to, rel_type) = extract_event_relation(raw_event).unzip();

        let content = self.encode_value(serialized)?;

        Ok(EncodedEvent {
            content,
            rel_type,
            relates_to: relates_to.map(|relates_to| relates_to.to_string()),
        })
    }
}

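/// An event serialized (and possibly encrypted) for storage, along with the
/// relation metadata extracted from it.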
struct EncodedEvent {
    content: Vec<u8>,
    rel_type: Option<String>,
    relates_to: Option<String>,
}

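/// Transaction helpers to rebuild linked chunks and their contents from
/// database rows.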
trait TransactionExtForLinkedChunks {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        index: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>>;

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap>;

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>>;
}

impl TransactionExtForLinkedChunks for Transaction<'_> {
    fn rebuild_chunk(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        previous: Option<u64>,
        id: u64,
        next: Option<u64>,
        chunk_type: &str,
    ) -> Result<RawChunk<Event, Gap>> {
        let previous = previous.map(ChunkIdentifier::new);
        let next = next.map(ChunkIdentifier::new);
        let id = ChunkIdentifier::new(id);

        match chunk_type {
            CHUNK_TYPE_GAP_TYPE_STRING => {
                let gap = self.load_gap_content(store, linked_chunk_id, id)?;
                Ok(RawChunk { content: ChunkContent::Gap(gap), previous, identifier: id, next })
            }

            CHUNK_TYPE_EVENT_TYPE_STRING => {
                let events = self.load_events_content(store, linked_chunk_id, id)?;
                Ok(RawChunk {
                    content: ChunkContent::Items(events),
                    previous,
                    identifier: id,
                    next,
                })
            }

            other => Err(Error::InvalidData {
                details: format!("a linked chunk has an unknown type {other}"),
            }),
        }
    }

    fn load_gap_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Gap> {
        let encoded_prev_token: Vec<u8> = self.query_row(
            "SELECT prev_token FROM gap_chunks WHERE chunk_id = ? AND linked_chunk_id = ?",
            (chunk_id.index(), &linked_chunk_id),
            |row| row.get(0),
        )?;
        let prev_token_bytes = store.decode_value(&encoded_prev_token)?;
        let prev_token = serde_json::from_slice(&prev_token_bytes)?;
        Ok(Gap { prev_token })
    }

    fn load_events_content(
        &self,
        store: &SqliteEventCacheStore,
        linked_chunk_id: &Key,
        chunk_id: ChunkIdentifier,
    ) -> Result<Vec<Event>> {
        let mut events = Vec::new();

        for event_data in self
            .prepare(
                r#"
                    SELECT events.content
                    FROM event_chunks ec, events
                    WHERE events.event_id = ec.event_id AND ec.chunk_id = ? AND ec.linked_chunk_id = ?
                    ORDER BY ec.position ASC
                "#,
            )?
            .query_map((chunk_id.index(), &linked_chunk_id), |row| row.get::<_, Vec<u8>>(0))?
        {
            let encoded_content = event_data?;
            let serialized_content = store.decode_value(&encoded_content)?;
            let event = serde_json::from_slice(&serialized_content)?;

            events.push(event);
        }

        Ok(events)
    }
}

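/// Run the database migrations, upgrading the schema from `version` to
/// [`DATABASE_VERSION`].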
async fn run_migrations(conn: &SqliteAsyncConn, version: u8) -> Result<()> {
    if version == 0 {
        debug!("Creating database");
    } else if version < DATABASE_VERSION {
        debug!(version, new_version = DATABASE_VERSION, "Upgrading database");
    } else {
        return Ok(());
    }

    conn.execute_batch("PRAGMA foreign_keys = ON;").await?;

    if version < 1 {
        conn.execute_batch("PRAGMA journal_mode = wal;").await?;
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/001_init.sql"))?;
            txn.set_db_version(1)
        })
        .await?;
    }

    if version < 2 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/002_lease_locks.sql"))?;
            txn.set_db_version(2)
        })
        .await?;
    }

    if version < 3 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/003_events.sql"))?;
            txn.set_db_version(3)
        })
        .await?;
    }

    if version < 4 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/004_ignore_policy.sql"
            ))?;
            txn.set_db_version(4)
        })
        .await?;
    }

    if version < 5 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/005_events_index_on_event_id.sql"
            ))?;
            txn.set_db_version(5)
        })
        .await?;
    }

    if version < 6 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!("../migrations/event_cache_store/006_events.sql"))?;
            txn.set_db_version(6)
        })
        .await?;
    }

    if version < 7 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/007_event_chunks.sql"
            ))?;
            txn.set_db_version(7)
        })
        .await?;
    }

    if version < 8 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/008_linked_chunk_id.sql"
            ))?;
            txn.set_db_version(8)
        })
        .await?;
    }

    if version < 9 {
        conn.with_transaction(|txn| {
            txn.execute_batch(include_str!(
                "../migrations/event_cache_store/009_related_event_index.sql"
            ))?;
            txn.set_db_version(9)
        })
        .await?;
    }

    Ok(())
}

#[async_trait]
impl EventCacheStore for SqliteEventCacheStore {
    type Error = Error;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool> {
        let _timer = timer!("method");

        let key = key.to_owned();
        let holder = holder.to_owned();

        let now: u64 = MilliSecondsSinceUnixEpoch::now().get().into();
        let expiration = now + lease_duration_ms as u64;

        let num_touched = self
            .write()
            .await?
            .with_transaction(move |txn| {
                txn.execute(
                    "INSERT INTO lease_locks (key, holder, expiration)
                    VALUES (?1, ?2, ?3)
                    ON CONFLICT (key)
                    DO
                        UPDATE SET holder = ?2, expiration = ?3
                        WHERE holder = ?2
                        OR expiration < ?4
                ",
                    (key, holder, expiration, now),
                )
            })
            .await?;

        Ok(num_touched == 1)
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();
        let this = self.clone();

        with_immediate_transaction(self, move |txn| {
            for up in updates {
                match up {
                    Update::NewItemsChunk { previous, new, next } => {
                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new events chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_EVENT_TYPE_STRING,
                        )?;
                    }

                    Update::NewGapChunk { previous, new, next, gap } => {
                        let serialized = serde_json::to_vec(&gap.prev_token)?;
                        let prev_token = this.encode_value(serialized)?;

                        let previous = previous.as_ref().map(ChunkIdentifier::index);
                        let new = new.index();
                        let next = next.as_ref().map(ChunkIdentifier::index);

                        trace!(
                            %linked_chunk_id,
                            "new gap chunk (prev={previous:?}, i={new}, next={next:?})",
                        );

                        insert_chunk(
                            txn,
                            &hashed_linked_chunk_id,
                            previous,
                            new,
                            next,
                            CHUNK_TYPE_GAP_TYPE_STRING,
                        )?;

                        txn.execute(
                            r#"
                                INSERT INTO gap_chunks(chunk_id, linked_chunk_id, prev_token)
                                VALUES (?, ?, ?)
                            "#,
                            (new, &hashed_linked_chunk_id, prev_token),
                        )?;
                    }

                    Update::RemoveChunk(chunk_identifier) => {
                        let chunk_id = chunk_identifier.index();

                        trace!(%linked_chunk_id, "removing chunk @ {chunk_id}");

                        // Find the previous and next chunks, so their pointers
                        // can be relinked around the removed chunk.
                        let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
                            "SELECT previous, next FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?",
                            (chunk_id, &hashed_linked_chunk_id),
                            |row| Ok((row.get(0)?, row.get(1)?))
                        )?;

                        if let Some(previous) = previous {
                            txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND linked_chunk_id = ?", (next, previous, &hashed_linked_chunk_id))?;
                        }

                        if let Some(next) = next {
                            txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND linked_chunk_id = ?", (previous, next, &hashed_linked_chunk_id))?;
                        }

                        txn.execute("DELETE FROM linked_chunks WHERE id = ? AND linked_chunk_id = ?", (chunk_id, &hashed_linked_chunk_id))?;
                    }

                    Update::PushItems { at, items } => {
                        if items.is_empty() {
                            continue;
                        }

                        let chunk_id = at.chunk_identifier().index();

                        trace!(%linked_chunk_id, "pushing {} items @ {chunk_id}", items.len());

                        let mut chunk_statement = txn.prepare(
                            "INSERT INTO event_chunks(chunk_id, linked_chunk_id, event_id, position) VALUES (?, ?, ?, ?)"
                        )?;

                        let mut content_statement = txn.prepare(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)"
                        )?;

                        // Filter out events without an ID, as they can't be
                        // stored.
                        let invalid_event = |event: TimelineEvent| {
                            let Some(event_id) = event.event_id() else {
                                error!(%linked_chunk_id, "Trying to push an event with no ID");
                                return None;
                            };

                            Some((event_id.to_string(), event))
                        };

                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);

                        for (i, (event_id, event)) in items.into_iter().filter_map(invalid_event).enumerate() {
                            let index = at.index() + i;
                            chunk_statement.execute((chunk_id, &hashed_linked_chunk_id, &event_id, index))?;

                            let encoded_event = this.encode_event(&event)?;
                            content_statement.execute((&hashed_room_id, event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;
                        }
                    }

                    Update::ReplaceItem { at, item: event } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                        let Some(event_id) = event.event_id().map(|event_id| event_id.to_string()) else {
                            error!(%linked_chunk_id, "Trying to replace an event with a new one that has no ID");
                            continue;
                        };

                        let encoded_event = this.encode_event(&event)?;
                        let room_id = linked_chunk_id.room_id();
                        let hashed_room_id = this.encode_key(keys::LINKED_CHUNKS, room_id);
                        txn.execute(
                            "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)",
                            (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                        txn.execute(
                            r#"UPDATE event_chunks SET event_id = ? WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?"#,
                            (event_id, &hashed_linked_chunk_id, chunk_id, index)
                        )?;
                    }

                    Update::RemoveItem { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position = ?", (&hashed_linked_chunk_id, chunk_id, index))?;

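                        // Shift the positions of all the items that came after
                        // the removed one. This runs in two passes, going
                        // through a negated intermediate value, so that any
                        // uniqueness constraint on (linked_chunk_id, chunk_id,
                        // position) can't be transiently violated while SQLite
                        // updates rows one at a time.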
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -(position - 1)
                                WHERE linked_chunk_id = ? AND chunk_id = ? AND position > ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id, index)
                        )?;
                        txn.execute(
                            r#"
                                UPDATE event_chunks
                                SET position = -position
                                WHERE position < 0 AND linked_chunk_id = ? AND chunk_id = ?
                            "#,
                            (&hashed_linked_chunk_id, chunk_id)
                        )?;
                    }

                    Update::DetachLastItems { at } => {
                        let chunk_id = at.chunk_identifier().index();
                        let index = at.index();

                        trace!(%linked_chunk_id, "truncating items >= {chunk_id}:{index}");

                        txn.execute("DELETE FROM event_chunks WHERE linked_chunk_id = ? AND chunk_id = ? AND position >= ?", (&hashed_linked_chunk_id, chunk_id, index))?;
                    }

                    Update::Clear => {
                        trace!(%linked_chunk_id, "clearing items");

                        txn.execute(
                            "DELETE FROM linked_chunks WHERE linked_chunk_id = ?",
                            (&hashed_linked_chunk_id,),
                        )?;
                    }

                    Update::StartReattachItems | Update::EndReattachItems => {
                        // Nothing to do for the database: these updates only
                        // frame a reattachment, which is fully described by
                        // the other updates.
                    }
                }
            }

            Ok(())
        })
        .await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        let result = self
            .read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let mut items = Vec::new();

                for data in txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? ORDER BY id",
                    )?
                    .query_map((&hashed_linked_chunk_id,), Self::map_row_to_chunk)?
                {
                    let (id, previous, next, chunk_type) = data?;
                    let new = txn.rebuild_chunk(
                        &this,
                        &hashed_linked_chunk_id,
                        previous,
                        id,
                        next,
                        chunk_type.as_str(),
                    )?;
                    items.push(new);
                }

                Ok(items)
            })
            .await?;

        Ok(result)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                // Count the number of events per chunk in a single pass.
                let num_events_by_chunk_ids = txn
                    .prepare(
                        r#"
                            SELECT ec.chunk_id, COUNT(ec.event_id)
                            FROM event_chunks as ec
                            WHERE ec.linked_chunk_id = ?
                            GROUP BY ec.chunk_id
                        "#,
                    )?
                    .query_map((&hashed_linked_chunk_id,), |row| {
                        Ok((row.get::<_, u64>(0)?, row.get::<_, usize>(1)?))
                    })?
                    .collect::<Result<HashMap<_, _>, _>>()?;

                // Then stitch those counts into the chunk rows themselves.
                txn.prepare(
                    r#"
                        SELECT
                            lc.id,
                            lc.previous,
                            lc.next,
                            lc.type
                        FROM linked_chunks as lc
                        WHERE lc.linked_chunk_id = ?
                        ORDER BY lc.id"#,
                )?
                .query_map((&hashed_linked_chunk_id,), |row| {
                    Ok((
                        row.get::<_, u64>(0)?,
                        row.get::<_, Option<u64>>(1)?,
                        row.get::<_, Option<u64>>(2)?,
                        row.get::<_, String>(3)?,
                    ))
                })?
                .map(|data| -> Result<_> {
                    let (id, previous, next, chunk_type) = data?;

                    // A gap chunk never contains items.
                    let num_items = if chunk_type == CHUNK_TYPE_GAP_TYPE_STRING {
                        0
                    } else {
                        num_events_by_chunk_ids.get(&id).copied().unwrap_or(0)
                    };

                    Ok(ChunkMetadata {
                        identifier: ChunkIdentifier::new(id),
                        previous: previous.map(ChunkIdentifier::new),
                        next: next.map(ChunkIdentifier::new),
                        num_items,
                    })
                })
                .collect::<Result<Vec<_>, _>>()
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<(Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator), Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let (chunk_identifier_generator, number_of_chunks) = txn
                    .prepare(
                        "SELECT MAX(id), COUNT(*) FROM linked_chunks WHERE linked_chunk_id = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, Option<u64>>(0)?,
                                row.get::<_, u64>(1)?,
                            ))
                        }
                    )?;

                let chunk_identifier_generator = match chunk_identifier_generator {
                    Some(last_chunk_identifier) => {
                        ChunkIdentifierGenerator::new_from_previous_chunk_identifier(
                            ChunkIdentifier::new(last_chunk_identifier)
                        )
                    },
                    None => ChunkIdentifierGenerator::new_from_scratch(),
                };

                let Some((chunk_identifier, previous_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, type FROM linked_chunks WHERE linked_chunk_id = ? AND next IS NULL"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id,),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, String>(2)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    if number_of_chunks == 0 {
                        return Ok((None, chunk_identifier_generator));
                    } else {
                        return Err(Error::InvalidData {
                            details:
                                "last chunk is not found but chunks exist: the linked chunk contains a cycle"
                                    .to_owned()
                        });
                    }
                };

                let last_chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    None,
                    &chunk_type
                )?;

                Ok((Some(last_chunk), chunk_identifier_generator))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, Self::Error> {
        let _timer = timer!("method");

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());

        let this = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some((chunk_identifier, previous_chunk, next_chunk, chunk_type)) = txn
                    .prepare(
                        "SELECT id, previous, next, type FROM linked_chunks WHERE linked_chunk_id = ? AND next = ?"
                    )?
                    .query_row(
                        (&hashed_linked_chunk_id, before_chunk_identifier.index()),
                        |row| {
                            Ok((
                                row.get::<_, u64>(0)?,
                                row.get::<_, Option<u64>>(1)?,
                                row.get::<_, Option<u64>>(2)?,
                                row.get::<_, String>(3)?,
                            ))
                        }
                    )
                    .optional()?
                else {
                    return Ok(None);
                };

                let chunk = txn.rebuild_chunk(
                    &this,
                    &hashed_linked_chunk_id,
                    previous_chunk,
                    chunk_identifier,
                    next_chunk,
                    &chunk_type
                )?;

                Ok(Some(chunk))
            })
            .await
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.write()
            .await?
            .with_transaction(move |txn| {
                txn.execute("DELETE FROM linked_chunks", ())?;
                txn.execute("DELETE FROM events", ())
            })
            .await?;

        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, Self::Error> {
        let _timer = timer!("method");

        // Short-circuit: no events means no duplicates.
        if events.is_empty() {
            return Ok(Vec::new());
        }

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key());
        let linked_chunk_id = linked_chunk_id.to_owned();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.chunk_large_query_over(events, None, move |txn, events| {
                    let query = format!(
                        r#"
                            SELECT event_id, chunk_id, position
                            FROM event_chunks
                            WHERE linked_chunk_id = ? AND event_id IN ({})
                            ORDER BY chunk_id ASC, position ASC
                        "#,
                        repeat_vars(events.len()),
                    );

                    let parameters = params_from_iter(
                        once(hashed_linked_chunk_id.to_sql().unwrap())
                            .chain(events.iter().map(|event| event.as_str().to_sql().unwrap())),
                    );

                    let mut duplicated_events = Vec::new();

                    for duplicated_event in txn.prepare(&query)?.query_map(parameters, |row| {
                        Ok((
                            row.get::<_, String>(0)?,
                            row.get::<_, u64>(1)?,
                            row.get::<_, usize>(2)?,
                        ))
                    })? {
                        let (duplicated_event, chunk_identifier, index) = duplicated_event?;

                        let Ok(duplicated_event) = EventId::parse(duplicated_event.clone()) else {
                            error!(%duplicated_event, %linked_chunk_id, "Reading a malformed event ID");
                            continue;
                        };

                        duplicated_events.push((
                            duplicated_event,
                            Position::new(ChunkIdentifier::new(chunk_identifier), index),
                        ));
                    }

                    Ok(duplicated_events)
                })
            })
            .await
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, Self::Error> {
        let _timer = timer!("method");

        let event_id = event_id.to_owned();
        let this = self.clone();

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                let Some(event) = txn
                    .prepare("SELECT content FROM events WHERE event_id = ? AND room_id = ?")?
                    .query_row((event_id.as_str(), hashed_room_id), |row| row.get::<_, Vec<u8>>(0))
                    .optional()?
                else {
                    return Ok(None);
                };

                let event = serde_json::from_slice(&this.decode_value(&event)?)?;

                Ok(Some(event))
            })
            .await
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, Self::Error> {
        let _timer = timer!("method");

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);

        let hashed_linked_chunk_id =
            self.encode_key(keys::LINKED_CHUNKS, LinkedChunkId::Room(room_id).storage_key());

        let event_id = event_id.to_owned();
        let filters = filters.map(ToOwned::to_owned);
        let store = self.clone();

        self.read()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                find_event_relations_transaction(
                    store,
                    hashed_room_id,
                    hashed_linked_chunk_id,
                    event_id,
                    filters,
                    txn,
                )
            })
            .await
    }

    #[instrument(skip(self, event))]
    async fn save_event(&self, room_id: &RoomId, event: Event) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!(%room_id, "Trying to save an event with no ID");
            return Ok(());
        };

        let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
        let event_id = event_id.to_string();
        let encoded_event = self.encode_event(&event)?;

        self.write()
            .await?
            .with_transaction(move |txn| -> Result<_> {
                txn.execute(
                    "INSERT OR REPLACE INTO events(room_id, event_id, content, relates_to, rel_type) VALUES (?, ?, ?, ?, ?)",
                    (&hashed_room_id, &event_id, encoded_event.content, encoded_event.relates_to, encoded_event.rel_type))?;

                Ok(())
            })
            .await
    }

    #[instrument(skip_all)]
    async fn add_media_content(
        &self,
        request: &MediaRequestParameters,
        content: Vec<u8>,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<()> {
        let _timer = timer!("method");

        self.media_service.add_media_content(self, request, content, ignore_policy).await
    }

    #[instrument(skip_all)]
    async fn replace_media_key(
        &self,
        from: &MediaRequestParameters,
        to: &MediaRequestParameters,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        let prev_uri = self.encode_key(keys::MEDIA, from.source.unique_key());
        let prev_format = self.encode_key(keys::MEDIA, from.format.unique_key());

        let new_uri = self.encode_key(keys::MEDIA, to.source.unique_key());
        let new_format = self.encode_key(keys::MEDIA, to.format.unique_key());

        let conn = self.write().await?;
        conn.execute(
            r#"UPDATE media SET uri = ?, format = ? WHERE uri = ? AND format = ?"#,
            (new_uri, new_format, prev_uri, prev_format),
        )
        .await?;

        Ok(())
    }

    #[instrument(skip_all)]
    async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
        let _timer = timer!("method");

        self.media_service.get_media_content(self, request).await
    }

    #[instrument(skip_all)]
    async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
        let _timer = timer!("method");

        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());

        let conn = self.write().await?;
        conn.execute("DELETE FROM media WHERE uri = ? AND format = ?", (uri, format)).await?;

        Ok(())
    }

    #[instrument(skip(self))]
    async fn get_media_content_for_uri(
        &self,
        uri: &MxcUri,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let _timer = timer!("method");

        self.media_service.get_media_content_for_uri(self, uri).await
    }

    #[instrument(skip(self))]
    async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
        let _timer = timer!("method");

        let uri = self.encode_key(keys::MEDIA, uri);

        let conn = self.write().await?;
        conn.execute("DELETE FROM media WHERE uri = ?", (uri,)).await?;

        Ok(())
    }

    #[instrument(skip_all)]
    async fn set_media_retention_policy(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.media_service.set_media_retention_policy(self, policy).await
    }

    #[instrument(skip_all)]
    fn media_retention_policy(&self) -> MediaRetentionPolicy {
        let _timer = timer!("method");

        self.media_service.media_retention_policy()
    }

    #[instrument(skip_all)]
    async fn set_ignore_media_retention_policy(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.media_service.set_ignore_media_retention_policy(self, request, ignore_policy).await
    }

    #[instrument(skip_all)]
    async fn clean_up_media_cache(&self) -> Result<(), Self::Error> {
        let _timer = timer!("method");

        self.media_service.clean_up_media_cache(self).await
    }
}

#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl EventCacheStoreMedia for SqliteEventCacheStore {
    type Error = Error;

    async fn media_retention_policy_inner(
        &self,
    ) -> Result<Option<MediaRetentionPolicy>, Self::Error> {
        let conn = self.read().await?;
        conn.get_serialized_kv(keys::MEDIA_RETENTION_POLICY).await
    }

    async fn set_media_retention_policy_inner(
        &self,
        policy: MediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let conn = self.write().await?;
        conn.set_serialized_kv(keys::MEDIA_RETENTION_POLICY, policy).await?;
        Ok(())
    }

    async fn add_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        data: Vec<u8>,
        last_access: SystemTime,
        policy: MediaRetentionPolicy,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let ignore_policy = ignore_policy.is_yes();
        let data = self.encode_value(data)?;

        if !ignore_policy && policy.exceeds_max_file_size(data.len() as u64) {
            return Ok(());
        }

        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(last_access);

        let conn = self.write().await?;
        conn.execute(
            "INSERT OR REPLACE INTO media (uri, format, data, last_access, ignore_policy) VALUES (?, ?, ?, ?, ?)",
            (uri, format, data, timestamp, ignore_policy),
        )
        .await?;

        Ok(())
    }

    async fn set_ignore_media_retention_policy_inner(
        &self,
        request: &MediaRequestParameters,
        ignore_policy: IgnoreMediaRetentionPolicy,
    ) -> Result<(), Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let ignore_policy = ignore_policy.is_yes();

        let conn = self.write().await?;
        conn.execute(
            r#"UPDATE media SET ignore_policy = ? WHERE uri = ? AND format = ?"#,
            (ignore_policy, uri, format),
        )
        .await?;

        Ok(())
    }

    async fn get_media_content_inner(
        &self,
        request: &MediaRequestParameters,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, request.source.unique_key());
        let format = self.encode_key(keys::MEDIA, request.format.unique_key());
        let timestamp = time_to_timestamp(current_time);

        let conn = self.write().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Bump the last access time before returning the content.
                txn.execute(
                    "UPDATE media SET last_access = ? WHERE uri = ? AND format = ?",
                    (timestamp, &uri, &format),
                )?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ? AND format = ?",
                    (&uri, &format),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn get_media_content_for_uri_inner(
        &self,
        uri: &MxcUri,
        current_time: SystemTime,
    ) -> Result<Option<Vec<u8>>, Self::Error> {
        let uri = self.encode_key(keys::MEDIA, uri);
        let timestamp = time_to_timestamp(current_time);

        let conn = self.write().await?;
        let data = conn
            .with_transaction::<_, rusqlite::Error, _>(move |txn| {
                // Bump the last access time before returning the content.
                txn.execute("UPDATE media SET last_access = ? WHERE uri = ?", (timestamp, &uri))?;

                txn.query_row::<Vec<u8>, _, _>(
                    "SELECT data FROM media WHERE uri = ?",
                    (&uri,),
                    |row| row.get(0),
                )
                .optional()
            })
            .await?;

        data.map(|v| self.decode_value(&v).map(Into::into)).transpose()
    }

    async fn clean_up_media_cache_inner(
        &self,
        policy: MediaRetentionPolicy,
        current_time: SystemTime,
    ) -> Result<(), Self::Error> {
        if !policy.has_limitations() {
            return Ok(());
        }

        let conn = self.write().await?;
        let removed = conn
            .with_transaction::<_, Error, _>(move |txn| {
                let mut removed = false;

                // First, remove media content that exceeds the maximum file
                // size.
                if let Some(max_file_size) = policy.computed_max_file_size() {
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND length(data) > ?",
                        (max_file_size,),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Then, remove media content whose last access is too old.
                if let Some(last_access_expiry) = policy.last_access_expiry {
                    let current_timestamp = time_to_timestamp(current_time);
                    let expiry_secs = last_access_expiry.as_secs();
                    let count = txn.execute(
                        "DELETE FROM media WHERE ignore_policy IS FALSE AND (? - last_access) >= ?",
                        (current_timestamp, expiry_secs),
                    )?;

                    if count > 0 {
                        removed = true;
                    }
                }

                // Finally, if the cache is too big, evict the least recently
                // accessed content until it fits.
                if let Some(max_cache_size) = policy.max_cache_size {
                    let cache_size = txn
                        .query_row(
                            "SELECT sum(length(data)) FROM media WHERE ignore_policy IS FALSE",
                            (),
                            |row| {
                                // `sum()` returns `NULL` when there are no
                                // rows, hence the `Option`.
                                row.get::<_, Option<u64>>(0)
                            },
                        )?
                        .unwrap_or_default();

                    if cache_size > max_cache_size {
                        // Walk the content sizes, most recently accessed
                        // first, accumulating until the limit is reached.
                        let mut cached_stmt = txn.prepare_cached(
                            "SELECT rowid, length(data) FROM media \
                             WHERE ignore_policy IS FALSE ORDER BY last_access DESC",
                        )?;
                        let content_sizes = cached_stmt
                            .query(())?
                            .mapped(|row| Ok((row.get::<_, i64>(0)?, row.get::<_, u64>(1)?)));

                        let mut accumulated_items_size = 0u64;
                        let mut limit_reached = false;
                        let mut rows_to_remove = Vec::new();

                        for result in content_sizes {
                            let (row_id, size) = match result {
                                Ok(content_size) => content_size,
                                Err(error) => {
                                    return Err(error.into());
                                }
                            };

                            if limit_reached {
                                rows_to_remove.push(row_id);
                                continue;
                            }

                            match accumulated_items_size.checked_add(size) {
                                Some(acc) if acc > max_cache_size => {
                                    // The limit is reached with this item;
                                    // remove it and everything after it.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                                Some(acc) => accumulated_items_size = acc,
                                None => {
                                    // The accumulated size overflows `u64`, so
                                    // the limit is necessarily reached.
                                    limit_reached = true;
                                    rows_to_remove.push(row_id);
                                }
                            }
                        }

                        if !rows_to_remove.is_empty() {
                            removed = true;
                        }

                        txn.chunk_large_query_over(rows_to_remove, None, |txn, row_ids| {
                            let sql_params = repeat_vars(row_ids.len());
                            let query = format!("DELETE FROM media WHERE rowid IN ({sql_params})");
                            txn.prepare(&query)?.execute(params_from_iter(row_ids))?;
                            Ok(Vec::<()>::new())
                        })?;
                    }
                }

                txn.set_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME, current_time)?;

                Ok(removed)
            })
            .await?;

        // If media was removed, defragment the database to reclaim the space.
        if removed {
            conn.vacuum().await?;
        }

        Ok(())
    }

    async fn last_media_cleanup_time_inner(&self) -> Result<Option<SystemTime>, Self::Error> {
        let conn = self.read().await?;
        conn.get_serialized_kv(keys::LAST_MEDIA_CLEANUP_TIME).await
    }
}

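/// Implementation of [`SqliteEventCacheStore::find_event_relations`], run
/// inside a read transaction: fetch all events related to `event_id`,
/// optionally restricted to the given relation types, along with their
/// position in the linked chunk, if any.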
fn find_event_relations_transaction(
    store: SqliteEventCacheStore,
    hashed_room_id: Key,
    hashed_linked_chunk_id: Key,
    event_id: OwnedEventId,
    filters: Option<Vec<RelationType>>,
    txn: &Transaction<'_>,
) -> Result<Vec<(Event, Option<Position>)>> {
    let get_rows = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, Vec<u8>>(0)?,
            row.get::<_, Option<u64>>(1)?,
            row.get::<_, Option<usize>>(2)?,
        ))
    };

    let collect_results = |rows| {
        let mut related = Vec::new();

        for result in rows {
            let (event_blob, chunk_id, index): (Vec<u8>, Option<u64>, _) = result?;

            let event: Event = serde_json::from_slice(&store.decode_value(&event_blob)?)?;

            let pos = chunk_id
                .zip(index)
                .map(|(chunk_id, index)| Position::new(ChunkIdentifier::new(chunk_id), index));

            related.push((event, pos));
        }

        Ok(related)
    };

    let related = if let Some(filters) = compute_filters_string(filters.as_deref()) {
        let question_marks = repeat_vars(filters.len());
        let query = format!(
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ? AND rel_type IN ({question_marks})"
        );

        let filters: Vec<_> = filters.iter().map(|f| f.to_sql().unwrap()).collect();
        let parameters = params_from_iter(
            [
                hashed_linked_chunk_id.to_sql().expect(
                    "We should be able to convert a hashed linked chunk ID to a SQLite value",
                ),
                event_id
                    .as_str()
                    .to_sql()
                    .expect("We should be able to convert an event ID to a SQLite value"),
                hashed_room_id
                    .to_sql()
                    .expect("We should be able to convert a room ID to a SQLite value"),
            ]
            .into_iter()
            .chain(filters),
        );

        let mut statement = txn.prepare(&query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    } else {
        let query =
            "SELECT events.content, event_chunks.chunk_id, event_chunks.position
            FROM events
            LEFT JOIN event_chunks ON events.event_id = event_chunks.event_id AND event_chunks.linked_chunk_id = ?
            WHERE relates_to = ? AND room_id = ?";
        let parameters = (hashed_linked_chunk_id, event_id.as_str(), hashed_room_id);

        let mut statement = txn.prepare(query)?;
        let rows = statement.query_map(parameters, get_rows)?;

        collect_results(rows)
    };

    related
}

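/// Run a write callback in a transaction with `BEHAVIOR IMMEDIATE`, so the
/// write lock is acquired as soon as the transaction starts, rather than on
/// the first write statement. This avoids `SQLITE_BUSY` errors that a deferred
/// read-then-write transaction could hit when upgrading its lock. The
/// connection's behavior is reset to `Deferred` afterwards, whether the
/// callback succeeded or not.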
async fn with_immediate_transaction<
    T: Send + 'static,
    F: FnOnce(&Transaction<'_>) -> Result<T, Error> + Send + 'static,
>(
    this: &SqliteEventCacheStore,
    f: F,
) -> Result<T, Error> {
    this.write()
        .await?
        .interact(move |conn| -> Result<T, Error> {
            conn.set_transaction_behavior(TransactionBehavior::Immediate);

            let code = || -> Result<T, Error> {
                let txn = conn.transaction()?;
                let res = f(&txn)?;
                txn.commit()?;
                Ok(res)
            };

            let res = code();

            // Reset the transaction behavior, even if the callback failed.
            conn.set_transaction_behavior(TransactionBehavior::Deferred);

            res
        })
        .await
        // `interact` itself only fails if the closure panicked or the
        // connection was aborted; treat that as a fatal programming error.
        .unwrap()
}

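/// Insert a new chunk row in `linked_chunks`, and fix up the `previous` /
/// `next` pointers of its neighbours, mirroring an insertion into a
/// doubly-linked list.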
fn insert_chunk(
    txn: &Transaction<'_>,
    linked_chunk_id: &Key,
    previous: Option<u64>,
    new: u64,
    next: Option<u64>,
    type_str: &str,
) -> rusqlite::Result<()> {
    txn.execute(
        r#"
            INSERT INTO linked_chunks(id, linked_chunk_id, previous, next, type)
            VALUES (?, ?, ?, ?, ?)
        "#,
        (new, linked_chunk_id, previous, next, type_str),
    )?;

    if let Some(previous) = previous {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET next = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, previous, linked_chunk_id),
        )?;
    }

    if let Some(next) = next {
        txn.execute(
            r#"
                UPDATE linked_chunks
                SET previous = ?
                WHERE id = ? AND linked_chunk_id = ?
            "#,
            (new, next, linked_chunk_id),
        )?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::{
        path::PathBuf,
        sync::atomic::{AtomicU32, Ordering::SeqCst},
        time::Duration,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        event_cache::{
            store::{
                integration_tests::{
                    check_test_event, make_test_event, make_test_event_with_event_id,
                },
                media::IgnoreMediaRetentionPolicy,
                EventCacheStore, EventCacheStoreError,
            },
            Gap,
        },
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
        linked_chunk::{ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update},
        media::{MediaFormat, MediaRequestParameters, MediaThumbnailSettings},
    };
    use matrix_sdk_test::{async_test, DEFAULT_TEST_ROOM_ID};
    use once_cell::sync::Lazy;
    use ruma::{event_id, events::room::MediaSource, media::Method, mxc_uri, room_id, uint};
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;
    use crate::{
        event_cache_store::keys,
        utils::{EncryptableStore as _, SqliteAsyncConnExt},
        SqliteStoreConfig,
    };

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    fn new_event_cache_store_workspace() -> PathBuf {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        TMP_DIR.path().join(name)
    }

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let tmpdir_path = new_event_cache_store_workspace();

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(tmpdir_path.to_str().unwrap(), None).await.unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!(with_media_size_tests);

    async fn get_event_cache_store_content_sorted_by_last_access(
        event_cache_store: &SqliteEventCacheStore,
    ) -> Vec<Vec<u8>> {
        let sqlite_db = event_cache_store.read().await.expect("accessing sqlite db failed");
        sqlite_db
            .prepare("SELECT data FROM media ORDER BY last_access DESC", |mut stmt| {
                stmt.query(())?.mapped(|row| row.get(0)).collect()
            })
            .await
            .expect("querying media cache content by last access failed")
    }

    #[async_test]
    async fn test_pool_size() {
        let tmpdir_path = new_event_cache_store_workspace();
        let store_open_config = SqliteStoreConfig::new(tmpdir_path).pool_max_size(42);

        let store = SqliteEventCacheStore::open_with_config(store_open_config).await.unwrap();

        assert_eq!(store.pool.status().max_size, 42);
    }

    #[async_test]
    async fn test_last_access() {
        let event_cache_store =
            get_event_cache_store().await.expect("creating media cache failed");
        let uri = mxc_uri!("mxc://localhost/media");
        let file_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::File,
        };
        let thumbnail_request = MediaRequestParameters {
            source: MediaSource::Plain(uri.to_owned()),
            format: MediaFormat::Thumbnail(MediaThumbnailSettings::with_method(
                Method::Crop,
                uint!(100),
                uint!(100),
            )),
        };

        let content: Vec<u8> = "hello world".into();
        let thumbnail_content: Vec<u8> = "hello…".into();

        event_cache_store
            .add_media_content(&file_request, content.clone(), IgnoreMediaRetentionPolicy::No)
            .await
            .expect("adding file failed");

        // Wait long enough that the thumbnail gets a strictly newer
        // `last_access` timestamp than the file.
        tokio::time::sleep(Duration::from_secs(3)).await;

        event_cache_store
            .add_media_content(
                &thumbnail_request,
                thumbnail_content.clone(),
                IgnoreMediaRetentionPolicy::No,
            )
            .await
            .expect("adding thumbnail failed");

        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], thumbnail_content, "thumbnail is not last access");
        assert_eq!(contents[1], content, "file is not second-to-last access");

        // Wait again, so that fetching the file bumps its `last_access` to a
        // strictly newer timestamp.
        tokio::time::sleep(Duration::from_secs(3)).await;

        let _ = event_cache_store
            .get_media_content(&file_request)
            .await
            .expect("getting file failed")
            .expect("file is missing");

        let contents =
            get_event_cache_store_content_sorted_by_last_access(&event_cache_store).await;

        assert_eq!(contents.len(), 2, "media cache contents length is wrong");
        assert_eq!(contents[0], content, "file is not last access");
        assert_eq!(contents[1], thumbnail_content, "thumbnail is not second-to-last access");
    }

    #[async_test]
    async fn test_linked_chunk_new_items_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(13),
                        next: Some(ChunkIdentifier::new(37)),
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(13)),
                        new: ChunkIdentifier::new(37),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 3);

        {
            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(13));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
            assert_eq!(c.next, Some(ChunkIdentifier::new(37)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(37));
            assert_eq!(c.previous, Some(ChunkIdentifier::new(13)));
            assert_eq!(c.next, None);
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });

            let c = chunks.remove(0);
            assert_eq!(c.identifier, ChunkIdentifier::new(42));
            assert_eq!(c.previous, None);
            assert_eq!(c.next, Some(ChunkIdentifier::new(13)));
            assert_matches!(c.content, ChunkContent::Items(events) => {
                assert!(events.is_empty());
            });
        }
    }

    #[async_test]
    async fn test_linked_chunk_new_gap_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![Update::NewGapChunk {
                    previous: None,
                    new: ChunkIdentifier::new(42),
                    next: None,
                    gap: Gap { prev_token: "raclette".to_owned() },
                }],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });
    }

    #[async_test]
    async fn test_linked_chunk_replace_item() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_id = event_id!("$world");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event_with_event_id(room_id, "world", Some(event_id)),
                        ],
                    },
                    Update::ReplaceItem {
                        at: Position::new(ChunkIdentifier::new(42), 1),
                        item: make_test_event_with_event_id(room_id, "yolo", Some(event_id)),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "yolo");
        });
    }

    #[async_test]
    async fn test_linked_chunk_remove_chunk() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewGapChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "raclette".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(43)),
                        new: ChunkIdentifier::new(44),
                        next: None,
                        gap: Gap { prev_token: "tartiflette".to_owned() },
                    },
                    Update::RemoveChunk(ChunkIdentifier::new(43)),
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 2);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, Some(ChunkIdentifier::new(44)));
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(44));
        assert_eq!(c.previous, Some(ChunkIdentifier::new(42)));
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "tartiflette");
        });

        let gaps = store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let mut gaps = Vec::new();
                for data in txn
                    .prepare("SELECT chunk_id FROM gap_chunks ORDER BY chunk_id")?
                    .query_map((), |row| row.get::<_, u64>(0))?
                {
                    gaps.push(data?);
                }
                Ok(gaps)
            })
            .await
            .unwrap();

        assert_eq!(gaps, vec![42, 44]);
    }

    #[async_test]
    async fn test_linked_chunk_push_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = &DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                        ],
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                        items: vec![make_test_event(room_id, "who?")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);

            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "who?");
        });
    }

2281 #[async_test]
2282 async fn test_linked_chunk_remove_item() {
2283 let store = get_event_cache_store().await.expect("creating cache store failed");
2284
2285 let room_id = *DEFAULT_TEST_ROOM_ID;
2286 let linked_chunk_id = LinkedChunkId::Room(room_id);
2287
2288 store
2289 .handle_linked_chunk_updates(
2290 linked_chunk_id,
2291 vec![
2292 Update::NewItemsChunk {
2293 previous: None,
2294 new: ChunkIdentifier::new(42),
2295 next: None,
2296 },
2297 Update::PushItems {
2298 at: Position::new(ChunkIdentifier::new(42), 0),
2299 items: vec![
2300 make_test_event(room_id, "one"),
2301 make_test_event(room_id, "two"),
2302 make_test_event(room_id, "three"),
2303 make_test_event(room_id, "four"),
2304 make_test_event(room_id, "five"),
2305 make_test_event(room_id, "six"),
2306 ],
2307 },
                    Update::RemoveItem {
                        at: Position::new(ChunkIdentifier::new(42), 2),
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 5);
            check_test_event(&events[0], "one");
            check_test_event(&events[1], "two");
            check_test_event(&events[2], "four");
            check_test_event(&events[3], "five");
            check_test_event(&events[4], "six");
        });

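        // Make sure the positions of the remaining events have been shifted: "four",
        // "five" and "six" now sit at positions 2, 3 and 4.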
        let num_rows: u64 = store
            .read()
            .await
            .unwrap()
            .with_transaction(move |txn| {
                txn.query_row(
                    "SELECT COUNT(*) FROM event_chunks WHERE chunk_id = 42 AND linked_chunk_id = ? AND position IN (2, 3, 4)",
                    (store.encode_key(keys::LINKED_CHUNKS, linked_chunk_id.storage_key()),),
                    |row| row.get(0),
                )
            })
            .await
            .unwrap();
        assert_eq!(num_rows, 3);
    }

    #[async_test]
    async fn test_linked_chunk_detach_last_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::DetachLastItems { at: Position::new(ChunkIdentifier::new(42), 1) },
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "hello");
        });
    }

    #[async_test]
    async fn test_linked_chunk_start_end_reattach_items() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

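        // Same updates and checks as `test_linked_chunk_push_items`, but with extra
        // `StartReattachItems` and `EndReattachItems` updates, which must have no
        // effect on the stored chunks.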
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room_id, "hello"),
                            make_test_event(room_id, "world"),
                            make_test_event(room_id, "howdy"),
                        ],
                    },
                    Update::StartReattachItems,
                    Update::EndReattachItems,
                ],
            )
            .await
            .unwrap();

        let mut chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();

        assert_eq!(chunks.len(), 1);

        let c = chunks.remove(0);
        assert_eq!(c.identifier, ChunkIdentifier::new(42));
        assert_eq!(c.previous, None);
        assert_eq!(c.next, None);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 3);
            check_test_event(&events[0], "hello");
            check_test_event(&events[1], "world");
            check_test_event(&events[2], "howdy");
        });
    }

    #[async_test]
    async fn test_linked_chunk_clear() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event_0 = make_test_event(room_id, "hello");
        let event_1 = make_test_event(room_id, "world");
        let event_2 = make_test_event(room_id, "howdy");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(54),
                        next: None,
                        gap: Gap { prev_token: "fondue".to_owned() },
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1, event_2],
                    },
                    Update::Clear,
                ],
            )
            .await
            .unwrap();

        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());

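        // Check that the clear cascaded to the database: both the gap and event rows
        // must be gone.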
        store
            .read()
            .await
            .unwrap()
            .with_transaction(|txn| -> rusqlite::Result<_> {
                let num_gaps = txn
                    .prepare("SELECT COUNT(chunk_id) FROM gap_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_gaps, 0);

                let num_events = txn
                    .prepare("SELECT COUNT(event_id) FROM event_chunks ORDER BY chunk_id")?
                    .query_row((), |row| row.get::<_, u64>(0))?;
                assert_eq!(num_events, 0);

                Ok(())
            })
            .await
            .unwrap();

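        // It's okay to re-insert a previously-cleared event.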
        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0],
                    },
                ],
            )
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_linked_chunk_multiple_rooms() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room1 = room_id!("!realcheeselovers:raclette.fr");
        let linked_chunk_id1 = LinkedChunkId::Room(room1);
        let room2 = room_id!("!realcheeselovers:fondue.ch");
        let linked_chunk_id2 = LinkedChunkId::Room(room2);

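        // Check that applying updates to one room doesn't affect the other. Use the
        // same chunk identifier (42) in both rooms to exercise the filtering by
        // linked chunk id.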
        store
            .handle_linked_chunk_updates(
                linked_chunk_id1,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![
                            make_test_event(room1, "best cheese is raclette"),
                            make_test_event(room1, "obviously"),
                        ],
                    },
                ],
            )
            .await
            .unwrap();

        store
            .handle_linked_chunk_updates(
                linked_chunk_id2,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![make_test_event(room1, "beaufort is the best")],
                    },
                ],
            )
            .await
            .unwrap();

        let mut chunks_room1 = store.load_all_chunks(linked_chunk_id1).await.unwrap();
        assert_eq!(chunks_room1.len(), 1);

        let c = chunks_room1.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 2);
            check_test_event(&events[0], "best cheese is raclette");
            check_test_event(&events[1], "obviously");
        });

        let mut chunks_room2 = store.load_all_chunks(linked_chunk_id2).await.unwrap();
        assert_eq!(chunks_room2.len(), 1);

        let c = chunks_room2.remove(0);
        assert_matches!(c.content, ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            check_test_event(&events[0], "beaufort is the best");
        });
    }

    #[async_test]
    async fn test_linked_chunk_update_is_a_transaction() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);

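        // Trigger a unique constraint violation: the second update tries to create a
        // chunk with the same identifier as the first one.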
        let err = store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                ],
            )
            .await
            .unwrap_err();

        assert_matches!(err, crate::error::Error::Sqlite(err) => {
            assert_matches!(err.sqlite_error_code(), Some(rusqlite::ErrorCode::ConstraintViolation));
        });

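        // Since all the updates are handled in a single transaction, the failure must
        // have rolled back the first `NewItemsChunk` too, leaving no chunks behind.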
        let chunks = store.load_all_chunks(linked_chunk_id).await.unwrap();
        assert!(chunks.is_empty());
    }

    #[async_test]
    async fn test_filter_duplicate_events_no_events() {
        let store = get_event_cache_store().await.expect("creating cache store failed");

        let room_id = *DEFAULT_TEST_ROOM_ID;
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let duplicates =
            store.filter_duplicated_events(linked_chunk_id, Vec::new()).await.unwrap();
        assert!(duplicates.is_empty());
    }

    #[async_test]
    async fn test_load_last_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
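            // Case #1: no chunk at all yet.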
            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert!(last_chunk.is_none());
            assert_eq!(chunk_identifier_generator.current(), 0);
        }

        {
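            // Case #2: only one chunk.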
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(42),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(42), 0),
                            items: vec![event("saucisse de morteau"), event("comté")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 42);
                assert!(last_chunk.previous.is_none());
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "saucisse de morteau");
                    check_test_event(&items[1], "comté");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }

        {
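            // Case #3: more than one chunk; the last one must be returned.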
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(42)),
                            new: ChunkIdentifier::new(7),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("fondue"), event("gruyère"), event("mont d'or")],
                        },
                    ],
                )
                .await
                .unwrap();

            let (last_chunk, chunk_identifier_generator) =
                store.load_last_chunk(linked_chunk_id).await.unwrap();

            assert_matches!(last_chunk, Some(last_chunk) => {
                assert_eq!(last_chunk.identifier, 7);
                assert_matches!(last_chunk.previous, Some(previous) => {
                    assert_eq!(previous, 42);
                });
                assert!(last_chunk.next.is_none());
                assert_matches!(last_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 3);
                    check_test_event(&items[0], "fondue");
                    check_test_event(&items[1], "gruyère");
                    check_test_event(&items[2], "mont d'or");
                });
            });
            assert_eq!(chunk_identifier_generator.current(), 42);
        }
    }

    #[async_test]
    async fn test_load_last_chunk_with_a_cycle() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        store
            .handle_linked_chunk_updates(
                linked_chunk_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewItemsChunk {
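                        // Chunk 1 uses chunk 0 as both its `previous` and its
                        // `next`, which creates a cycle: 0 <-> 1.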
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        store.load_last_chunk(linked_chunk_id).await.unwrap_err();
    }

    #[async_test]
    async fn test_load_previous_chunk() {
        let room_id = room_id!("!r0:matrix.org");
        let linked_chunk_id = LinkedChunkId::Room(room_id);
        let event = |msg: &str| make_test_event(room_id, msg);
        let store = get_event_cache_store().await.expect("creating cache store failed");

        {
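            // Case #1: no chunk at all, so asking for the chunk before a nonexistent
            // one yields nothing.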
            let previous_chunk = store
                .load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(153))
                .await
                .unwrap();

            assert!(previous_chunk.is_none());
        }

        {
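            // Case #2: there is only one chunk; it has no previous chunk.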
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    }],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert!(previous_chunk.is_none());
        }

        {
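            // Case #3: there are two chunks.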
            store
                .handle_linked_chunk_updates(
                    linked_chunk_id,
                    vec![
                        Update::NewItemsChunk {
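                            // The new chunk is inserted before the existing chunk 42.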
                            previous: None,
                            new: ChunkIdentifier::new(7),
                            next: Some(ChunkIdentifier::new(42)),
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(7), 0),
                            items: vec![event("brigand du jorat"), event("morbier")],
                        },
                    ],
                )
                .await
                .unwrap();

            let previous_chunk =
                store.load_previous_chunk(linked_chunk_id, ChunkIdentifier::new(42)).await.unwrap();

            assert_matches!(previous_chunk, Some(previous_chunk) => {
                assert_eq!(previous_chunk.identifier, 7);
                assert!(previous_chunk.previous.is_none());
                assert_matches!(previous_chunk.next, Some(next) => {
                    assert_eq!(next, 42);
                });
                assert_matches!(previous_chunk.content, ChunkContent::Items(items) => {
                    assert_eq!(items.len(), 2);
                    check_test_event(&items[0], "brigand du jorat");
                    check_test_event(&items[1], "morbier");
                });
            });
        }
    }
}

#[cfg(test)]
mod encrypted_tests {
    use std::sync::atomic::{AtomicU32, Ordering::SeqCst};

    use matrix_sdk_base::{
        event_cache::store::{EventCacheStore, EventCacheStoreError},
        event_cache_store_integration_tests, event_cache_store_integration_tests_time,
        event_cache_store_media_integration_tests,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };
    use tempfile::{tempdir, TempDir};

    use super::SqliteEventCacheStore;

    static TMP_DIR: Lazy<TempDir> = Lazy::new(|| tempdir().unwrap());
    static NUM: AtomicU32 = AtomicU32::new(0);

    async fn get_event_cache_store() -> Result<SqliteEventCacheStore, EventCacheStoreError> {
        let name = NUM.fetch_add(1, SeqCst).to_string();
        let tmpdir_path = TMP_DIR.path().join(name);

        tracing::info!("using event cache store @ {}", tmpdir_path.to_str().unwrap());

        Ok(SqliteEventCacheStore::open(
            tmpdir_path.to_str().unwrap(),
            Some("default_test_password"),
        )
        .await
        .unwrap())
    }

    event_cache_store_integration_tests!();
    event_cache_store_integration_tests_time!();
    event_cache_store_media_integration_tests!();

    #[async_test]
    async fn test_no_sqlite_injection_in_find_event_relations() {
        let room_id = room_id!("!test:localhost");
        let another_room_id = room_id!("!r1:matrix.org");
        let sender = user_id!("@alice:localhost");

        let store = get_event_cache_store()
            .await
            .expect("We should be able to create a new, empty, event cache store");

        let f = EventFactory::new().room(room_id).sender(sender);

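        // The event targeted by the relation query; it must not show up in the
        // results itself.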
        let event_id = event_id!("$DO_NOT_FIND_ME:matrix.org");
        let event = f.text_msg("DO NOT FIND").event_id(event_id).into_event();

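        // An edit of the above event; it is the only relation that should be found.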
        let edit_id = event_id!("$find_me:matrix.org");
        let edit = f
            .text_msg("Find me")
            .event_id(edit_id)
            .edit(event_id, RoomMessageEventContentWithoutRelation::text_plain("jebote"))
            .into_event();

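        // An unrelated event in another room, which must never show up in the results.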
        let f = f.room(another_room_id);

        let another_event_id = event_id!("$DO_NOT_FIND_ME_EITHER:matrix.org");
        let another_event =
            f.text_msg("DO NOT FIND ME EITHER").event_id(another_event_id).into_event();

        store.save_event(room_id, event).await.unwrap();
        store.save_event(room_id, edit).await.unwrap();
        store.save_event(another_room_id, another_event).await.unwrap();

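        // Craft a filter whose custom relation type contains a SQL injection attempt;
        // if the filter values were interpolated without sanitization, the `OR 1=1`
        // would match every event in the database.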
        let filter = Some(vec![RelationType::Replacement, "x\") OR 1=1; --".into()]);

        let results = store
            .find_event_relations(room_id, event_id, filter.as_deref())
            .await
            .expect("We should be able to attempt to find event relations");

        similar_asserts::assert_eq!(
            results.len(),
            1,
            "We should only have loaded events for the first room {results:#?}"
        );

        let (found_event, _) = &results[0];
        assert_eq!(
            found_event.event_id().as_deref(),
            Some(edit_id),
            "The single event we found should be the edit event"
        );
    }
}