1#![cfg_attr(not(test), allow(unused))]
16
17use std::{rc::Rc, time::Duration};
18
19use indexed_db_futures::{Build, database::Database};
20#[cfg(target_family = "wasm")]
21use matrix_sdk_base::cross_process_lock::{
22 CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION,
23};
24use matrix_sdk_base::{
25 event_cache::{Event, Gap, store::EventCacheStore},
26 linked_chunk::{
27 ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
28 RawChunk, Update,
29 },
30 timer,
31};
32use ruma::{
33 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
34};
35use tracing::{error, instrument, trace};
36use web_sys::IdbTransactionMode;
37
38use crate::{
39 event_cache_store::{
40 migrations::current::keys,
41 transaction::IndexeddbEventCacheStoreTransaction,
42 types::{ChunkType, InBandEvent, Lease, OutOfBandEvent},
43 },
44 serializer::indexed_type::{IndexedTypeSerializer, traits::Indexed},
45 transaction::TransactionError,
46};
47
48mod builder;
49mod error;
50#[cfg(all(test, target_family = "wasm"))]
51mod integration_tests;
52mod migrations;
53mod serializer;
54mod transaction;
55mod types;
56
57pub use builder::IndexeddbEventCacheStoreBuilder;
58pub use error::IndexeddbEventCacheStoreError;
59
60#[derive(Debug, Clone)]
66pub struct IndexeddbEventCacheStore {
67 inner: Rc<Database>,
69 serializer: IndexedTypeSerializer,
71}
72
73impl IndexeddbEventCacheStore {
74 pub fn builder() -> IndexeddbEventCacheStoreBuilder {
77 IndexeddbEventCacheStoreBuilder::default()
78 }
79
80 pub fn transaction<'a>(
84 &'a self,
85 stores: &[&str],
86 mode: IdbTransactionMode,
87 ) -> Result<IndexeddbEventCacheStoreTransaction<'a>, IndexeddbEventCacheStoreError> {
88 Ok(IndexeddbEventCacheStoreTransaction::new(
89 self.inner
90 .transaction(stores)
91 .with_mode(mode)
92 .build()
93 .map_err(TransactionError::from)?,
94 &self.serializer,
95 ))
96 }
97}
98
99#[cfg(target_family = "wasm")]
100#[async_trait::async_trait(?Send)]
101impl EventCacheStore for IndexeddbEventCacheStore {
102 type Error = IndexeddbEventCacheStoreError;
103
104 #[instrument(skip(self))]
105 async fn try_take_leased_lock(
106 &self,
107 lease_duration_ms: u32,
108 key: &str,
109 holder: &str,
110 ) -> Result<Option<CrossProcessLockGeneration>, IndexeddbEventCacheStoreError> {
111 let transaction =
112 self.transaction(&[Lease::OBJECT_STORE], IdbTransactionMode::Readwrite)?;
113
114 let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
115 let expiration = now + Duration::from_millis(lease_duration_ms.into());
116
117 let lease = match transaction.get_lease_by_id(key).await? {
118 Some(mut lease) => {
119 if lease.holder == holder {
120 lease.expiration = expiration;
122
123 Some(lease)
124 } else {
125 if lease.expiration < now {
127 lease.holder = holder.to_owned();
129 lease.expiration = expiration;
130 lease.generation += 1;
131
132 Some(lease)
133 } else {
134 None
136 }
137 }
138 }
139 None => {
140 let lease = Lease {
141 key: key.to_owned(),
142 holder: holder.to_owned(),
143 expiration,
144 generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
145 };
146
147 Some(lease)
148 }
149 };
150
151 Ok(if let Some(lease) = lease {
152 transaction.put_lease(&lease).await?;
153 transaction.commit().await?;
154
155 Some(lease.generation)
156 } else {
157 None
158 })
159 }
160
161 #[instrument(skip(self, updates))]
162 async fn handle_linked_chunk_updates(
163 &self,
164 linked_chunk_id: LinkedChunkId<'_>,
165 updates: Vec<Update<Event, Gap>>,
166 ) -> Result<(), IndexeddbEventCacheStoreError> {
167 let _timer = timer!("method");
168
169 let transaction = self.transaction(
170 &[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS],
171 IdbTransactionMode::Readwrite,
172 )?;
173
174 for update in updates {
175 match update {
176 Update::NewItemsChunk { previous, new, next } => {
177 trace!(%linked_chunk_id, "Inserting new chunk (prev={previous:?}, new={new:?}, next={next:?})");
178 transaction
179 .add_chunk(&types::Chunk {
180 linked_chunk_id: linked_chunk_id.to_owned(),
181 identifier: new.index(),
182 previous: previous.map(|i| i.index()),
183 next: next.map(|i| i.index()),
184 chunk_type: ChunkType::Event,
185 })
186 .await?;
187 }
188 Update::NewGapChunk { previous, new, next, gap } => {
189 trace!(%linked_chunk_id, "Inserting new gap (prev={previous:?}, new={new:?}, next={next:?})");
190 transaction
191 .add_item(&types::Gap {
192 linked_chunk_id: linked_chunk_id.to_owned(),
193 chunk_identifier: new.index(),
194 prev_token: gap.prev_token,
195 })
196 .await?;
197 transaction
198 .add_chunk(&types::Chunk {
199 linked_chunk_id: linked_chunk_id.to_owned(),
200 identifier: new.index(),
201 previous: previous.map(|i| i.index()),
202 next: next.map(|i| i.index()),
203 chunk_type: ChunkType::Gap,
204 })
205 .await?;
206 }
207 Update::RemoveChunk(chunk_id) => {
208 trace!(%linked_chunk_id, "Removing chunk {chunk_id:?}");
209 transaction.delete_chunk_by_id(linked_chunk_id, chunk_id).await?;
210 }
211 Update::PushItems { at, items } => {
212 let chunk_identifier = at.chunk_identifier().index();
213
214 trace!(%linked_chunk_id, "pushing {} items @ {chunk_identifier}", items.len());
215
216 for (i, item) in items.into_iter().enumerate() {
217 transaction
218 .put_event(&types::Event::InBand(InBandEvent {
219 linked_chunk_id: linked_chunk_id.to_owned(),
220 content: item,
221 position: types::Position {
222 chunk_identifier,
223 index: at.index() + i,
224 },
225 }))
226 .await?;
227 }
228 }
229 Update::ReplaceItem { at, item } => {
230 let chunk_id = at.chunk_identifier().index();
231 let index = at.index();
232
233 trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");
234
235 transaction
236 .put_event(&types::Event::InBand(InBandEvent {
237 linked_chunk_id: linked_chunk_id.to_owned(),
238 content: item,
239 position: at.into(),
240 }))
241 .await?;
242 }
243 Update::RemoveItem { at } => {
244 let chunk_id = at.chunk_identifier().index();
245 let index = at.index();
246
247 trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");
248
249 transaction.delete_event_by_position(linked_chunk_id, at.into()).await?;
250 }
251 Update::DetachLastItems { at } => {
252 let chunk_id = at.chunk_identifier().index();
253 let index = at.index();
254
255 trace!(%linked_chunk_id, "detaching last items @ {chunk_id}:{index}");
256
257 transaction
258 .delete_events_by_chunk_from_index(linked_chunk_id, at.into())
259 .await?;
260 }
261 Update::StartReattachItems | Update::EndReattachItems => {
262 }
264 Update::Clear => {
265 trace!(%linked_chunk_id, "clearing room");
266 transaction.delete_chunks_by_linked_chunk_id(linked_chunk_id).await?;
267 transaction.delete_events_by_linked_chunk_id(linked_chunk_id).await?;
268 transaction.delete_gaps_by_linked_chunk_id(linked_chunk_id).await?;
269 }
270 }
271 }
272 transaction.commit().await?;
273 Ok(())
274 }
275
276 #[instrument(skip(self))]
277 async fn load_all_chunks(
278 &self,
279 linked_chunk_id: LinkedChunkId<'_>,
280 ) -> Result<Vec<RawChunk<Event, Gap>>, IndexeddbEventCacheStoreError> {
281 let _ = timer!("method");
282
283 let transaction = self.transaction(
284 &[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS],
285 IdbTransactionMode::Readwrite,
286 )?;
287
288 let mut raw_chunks = Vec::new();
289 let chunks = transaction.get_chunks_by_linked_chunk_id(linked_chunk_id).await?;
290 for chunk in chunks {
291 if let Some(raw_chunk) = transaction
292 .load_chunk_by_id(linked_chunk_id, ChunkIdentifier::new(chunk.identifier))
293 .await?
294 {
295 raw_chunks.push(raw_chunk);
296 }
297 }
298 Ok(raw_chunks)
299 }
300
301 #[instrument(skip(self))]
302 async fn load_all_chunks_metadata(
303 &self,
304 linked_chunk_id: LinkedChunkId<'_>,
305 ) -> Result<Vec<ChunkMetadata>, IndexeddbEventCacheStoreError> {
306 let _ = timer!("method");
317
318 let transaction = self.transaction(
319 &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
320 IdbTransactionMode::Readwrite,
321 )?;
322
323 let mut raw_chunks = Vec::new();
324 let chunks = transaction.get_chunks_by_linked_chunk_id(linked_chunk_id).await?;
325 for chunk in chunks {
326 let chunk_id = ChunkIdentifier::new(chunk.identifier);
327 let num_items =
328 transaction.get_events_count_by_chunk(linked_chunk_id, chunk_id).await?;
329 raw_chunks.push(ChunkMetadata {
330 num_items,
331 previous: chunk.previous.map(ChunkIdentifier::new),
332 identifier: ChunkIdentifier::new(chunk.identifier),
333 next: chunk.next.map(ChunkIdentifier::new),
334 });
335 }
336 Ok(raw_chunks)
337 }
338
339 #[instrument(skip(self))]
340 async fn load_last_chunk(
341 &self,
342 linked_chunk_id: LinkedChunkId<'_>,
343 ) -> Result<
344 (Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator),
345 IndexeddbEventCacheStoreError,
346 > {
347 let _timer = timer!("method");
348
349 let transaction = self.transaction(
350 &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
351 IdbTransactionMode::Readonly,
352 )?;
353
354 if transaction.get_chunks_count_by_linked_chunk_id(linked_chunk_id).await? == 0 {
355 return Ok((None, ChunkIdentifierGenerator::new_from_scratch()));
356 }
357 match transaction.get_chunk_by_next_chunk_id(linked_chunk_id, None).await {
361 Err(TransactionError::ItemIsNotUnique) => {
362 Err(IndexeddbEventCacheStoreError::ChunksContainDisjointLists)
366 }
367 Err(e) => {
368 Err(e.into())
371 }
372 Ok(None) => {
373 Err(IndexeddbEventCacheStoreError::ChunksContainCycle)
376 }
377 Ok(Some(last_chunk)) => {
378 let last_chunk_identifier = ChunkIdentifier::new(last_chunk.identifier);
379 let last_raw_chunk = transaction
380 .load_chunk_by_id(linked_chunk_id, last_chunk_identifier)
381 .await?
382 .ok_or(IndexeddbEventCacheStoreError::UnableToLoadChunk)?;
383 let max_chunk_id = transaction
384 .get_max_chunk_by_id(linked_chunk_id)
385 .await?
386 .map(|chunk| ChunkIdentifier::new(chunk.identifier))
387 .ok_or(IndexeddbEventCacheStoreError::NoMaxChunkId)?;
388 let generator =
389 ChunkIdentifierGenerator::new_from_previous_chunk_identifier(max_chunk_id);
390 Ok((Some(last_raw_chunk), generator))
391 }
392 }
393 }
394
395 #[instrument(skip(self))]
396 async fn load_previous_chunk(
397 &self,
398 linked_chunk_id: LinkedChunkId<'_>,
399 before_chunk_identifier: ChunkIdentifier,
400 ) -> Result<Option<RawChunk<Event, Gap>>, IndexeddbEventCacheStoreError> {
401 let _timer = timer!("method");
402
403 let transaction = self.transaction(
404 &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
405 IdbTransactionMode::Readonly,
406 )?;
407 if let Some(chunk) =
408 transaction.get_chunk_by_id(linked_chunk_id, before_chunk_identifier).await?
409 && let Some(previous_identifier) = chunk.previous
410 {
411 let previous_identifier = ChunkIdentifier::new(previous_identifier);
412 Ok(transaction.load_chunk_by_id(linked_chunk_id, previous_identifier).await?)
413 } else {
414 Ok(None)
415 }
416 }
417
418 #[instrument(skip(self))]
419 async fn clear_all_linked_chunks(&self) -> Result<(), IndexeddbEventCacheStoreError> {
420 let _timer = timer!("method");
421
422 let transaction = self.transaction(
423 &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
424 IdbTransactionMode::Readwrite,
425 )?;
426 transaction.clear::<types::Chunk>().await?;
427 transaction.clear::<types::Event>().await?;
428 transaction.clear::<types::Gap>().await?;
429 transaction.commit().await?;
430 Ok(())
431 }
432
433 #[instrument(skip(self, events))]
434 async fn filter_duplicated_events(
435 &self,
436 linked_chunk_id: LinkedChunkId<'_>,
437 events: Vec<OwnedEventId>,
438 ) -> Result<Vec<(OwnedEventId, Position)>, IndexeddbEventCacheStoreError> {
439 let _timer = timer!("method");
440
441 if events.is_empty() {
442 return Ok(Vec::new());
443 }
444
445 let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
446 let mut duplicated = Vec::new();
447 for event_id in events {
448 if let Some(types::Event::InBand(event)) =
449 transaction.get_event_by_id(linked_chunk_id, &event_id).await?
450 {
451 duplicated.push((event_id, event.position.into()));
452 }
453 }
454 Ok(duplicated)
455 }
456
457 #[instrument(skip(self, event_id))]
458 async fn find_event(
459 &self,
460 room_id: &RoomId,
461 event_id: &EventId,
462 ) -> Result<Option<Event>, IndexeddbEventCacheStoreError> {
463 let _timer = timer!("method");
464
465 let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
466 transaction
467 .get_event_by_room(room_id, event_id)
468 .await
469 .map(|ok| ok.map(Into::into))
470 .map_err(Into::into)
471 }
472
473 #[instrument(skip(self, event_id, filters))]
474 async fn find_event_relations(
475 &self,
476 room_id: &RoomId,
477 event_id: &EventId,
478 filters: Option<&[RelationType]>,
479 ) -> Result<Vec<(Event, Option<Position>)>, IndexeddbEventCacheStoreError> {
480 let _timer = timer!("method");
481
482 let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
483
484 let mut related_events = Vec::new();
485 match filters {
486 Some(relation_types) if !relation_types.is_empty() => {
487 for relation_type in relation_types {
488 let relation = (event_id, relation_type);
489 let events = transaction.get_events_by_relation(room_id, relation).await?;
490 for event in events {
491 let position = event.position().map(Into::into);
492 related_events.push((event.into(), position));
493 }
494 }
495 }
496 _ => {
497 for event in transaction.get_events_by_related_event(room_id, event_id).await? {
498 let position = event.position().map(Into::into);
499 related_events.push((event.into(), position));
500 }
501 }
502 }
503 Ok(related_events)
504 }
505
506 #[instrument(skip(self))]
507 async fn get_room_events(
508 &self,
509 room_id: &RoomId,
510 event_type: Option<&str>,
511 session_id: Option<&str>,
512 ) -> Result<Vec<Event>, IndexeddbEventCacheStoreError> {
513 let _timer = timer!("method");
514
515 let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
519 transaction
520 .get_room_events(room_id)
521 .await
522 .map(|vec| {
523 vec.into_iter()
524 .map(Event::from)
525 .filter(|e| {
526 event_type.is_none_or(|event_type| {
527 Some(event_type) == e.kind.event_type().as_deref()
528 })
529 })
530 .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
531 .collect()
532 })
533 .map_err(Into::into)
534 }
535
536 #[instrument(skip(self, event))]
537 async fn save_event(
538 &self,
539 room_id: &RoomId,
540 event: Event,
541 ) -> Result<(), IndexeddbEventCacheStoreError> {
542 let _timer = timer!("method");
543
544 let Some(event_id) = event.event_id() else {
545 error!(%room_id, "Trying to save an event with no ID");
546 return Ok(());
547 };
548 let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readwrite)?;
549 let event = match transaction.get_event_by_room(room_id, &event_id).await? {
550 Some(inner) => inner.with_content(event),
551 None => types::Event::OutOfBand(OutOfBandEvent {
552 linked_chunk_id: LinkedChunkId::Room(room_id).to_owned(),
553 content: event,
554 position: (),
555 }),
556 };
557 transaction.put_event(&event).await?;
558 transaction.commit().await?;
559 Ok(())
560 }
561
562 async fn optimize(&self) -> Result<(), Self::Error> {
563 Ok(())
564 }
565
566 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
567 Ok(None)
568 }
569}
570
// Runs the event cache store integration test suites against this store,
// once without and once with a store cipher. Only built for wasm test
// targets.
#[cfg(all(test, target_family = "wasm"))]
mod tests {
    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time,
    };
    use uuid::Uuid;

    use crate::{
        event_cache_store::IndexeddbEventCacheStore, indexeddb_event_cache_store_integration_tests,
    };

    // Suites run against a store built without a store cipher.
    mod unencrypted {
        use super::*;

        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

        // Builds a store in a freshly named database, so each call gets its
        // own database.
        async fn get_event_cache_store() -> Result<IndexeddbEventCacheStore, EventCacheStoreError> {
            let database_name =
                format!("test-event-cache-store-{}", Uuid::new_v4().as_hyphenated());
            let store =
                IndexeddbEventCacheStore::builder().database_name(database_name).build().await?;
            Ok(store)
        }

        event_cache_store_integration_tests!();
        event_cache_store_integration_tests_time!();

        indexeddb_event_cache_store_integration_tests!();
    }

    // Suites run against a store built with a freshly created store cipher.
    mod encrypted {
        use std::sync::Arc;

        use matrix_sdk_store_encryption::StoreCipher;

        use super::*;

        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

        // Builds a store in a freshly named database, configured with a new
        // store cipher.
        async fn get_event_cache_store() -> Result<IndexeddbEventCacheStore, EventCacheStoreError> {
            let database_name =
                format!("test-event-cache-store-{}", Uuid::new_v4().as_hyphenated());
            let cipher = Arc::new(StoreCipher::new().expect("store cipher"));
            let store = IndexeddbEventCacheStore::builder()
                .database_name(database_name)
                .store_cipher(cipher)
                .build()
                .await?;
            Ok(store)
        }

        event_cache_store_integration_tests!();
        event_cache_store_integration_tests_time!();

        indexeddb_event_cache_store_integration_tests!();
    }
}