// matrix_sdk/sliding_sync/cache.rs

//! Cache utilities.
//!
//! A `SlidingSync` instance can be stored in a cache, and restored from the
//! same cache. It helps to define what is sometimes called a “cold start”, or
//! a “fast start”.
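//!
//! As a minimal sketch of the intended flow (the sync id `"main"` and the
//! list name `"all-rooms"` are illustrative, and the sliding sync API is
//! assumed to be enabled via the `experimental-sliding-sync` feature):
//!
//! ```no_run
//! # async fn example(client: matrix_sdk::Client) -> matrix_sdk::Result<()> {
//! use matrix_sdk::SlidingSyncList;
//!
//! // A list registered with `add_cached_list` is marked for caching: on a
//! // cold start, its state is restored from the store before the first sync.
//! let sliding_sync = client
//!     .sliding_sync("main")?
//!     .add_cached_list(SlidingSyncList::builder("all-rooms"))
//!     .await?
//!     .build()
//!     .await?;
//! # Ok(())
//! # }
//! ```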

use std::collections::BTreeMap;

use matrix_sdk_base::{StateStore, StoreError};
use matrix_sdk_common::timer;
use ruma::{OwnedRoomId, UserId};
use tracing::{trace, warn};

use super::{
    FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
    SlidingSyncPositionMarkers, SlidingSyncRoom,
};
#[cfg(feature = "e2e-encryption")]
use crate::sliding_sync::FrozenSlidingSyncPos;
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};

/// Be careful: as this is used as a storage key, changing it requires
/// migrating data!
pub(super) fn format_storage_key_prefix(id: &str, user_id: &UserId) -> String {
    format!("sliding_sync_store::{id}::{user_id}")
}

/// Be careful: as this is used as a storage key, changing it requires
/// migrating data!
fn format_storage_key_for_sliding_sync(storage_key: &str) -> String {
    format!("{storage_key}::instance")
}

/// Be careful: as this is used as a storage key, changing it requires
/// migrating data!
fn format_storage_key_for_sliding_sync_list(storage_key: &str, list_name: &str) -> String {
    format!("{storage_key}::list::{list_name}")
}
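
// For reference, given `id = "main"` and user `@alice:example.org`, the
// helpers above produce keys of the following shapes (values illustrative):
//
//     sliding_sync_store::main::@alice:example.org                  (prefix)
//     sliding_sync_store::main::@alice:example.org::instance        (instance)
//     sliding_sync_store::main::@alice:example.org::list::all-rooms (list "all-rooms")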

/// Invalidate a single [`SlidingSyncList`] cache entry by removing it from the
/// state store cache.
async fn invalidate_cached_list(
    storage: &dyn StateStore<Error = StoreError>,
    storage_key: &str,
    list_name: &str,
) {
    let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);
    let _ = storage.remove_custom_value(storage_key_for_list.as_bytes()).await;
}

/// Clean the storage for everything related to `SlidingSync` and all known
/// lists.
async fn clean_storage(
    client: &Client,
    storage_key: &str,
    lists: &BTreeMap<String, SlidingSyncList>,
) {
    let storage = client.store();
    for list_name in lists.keys() {
        invalidate_cached_list(storage, storage_key, list_name).await;
    }
    let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
    let _ = storage.remove_custom_value(instance_storage_key.as_bytes()).await;

    #[cfg(feature = "e2e-encryption")]
    if let Some(olm_machine) = &*client.olm_machine().await {
        // Invalidate the value stored for the TERRIBLE HACK.
        let _ = olm_machine
            .store()
            .set_custom_value(&instance_storage_key, "".as_bytes().to_vec())
            .await;
    }
}

/// Store the `SlidingSync`'s state in the storage.
pub(super) async fn store_sliding_sync_state(
    sliding_sync: &SlidingSync,
    _position: &SlidingSyncPositionMarkers,
) -> Result<()> {
    let storage_key = &sliding_sync.inner.storage_key;
    let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);

    trace!(storage_key, "Saving a `SlidingSync` to the state store");
    let storage = sliding_sync.inner.client.store();

    // Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
    // the store.
    storage
        .set_custom_value(
            instance_storage_key.as_bytes(),
            serde_json::to_vec(&FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await))?,
        )
        .await?;

    #[cfg(feature = "e2e-encryption")]
    {
        let position = _position;

        // FIXME (TERRIBLE HACK): we want to save `pos` in a cross-process safe manner,
        // with both processes sharing the same database backend; that needs to
        // go in the crypto process store at the moment, but should be fixed
        // later on.
        if let Some(olm_machine) = &*sliding_sync.inner.client.olm_machine().await {
            let pos_blob = serde_json::to_vec(&FrozenSlidingSyncPos { pos: position.pos.clone() })?;
            olm_machine.store().set_custom_value(&instance_storage_key, pos_blob).await?;
        }
    }

    // Write every `SlidingSyncList` that's configured for caching into the store.
    let frozen_lists = {
        sliding_sync
            .inner
            .lists
            .read()
            .await
            .iter()
            .filter(|(_, list)| matches!(list.cache_policy(), SlidingSyncListCachePolicy::Enabled))
            .map(|(list_name, list)| {
                Ok((
                    format_storage_key_for_sliding_sync_list(storage_key, list_name),
                    serde_json::to_vec(&FrozenSlidingSyncList::freeze(list))?,
                ))
            })
            .collect::<Result<Vec<_>, crate::Error>>()?
    };

    for (storage_key_for_list, frozen_list) in frozen_lists {
        trace!(storage_key_for_list, "Saving a `SlidingSyncList`");

        storage.set_custom_value(storage_key_for_list.as_bytes(), frozen_list).await?;
    }

    Ok(())
}

/// Try to restore a single [`SlidingSyncList`] from the cache.
///
/// If it fails to deserialize for some reason, invalidate the cache entry.
pub(super) async fn restore_sliding_sync_list(
    storage: &dyn StateStore<Error = StoreError>,
    storage_key: &str,
    list_name: &str,
) -> Result<Option<FrozenSlidingSyncList>> {
    let _timer = timer!(format!("loading list from DB {list_name}"));

    let storage_key_for_list = format_storage_key_for_sliding_sync_list(storage_key, list_name);

    match storage
        .get_custom_value(storage_key_for_list.as_bytes())
        .await?
        .map(|custom_value| serde_json::from_slice::<FrozenSlidingSyncList>(&custom_value))
    {
        Some(Ok(frozen_list)) => {
            // List has been found and successfully deserialized.
            trace!(list_name, "successfully read the list from cache");
            return Ok(Some(frozen_list));
        }

        Some(Err(_)) => {
            // List has been found, but it wasn't possible to deserialize it. It's declared
            // as obsolete. The main reason might be that the internal representation of a
            // `SlidingSyncList` might have changed. Instead of considering this as a strong
            // error, we remove the entry from the cache and keep the list in its initial
            // state.
            warn!(
                list_name,
                "failed to deserialize the list from the cache, it is obsolete; removing the cache entry!"
            );
            // Let's clear the list and stop here.
            invalidate_cached_list(storage, storage_key, list_name).await;
        }

        None => {
            // A missing cache doesn't make anything obsolete.
            // We just do nothing here.
            trace!(list_name, "failed to find the list in the cache");
        }
    }

    Ok(None)
}

/// Fields restored during `restore_sliding_sync_state`.
#[derive(Default)]
pub(super) struct RestoredFields {
    /// The `since` token to use for to-device events.
    pub to_device_token: Option<String>,
    /// The position marker (`pos`) of the sliding sync stream.
    pub pos: Option<String>,
    /// The rooms previously known to the `SlidingSync` instance.
    pub rooms: BTreeMap<OwnedRoomId, SlidingSyncRoom>,
}
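
// How a caller might consume `RestoredFields`; a sketch only, where `position`
// and `rooms` stand in for the actual internal state of a `SlidingSync`
// instance being (re)built:
//
//     if let Some(fields) = restore_sliding_sync_state(&client, &storage_key, &lists).await? {
//         position.pos = fields.pos;
//         rooms.extend(fields.rooms);
//     }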

/// Restore the `SlidingSync`'s state from what is stored in the storage.
///
/// If the cache is obsolete (corrupted, or impossible to deserialize), the
/// entire `SlidingSync` cache is removed and `Ok(None)` is returned.
pub(super) async fn restore_sliding_sync_state(
    client: &Client,
    storage_key: &str,
    lists: &BTreeMap<String, SlidingSyncList>,
) -> Result<Option<RestoredFields>> {
    let _timer = timer!(format!("loading sliding sync {storage_key} state from DB"));

    let mut restored_fields = RestoredFields::default();

    #[cfg(feature = "e2e-encryption")]
    if let Some(olm_machine) = &*client.olm_machine().await {
        match olm_machine.store().next_batch_token().await? {
            Some(token) => {
                restored_fields.to_device_token = Some(token);
            }
            None => trace!("No `SlidingSync` in the crypto-store cache"),
        }
    }

    let storage = client.store();
    let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);

    // Preload the `SlidingSync` object from the cache.
    match storage
        .get_custom_value(instance_storage_key.as_bytes())
        .await?
        .map(|custom_value| serde_json::from_slice::<FrozenSlidingSync>(&custom_value))
    {
        // `SlidingSync` has been found and successfully deserialized.
        Some(Ok(FrozenSlidingSync { to_device_since, rooms: frozen_rooms })) => {
            trace!("Successfully read the `SlidingSync` from the cache");
            // Only update the to-device token if we failed to read it from the crypto store
            // above.
            if restored_fields.to_device_token.is_none() {
                restored_fields.to_device_token = to_device_since;
            }

            #[cfg(feature = "e2e-encryption")]
            {
                if let Some(olm_machine) = &*client.olm_machine().await {
                    if let Ok(Some(blob)) =
                        olm_machine.store().get_custom_value(&instance_storage_key).await
                    {
                        if let Ok(frozen_pos) =
                            serde_json::from_slice::<FrozenSlidingSyncPos>(&blob)
                        {
                            trace!("Successfully read the `SlidingSync` pos from the crypto store cache");
                            restored_fields.pos = frozen_pos.pos;
                        }
                    }
                }
            }

            restored_fields.rooms = frozen_rooms
                .into_iter()
                .map(|frozen_room| {
                    (frozen_room.room_id.clone(), SlidingSyncRoom::from_frozen(frozen_room))
                })
                .collect();
        }

        // `SlidingSync` has been found, but it wasn't possible to deserialize it.
        // It's declared as obsolete. The main reason might be that the internal
        // representation of a `SlidingSync` might have changed. Instead of
        // considering this as a strong error, we remove the entry from the cache
        // and keep `SlidingSync` in its initial state.
        Some(Err(_)) => {
            warn!(
                "failed to deserialize `SlidingSync` from the cache, it is obsolete; removing the cache entry!"
            );

            // Let's clear everything and stop here.
            clean_storage(client, storage_key, lists).await;

            return Ok(None);
        }

        None => {
            trace!("No `SlidingSync` object in the cache");
        }
    }

    Ok(Some(restored_fields))
}

#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use assert_matches::assert_matches;
    use futures_util::StreamExt as _;
    use matrix_sdk_test::async_test;
    use ruma::owned_room_id;

    use super::{
        super::FrozenSlidingSyncRoom, clean_storage, format_storage_key_for_sliding_sync,
        format_storage_key_for_sliding_sync_list, format_storage_key_prefix,
        restore_sliding_sync_state, store_sliding_sync_state, SlidingSyncList,
    };
    use crate::{test_utils::logged_in_client, Result, SlidingSyncRoom};

    #[allow(clippy::await_holding_lock)]
    #[async_test]
    async fn test_sliding_sync_can_be_stored_and_restored() -> Result<()> {
        let client = logged_in_client(Some("https://foo.bar".to_owned())).await;

        let store = client.store();

        // Store entries don't exist.
        assert!(store
            .get_custom_value(format_storage_key_for_sliding_sync("hello").as_bytes())
            .await?
            .is_none());

        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list("hello", "list_foo").as_bytes()
            )
            .await?
            .is_none());

        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list("hello", "list_bar").as_bytes()
            )
            .await?
            .is_none());

        let room_id1 = owned_room_id!("!r1:matrix.org");
        let room_id2 = owned_room_id!("!r2:matrix.org");

        // Create a new `SlidingSync` instance, and store it.
        let storage_key = {
            let sync_id = "test-sync-id";
            let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
            let sliding_sync = client
                .sliding_sync(sync_id)?
                .add_cached_list(SlidingSyncList::builder("list_foo"))
                .await?
                .add_list(SlidingSyncList::builder("list_bar"))
                .build()
                .await?;

            // Modify both lists, so we can check expected caching behavior later.
            {
                let lists = sliding_sync.inner.lists.write().await;

                let list_foo = lists.get("list_foo").unwrap();
                list_foo.set_maximum_number_of_rooms(Some(42));

                let list_bar = lists.get("list_bar").unwrap();
                list_bar.set_maximum_number_of_rooms(Some(1337));
            }

            // Add some rooms.
            {
                let mut rooms = sliding_sync.inner.rooms.write().await;

                rooms.insert(
                    room_id1.clone(),
                    SlidingSyncRoom::new(room_id1.clone(), None, Vec::new()),
                );
                rooms.insert(
                    room_id2.clone(),
                    SlidingSyncRoom::new(room_id2.clone(), None, Vec::new()),
                );
            }

            let position_guard = sliding_sync.inner.position.lock().await;
            assert!(sliding_sync.cache_to_storage(&position_guard).await.is_ok());

            storage_key
        };

        // Store entries now exist for the sliding sync object and list_foo.
        assert!(store
            .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
            .await?
            .is_some());

        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
            )
            .await?
            .is_some());

        // But not for list_bar.
        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
            )
            .await?
            .is_none());

        // Create a new `SlidingSync`, and it should be read from the cache.
        let storage_key = {
            let sync_id = "test-sync-id";
            let storage_key = format_storage_key_prefix(sync_id, client.user_id().unwrap());
            let max_number_of_room_stream = Arc::new(RwLock::new(None));
            let cloned_stream = max_number_of_room_stream.clone();
            let sliding_sync = client
                .sliding_sync(sync_id)?
                .add_cached_list(SlidingSyncList::builder("list_foo").once_built(move |list| {
                    // In the `once_built()` handler, nothing has been read from the cache yet.
                    assert_eq!(list.maximum_number_of_rooms(), None);

                    let mut stream = cloned_stream.write().unwrap();
                    *stream = Some(list.maximum_number_of_rooms_stream());
                    list
                }))
                .await?
                .add_list(SlidingSyncList::builder("list_bar"))
                .build()
                .await?;

            // Check the lists' state.
            {
                let lists = sliding_sync.inner.lists.read().await;

                // This one was cached.
                let list_foo = lists.get("list_foo").unwrap();
                assert_eq!(list_foo.maximum_number_of_rooms(), Some(42));

                // This one wasn't.
                let list_bar = lists.get("list_bar").unwrap();
                assert_eq!(list_bar.maximum_number_of_rooms(), None);
            }

            // Check the rooms.
            {
                let rooms = sliding_sync.inner.rooms.read().await;

                // Rooms were cached.
                assert!(rooms.contains_key(&room_id1));
                assert!(rooms.contains_key(&room_id2));
            }

            // The maximum number of rooms reloaded from the cache should have been
            // published.
            {
                let mut stream =
                    max_number_of_room_stream.write().unwrap().take().expect("stream must be set");
                let initial_max_number_of_rooms =
                    stream.next().await.expect("stream must have emitted something");
                assert_eq!(initial_max_number_of_rooms, Some(42));
            }

            // Clean the cache.
            let lists = sliding_sync.inner.lists.read().await;
            clean_storage(&client, &storage_key, &lists).await;
            storage_key
        };

        // Store entries don't exist anymore.
        assert!(store
            .get_custom_value(format_storage_key_for_sliding_sync(&storage_key).as_bytes())
            .await?
            .is_none());

        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list(&storage_key, "list_foo").as_bytes()
            )
            .await?
            .is_none());

        assert!(store
            .get_custom_value(
                format_storage_key_for_sliding_sync_list(&storage_key, "list_bar").as_bytes()
            )
            .await?
            .is_none());

        Ok(())
    }

    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_high_level_cache_and_restore() -> Result<()> {
        use imbl::Vector;
        use ruma::owned_room_id;

        use crate::sliding_sync::FrozenSlidingSync;

        let client = logged_in_client(Some("https://foo.bar".to_owned())).await;

        let sync_id = "test-sync-id";
        let storage_key_prefix = format_storage_key_prefix(sync_id, client.user_id().unwrap());
        let full_storage_key = format_storage_key_for_sliding_sync(&storage_key_prefix);
        let sliding_sync = client.sliding_sync(sync_id)?.build().await?;

        // At first, there's nothing in either store.
        if let Some(olm_machine) = &*client.base_client().olm_machine().await {
            let store = olm_machine.store();
            assert!(store.next_batch_token().await?.is_none());
        }

        let state_store = client.store();
        assert!(state_store.get_custom_value(full_storage_key.as_bytes()).await?.is_none());

        // Emulate some data to be cached.
        let pos = "pos".to_owned();
        {
            let mut position_guard = sliding_sync.inner.position.lock().await;
            position_guard.pos = Some(pos.clone());

            // Then, we can correctly cache the sliding sync instance.
            store_sliding_sync_state(&sliding_sync, &position_guard).await?;
        }

        // The instance has been correctly written to the state store, but without the
        // to_device_since token (that one lives in the crypto store).
        let state_store = client.store();
        assert_matches!(
            state_store.get_custom_value(full_storage_key.as_bytes()).await?,
            Some(bytes) => {
                let deserialized: FrozenSlidingSync = serde_json::from_slice(&bytes)?;
                assert!(deserialized.to_device_since.is_none());
            }
        );

        // Ok, forget about the sliding sync, let's recreate one from scratch.
        drop(sliding_sync);

        let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
            .await?
            .expect("must have restored sliding sync fields");

        // After restoring, the stream position marker could be read.
        assert_eq!(restored_fields.pos.unwrap(), pos);

        // Test the "migration" path: assume a to-device token that is missing from
        // the crypto store, but present in a former state store.

        // For our sanity, check no to-device token has been saved in the database.
        {
            let olm_machine = client.base_client().olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();
            assert!(olm_machine.store().next_batch_token().await?.is_none());
        }

        let to_device_token = "to_device_token".to_owned();

        // Put that to-device token in the state store.
        let state_store = client.store();
        state_store
            .set_custom_value(
                full_storage_key.as_bytes(),
                serde_json::to_vec(&FrozenSlidingSync {
                    to_device_since: Some(to_device_token.clone()),
                    rooms: vec![FrozenSlidingSyncRoom {
                        room_id: owned_room_id!("!r0:matrix.org"),
                        prev_batch: Some("t0ken".to_owned()),
                        timeline_queue: Vector::new(),
                    }],
                })?,
            )
            .await?;

        let restored_fields = restore_sliding_sync_state(&client, &storage_key_prefix, &[].into())
            .await?
            .expect("must have restored fields");

        // After restoring, the to-device since token and rooms could be read from the
        // state store, and the stream position from the crypto store.
        assert_eq!(restored_fields.to_device_token.unwrap(), to_device_token);
        assert_eq!(restored_fields.pos.unwrap(), pos);
        assert_eq!(restored_fields.rooms.len(), 1);

        Ok(())
    }
}