matrix_sdk_indexeddb/event_cache_store/mod.rs

// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg_attr(not(test), allow(unused))]

use std::{rc::Rc, time::Duration};

use indexed_db_futures::{Build, database::Database};
#[cfg(target_family = "wasm")]
use matrix_sdk_base::cross_process_lock::{
    CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION,
};
use matrix_sdk_base::{
    event_cache::{Event, Gap, store::EventCacheStore},
    linked_chunk::{
        ChunkIdentifier, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position,
        RawChunk, Update,
    },
    timer,
};
use ruma::{
    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, events::relation::RelationType,
};
use tracing::{error, instrument, trace};
use web_sys::IdbTransactionMode;

use crate::{
    event_cache_store::{
        migrations::current::keys,
        transaction::IndexeddbEventCacheStoreTransaction,
        types::{ChunkType, InBandEvent, Lease, OutOfBandEvent},
    },
    serializer::indexed_type::{IndexedTypeSerializer, traits::Indexed},
    transaction::TransactionError,
};

mod builder;
mod error;
#[cfg(all(test, target_family = "wasm"))]
mod integration_tests;
mod migrations;
mod serializer;
mod transaction;
mod types;

pub use builder::IndexeddbEventCacheStoreBuilder;
pub use error::IndexeddbEventCacheStoreError;

/// A type for providing an IndexedDB implementation of [`EventCacheStore`][1].
/// This is meant to be used as a backend to [`EventCacheStore`][1] in browser
/// contexts.
///
/// [1]: matrix_sdk_base::event_cache::store::EventCacheStore
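///
/// # Examples
///
/// A minimal sketch of constructing a store in a browser context; the
/// database name below is purely illustrative.
///
/// ```ignore
/// let store = IndexeddbEventCacheStore::builder()
///     .database_name("event-cache".to_owned())
///     .build()
///     .await?;
/// ```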
#[derive(Debug, Clone)]
pub struct IndexeddbEventCacheStore {
    // A handle to the IndexedDB database
    inner: Rc<Database>,
    // A serializer with functionality tailored to `IndexeddbEventCacheStore`
    serializer: IndexedTypeSerializer,
}

impl IndexeddbEventCacheStore {
    /// Provides a type with which to conveniently build an
    /// [`IndexeddbEventCacheStore`]
    pub fn builder() -> IndexeddbEventCacheStoreBuilder {
        IndexeddbEventCacheStoreBuilder::default()
    }

    /// Initializes a new transaction on the underlying IndexedDB database and
    /// returns a handle which can be used to combine database operations
    /// into an atomic unit.
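    ///
    /// # Examples
    ///
    /// A minimal sketch; the object store name below is illustrative and
    /// stands in for the crate-private constants this store actually uses.
    ///
    /// ```ignore
    /// let transaction = store.transaction(&["events"], IdbTransactionMode::Readwrite)?;
    /// // ... queue reads and writes on the transaction handle ...
    /// transaction.commit().await?;
    /// ```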
    pub fn transaction<'a>(
        &'a self,
        stores: &[&str],
        mode: IdbTransactionMode,
    ) -> Result<IndexeddbEventCacheStoreTransaction<'a>, IndexeddbEventCacheStoreError> {
        Ok(IndexeddbEventCacheStoreTransaction::new(
            self.inner
                .transaction(stores)
                .with_mode(mode)
                .build()
                .map_err(TransactionError::from)?,
            &self.serializer,
        ))
    }
}

#[cfg(target_family = "wasm")]
#[async_trait::async_trait(?Send)]
impl EventCacheStore for IndexeddbEventCacheStore {
    type Error = IndexeddbEventCacheStoreError;

    #[instrument(skip(self))]
    async fn try_take_leased_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<Option<CrossProcessLockGeneration>, IndexeddbEventCacheStoreError> {
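        // There are four possible outcomes: the lease does not exist yet and
        // is freshly created; we already hold it and simply extend it; another
        // holder's lease has expired and we steal it, bumping the generation;
        // or the lease is still held by someone else and `None` is returned.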
        let transaction =
            self.transaction(&[Lease::OBJECT_STORE], IdbTransactionMode::Readwrite)?;

        let now = Duration::from_millis(MilliSecondsSinceUnixEpoch::now().get().into());
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        let lease = match transaction.get_lease_by_id(key).await? {
            Some(mut lease) => {
                if lease.holder == holder {
                    // We had the lease before, extend it.
                    lease.expiration = expiration;

                    Some(lease)
                } else {
                    // We didn't have it.
                    if lease.expiration < now {
                        // Steal it!
                        lease.holder = holder.to_owned();
                        lease.expiration = expiration;
                        lease.generation += 1;

                        Some(lease)
                    } else {
                        // We tried our best.
                        None
                    }
                }
            }
            None => {
                let lease = Lease {
                    key: key.to_owned(),
                    holder: holder.to_owned(),
                    expiration,
                    generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
                };

                Some(lease)
            }
        };

        Ok(if let Some(lease) = lease {
            transaction.put_lease(&lease).await?;
            transaction.commit().await?;

            Some(lease.generation)
        } else {
            None
        })
    }

    #[instrument(skip(self, updates))]
    async fn handle_linked_chunk_updates(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS],
            IdbTransactionMode::Readwrite,
        )?;
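        // Each update below maps to one or more object store operations, all
        // queued on the single read-write transaction created above, so the
        // whole batch of updates is applied atomically on commit.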

        for update in updates {
            match update {
                Update::NewItemsChunk { previous, new, next } => {
                    trace!(%linked_chunk_id, "Inserting new chunk (prev={previous:?}, new={new:?}, next={next:?})");
                    transaction
                        .add_chunk(&types::Chunk {
                            linked_chunk_id: linked_chunk_id.to_owned(),
                            identifier: new.index(),
                            previous: previous.map(|i| i.index()),
                            next: next.map(|i| i.index()),
                            chunk_type: ChunkType::Event,
                        })
                        .await?;
                }
                Update::NewGapChunk { previous, new, next, gap } => {
                    trace!(%linked_chunk_id, "Inserting new gap (prev={previous:?}, new={new:?}, next={next:?})");
                    transaction
                        .add_item(&types::Gap {
                            linked_chunk_id: linked_chunk_id.to_owned(),
                            chunk_identifier: new.index(),
                            prev_token: gap.prev_token,
                        })
                        .await?;
                    transaction
                        .add_chunk(&types::Chunk {
                            linked_chunk_id: linked_chunk_id.to_owned(),
                            identifier: new.index(),
                            previous: previous.map(|i| i.index()),
                            next: next.map(|i| i.index()),
                            chunk_type: ChunkType::Gap,
                        })
                        .await?;
                }
                Update::RemoveChunk(chunk_id) => {
                    trace!(%linked_chunk_id, "Removing chunk {chunk_id:?}");
                    transaction.delete_chunk_by_id(linked_chunk_id, chunk_id).await?;
                }
                Update::PushItems { at, items } => {
                    let chunk_identifier = at.chunk_identifier().index();

                    trace!(%linked_chunk_id, "pushing {} items @ {chunk_identifier}", items.len());

                    for (i, item) in items.into_iter().enumerate() {
                        transaction
                            .put_event(&types::Event::InBand(InBandEvent {
                                linked_chunk_id: linked_chunk_id.to_owned(),
                                content: item,
                                position: types::Position {
                                    chunk_identifier,
                                    index: at.index() + i,
                                },
                            }))
                            .await?;
                    }
                }
                Update::ReplaceItem { at, item } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%linked_chunk_id, "replacing item @ {chunk_id}:{index}");

                    transaction
                        .put_event(&types::Event::InBand(InBandEvent {
                            linked_chunk_id: linked_chunk_id.to_owned(),
                            content: item,
                            position: at.into(),
                        }))
                        .await?;
                }
                Update::RemoveItem { at } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%linked_chunk_id, "removing item @ {chunk_id}:{index}");

                    transaction.delete_event_by_position(linked_chunk_id, at.into()).await?;
                }
                Update::DetachLastItems { at } => {
                    let chunk_id = at.chunk_identifier().index();
                    let index = at.index();

                    trace!(%linked_chunk_id, "detaching last items @ {chunk_id}:{index}");

                    transaction
                        .delete_events_by_chunk_from_index(linked_chunk_id, at.into())
                        .await?;
                }
                Update::StartReattachItems | Update::EndReattachItems => {
                    // Nothing to do here: these updates are fully described by
                    // the updates surrounding them, so, as in the SQLite
                    // implementation, they are a deliberate no-op.
                }
                Update::Clear => {
                    trace!(%linked_chunk_id, "clearing linked chunk");
                    transaction.delete_chunks_by_linked_chunk_id(linked_chunk_id).await?;
                    transaction.delete_events_by_linked_chunk_id(linked_chunk_id).await?;
                    transaction.delete_gaps_by_linked_chunk_id(linked_chunk_id).await?;
                }
            }
        }
        transaction.commit().await?;
        Ok(())
    }

    #[instrument(skip(self))]
    async fn load_all_chunks(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<RawChunk<Event, Gap>>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::GAPS, keys::EVENTS],
            IdbTransactionMode::Readwrite,
        )?;

        let mut raw_chunks = Vec::new();
        let chunks = transaction.get_chunks_by_linked_chunk_id(linked_chunk_id).await?;
        for chunk in chunks {
            if let Some(raw_chunk) = transaction
                .load_chunk_by_id(linked_chunk_id, ChunkIdentifier::new(chunk.identifier))
                .await?
            {
                raw_chunks.push(raw_chunk);
            }
        }
        Ok(raw_chunks)
    }

    #[instrument(skip(self))]
    async fn load_all_chunks_metadata(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Vec<ChunkMetadata>, IndexeddbEventCacheStoreError> {
        // TODO: This call could possibly take a very long time and the
        // amount of time increases linearly with the number of chunks
        // it needs to load from the database. This will likely require
        // some refactoring to deal with performance issues.
        //
        // For details on the performance penalties associated with this
        // call, see https://github.com/matrix-org/matrix-rust-sdk/pull/5407.
        //
        // For how this was improved in the SQLite implementation, see
        // https://github.com/matrix-org/matrix-rust-sdk/pull/5382.
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
            IdbTransactionMode::Readwrite,
        )?;

        let mut raw_chunks = Vec::new();
        let chunks = transaction.get_chunks_by_linked_chunk_id(linked_chunk_id).await?;
        for chunk in chunks {
            let chunk_id = ChunkIdentifier::new(chunk.identifier);
            let num_items =
                transaction.get_events_count_by_chunk(linked_chunk_id, chunk_id).await?;
            raw_chunks.push(ChunkMetadata {
                num_items,
                previous: chunk.previous.map(ChunkIdentifier::new),
                identifier: ChunkIdentifier::new(chunk.identifier),
                next: chunk.next.map(ChunkIdentifier::new),
            });
        }
        Ok(raw_chunks)
    }

    #[instrument(skip(self))]
    async fn load_last_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<
        (Option<RawChunk<Event, Gap>>, ChunkIdentifierGenerator),
        IndexeddbEventCacheStoreError,
    > {
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
            IdbTransactionMode::Readonly,
        )?;

        if transaction.get_chunks_count_by_linked_chunk_id(linked_chunk_id).await? == 0 {
            return Ok((None, ChunkIdentifierGenerator::new_from_scratch()));
        }
        // Now that we know we have some chunks in the room, we query IndexedDB
        // for the last chunk in the room by getting the chunk which does not
        // have a next chunk.
        match transaction.get_chunk_by_next_chunk_id(linked_chunk_id, None).await {
            Err(TransactionError::ItemIsNotUnique) => {
                // If there are multiple chunks that do not have a next chunk, that
                // means we have more than one last chunk, which means that we have
                // more than one list in the room.
                Err(IndexeddbEventCacheStoreError::ChunksContainDisjointLists)
            }
            Err(e) => {
                // There was some error querying IndexedDB, but it is not necessarily
                // a violation of our data constraints.
                Err(e.into())
            }
            Ok(None) => {
                // If there is no chunk without a next chunk, that means every chunk
                // points to another chunk, which means that we have a cycle in our list.
                Err(IndexeddbEventCacheStoreError::ChunksContainCycle)
            }
            Ok(Some(last_chunk)) => {
                let last_chunk_identifier = ChunkIdentifier::new(last_chunk.identifier);
                let last_raw_chunk = transaction
                    .load_chunk_by_id(linked_chunk_id, last_chunk_identifier)
                    .await?
                    .ok_or(IndexeddbEventCacheStoreError::UnableToLoadChunk)?;
                let max_chunk_id = transaction
                    .get_max_chunk_by_id(linked_chunk_id)
                    .await?
                    .map(|chunk| ChunkIdentifier::new(chunk.identifier))
                    .ok_or(IndexeddbEventCacheStoreError::NoMaxChunkId)?;
                let generator =
                    ChunkIdentifierGenerator::new_from_previous_chunk_identifier(max_chunk_id);
                Ok((Some(last_raw_chunk), generator))
            }
        }
    }

    #[instrument(skip(self))]
    async fn load_previous_chunk(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        before_chunk_identifier: ChunkIdentifier,
    ) -> Result<Option<RawChunk<Event, Gap>>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
            IdbTransactionMode::Readonly,
        )?;
        if let Some(chunk) =
            transaction.get_chunk_by_id(linked_chunk_id, before_chunk_identifier).await?
            && let Some(previous_identifier) = chunk.previous
        {
            let previous_identifier = ChunkIdentifier::new(previous_identifier);
            Ok(transaction.load_chunk_by_id(linked_chunk_id, previous_identifier).await?)
        } else {
            Ok(None)
        }
    }

    #[instrument(skip(self))]
    async fn clear_all_linked_chunks(&self) -> Result<(), IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(
            &[keys::LINKED_CHUNKS, keys::EVENTS, keys::GAPS],
            IdbTransactionMode::Readwrite,
        )?;
        transaction.clear::<types::Chunk>().await?;
        transaction.clear::<types::Event>().await?;
        transaction.clear::<types::Gap>().await?;
        transaction.commit().await?;
        Ok(())
    }

    #[instrument(skip(self, events))]
    async fn filter_duplicated_events(
        &self,
        linked_chunk_id: LinkedChunkId<'_>,
        events: Vec<OwnedEventId>,
    ) -> Result<Vec<(OwnedEventId, Position)>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        if events.is_empty() {
            return Ok(Vec::new());
        }

        let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
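        // An event only counts as a duplicate if it is stored in-band, i.e.,
        // if it already occupies a position in a linked chunk; out-of-band
        // events are ignored here.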
        let mut duplicated = Vec::new();
        for event_id in events {
            if let Some(types::Event::InBand(event)) =
                transaction.get_event_by_id(linked_chunk_id, &event_id).await?
            {
                duplicated.push((event_id, event.position.into()));
            }
        }
        Ok(duplicated)
    }

    #[instrument(skip(self, event_id))]
    async fn find_event(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
    ) -> Result<Option<Event>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
        transaction
            .get_event_by_room(room_id, event_id)
            .await
            .map(|ok| ok.map(Into::into))
            .map_err(Into::into)
    }

    #[instrument(skip(self, event_id, filters))]
    async fn find_event_relations(
        &self,
        room_id: &RoomId,
        event_id: &EventId,
        filters: Option<&[RelationType]>,
    ) -> Result<Vec<(Event, Option<Position>)>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;

        let mut related_events = Vec::new();
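        // If one or more relation types were given, query the related events
        // for each type separately; otherwise, load every event that relates
        // to the given event, regardless of relation type.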
        match filters {
            Some(relation_types) if !relation_types.is_empty() => {
                for relation_type in relation_types {
                    let relation = (event_id, relation_type);
                    let events = transaction.get_events_by_relation(room_id, relation).await?;
                    for event in events {
                        let position = event.position().map(Into::into);
                        related_events.push((event.into(), position));
                    }
                }
            }
            _ => {
                for event in transaction.get_events_by_related_event(room_id, event_id).await? {
                    let position = event.position().map(Into::into);
                    related_events.push((event.into(), position));
                }
            }
        }
        Ok(related_events)
    }

    #[instrument(skip(self))]
    async fn get_room_events(
        &self,
        room_id: &RoomId,
        event_type: Option<&str>,
        session_id: Option<&str>,
    ) -> Result<Vec<Event>, IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        // TODO: Make this more efficient so we don't load all events and filter them
        // here. We should instead only load the relevant events.

        let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readonly)?;
        transaction
            .get_room_events(room_id)
            .await
            .map(|vec| {
                vec.into_iter()
                    .map(Event::from)
                    .filter(|e| {
                        event_type.is_none_or(|event_type| {
                            Some(event_type) == e.kind.event_type().as_deref()
                        })
                    })
                    .filter(|e| session_id.is_none_or(|s| Some(s) == e.kind.session_id()))
                    .collect()
            })
            .map_err(Into::into)
    }

    #[instrument(skip(self, event))]
    async fn save_event(
        &self,
        room_id: &RoomId,
        event: Event,
    ) -> Result<(), IndexeddbEventCacheStoreError> {
        let _timer = timer!("method");

        let Some(event_id) = event.event_id() else {
            error!(%room_id, "Trying to save an event with no ID");
            return Ok(());
        };
        let transaction = self.transaction(&[keys::EVENTS], IdbTransactionMode::Readwrite)?;
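        // If the event is already known, only its content is replaced, which
        // preserves any existing position information; otherwise, it is
        // stored as an out-of-band event, i.e., one that is not attached to
        // any linked chunk.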
        let event = match transaction.get_event_by_room(room_id, &event_id).await? {
            Some(inner) => inner.with_content(event),
            None => types::Event::OutOfBand(OutOfBandEvent {
                linked_chunk_id: LinkedChunkId::Room(room_id).to_owned(),
                content: event,
                position: (),
            }),
        };
        transaction.put_event(&event).await?;
        transaction.commit().await?;
        Ok(())
    }

    async fn optimize(&self) -> Result<(), Self::Error> {
        Ok(())
    }

    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
        Ok(None)
    }
}

#[cfg(all(test, target_family = "wasm"))]
mod tests {
    use matrix_sdk_base::{
        event_cache::store::EventCacheStoreError, event_cache_store_integration_tests,
        event_cache_store_integration_tests_time,
    };
    use uuid::Uuid;

    use crate::{
        event_cache_store::IndexeddbEventCacheStore, indexeddb_event_cache_store_integration_tests,
    };

    mod unencrypted {
        use super::*;

        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

        async fn get_event_cache_store() -> Result<IndexeddbEventCacheStore, EventCacheStoreError> {
            let name = format!("test-event-cache-store-{}", Uuid::new_v4().as_hyphenated());
            Ok(IndexeddbEventCacheStore::builder().database_name(name).build().await?)
        }

        event_cache_store_integration_tests!();
        event_cache_store_integration_tests_time!();

        indexeddb_event_cache_store_integration_tests!();
    }

    mod encrypted {
        use std::sync::Arc;

        use matrix_sdk_store_encryption::StoreCipher;

        use super::*;

        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

        async fn get_event_cache_store() -> Result<IndexeddbEventCacheStore, EventCacheStoreError> {
            let name = format!("test-event-cache-store-{}", Uuid::new_v4().as_hyphenated());
            Ok(IndexeddbEventCacheStore::builder()
                .database_name(name)
                .store_cipher(Arc::new(StoreCipher::new().expect("store cipher")))
                .build()
                .await?)
        }

        event_cache_store_integration_tests!();
        event_cache_store_integration_tests_time!();

        indexeddb_event_cache_store_integration_tests!();
    }
}