matrix_sdk/sliding_sync/
room.rs

1use std::{
2    fmt::Debug,
3    sync::{Arc, RwLock},
4};
5
6use eyeball_im::Vector;
7use matrix_sdk_base::deserialized_responses::TimelineEvent;
8use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId, RoomId};
9use serde::{Deserialize, Serialize};
10
/// The state of a [`SlidingSyncRoom`].
///
/// A room created with [`SlidingSyncRoom::new`] starts as
/// [`NotLoaded`](Self::NotLoaded), a room restored from a frozen copy starts as
/// [`Preloaded`](Self::Preloaded), and any update moves it to
/// [`Loaded`](Self::Loaded).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum SlidingSyncRoomState {
    /// The room is not loaded, i.e. no updates have been received yet.
    #[default]
    NotLoaded,

    /// The room has been preloaded, i.e. its values come from the cache, but no
    /// updates have been received yet.
    Preloaded,

    /// The room has received updates.
    Loaded,
}
25
26/// A Sliding Sync Room.
27///
28/// It contains some information about a specific room, along with a queue of
29/// events for the timeline.
30///
31/// It is OK to clone this type as much as you need: cloning it is cheap, and
32/// shallow. All clones of the same value are sharing the same state.
33#[derive(Debug, Clone)]
34pub struct SlidingSyncRoom {
35    inner: Arc<SlidingSyncRoomInner>,
36}
37
38impl SlidingSyncRoom {
39    /// Create a new `SlidingSyncRoom`.
40    pub fn new(
41        room_id: OwnedRoomId,
42        prev_batch: Option<String>,
43        timeline: Vec<TimelineEvent>,
44    ) -> Self {
45        Self {
46            inner: Arc::new(SlidingSyncRoomInner {
47                room_id,
48                state: RwLock::new(SlidingSyncRoomState::NotLoaded),
49                prev_batch: RwLock::new(prev_batch),
50                timeline_queue: RwLock::new(timeline.into()),
51            }),
52        }
53    }
54
55    /// Get the room ID of this `SlidingSyncRoom`.
56    pub fn room_id(&self) -> &RoomId {
57        &self.inner.room_id
58    }
59
60    /// Get the token for back-pagination.
61    pub fn prev_batch(&self) -> Option<String> {
62        self.inner.prev_batch.read().unwrap().clone()
63    }
64
65    /// Get a copy of the cached timeline events.
66    ///
67    /// Note: This API only exists temporarily, it *will* be removed in the
68    /// future.
69    pub fn timeline_queue(&self) -> Vector<TimelineEvent> {
70        self.inner.timeline_queue.read().unwrap().clone()
71    }
72
73    pub(super) fn update(
74        &mut self,
75        room_data: http::response::Room,
76        timeline_updates: Vec<TimelineEvent>,
77    ) {
78        let http::response::Room { prev_batch, limited, .. } = room_data;
79
80        {
81            if let Some(prev_batch) = &prev_batch {
82                let mut lock = self.inner.prev_batch.write().unwrap();
83                let _ = lock.replace(prev_batch.clone());
84            }
85        }
86
87        let mut state = self.inner.state.write().unwrap();
88
89        {
90            let mut timeline_queue = self.inner.timeline_queue.write().unwrap();
91
92            // There are timeline updates.
93            if !timeline_updates.is_empty() {
94                if let SlidingSyncRoomState::Preloaded = *state {
95                    // If the room has been read from the cache, we overwrite the timeline queue
96                    // with the timeline updates.
97
98                    timeline_queue.clear();
99                    timeline_queue.extend(timeline_updates);
100                } else if limited {
101                    // The server alerted us that we missed items in between.
102
103                    timeline_queue.clear();
104                    timeline_queue.extend(timeline_updates);
105                } else {
106                    // It's the hot path. We have new updates that must be added to the existing
107                    // timeline queue.
108
109                    timeline_queue.extend(timeline_updates);
110                }
111            } else if limited {
112                // No timeline updates, but `limited` is set to true. It's a way to
113                // alert that we are stale. In this case, we should just clear the
114                // existing timeline.
115
116                timeline_queue.clear();
117            }
118        }
119
120        *state = SlidingSyncRoomState::Loaded;
121    }
122
123    pub(super) fn from_frozen(frozen_room: FrozenSlidingSyncRoom) -> Self {
124        let FrozenSlidingSyncRoom { room_id, prev_batch, timeline_queue } = frozen_room;
125
126        Self {
127            inner: Arc::new(SlidingSyncRoomInner {
128                room_id,
129                prev_batch: RwLock::new(prev_batch),
130                state: RwLock::new(SlidingSyncRoomState::Preloaded),
131                timeline_queue: RwLock::new(timeline_queue),
132            }),
133        }
134    }
135}
136
#[cfg(test)]
impl SlidingSyncRoom {
    /// Test-only accessor: read the current state of the room.
    fn state(&self) -> SlidingSyncRoomState {
        let guard = self.inner.state.read().unwrap();

        *guard
    }

    /// Test-only mutator: force the room into `state`.
    fn set_state(&mut self, state: SlidingSyncRoomState) {
        let mut guard = self.inner.state.write().unwrap();

        *guard = state;
    }
}
147
148#[derive(Debug)]
149struct SlidingSyncRoomInner {
150    /// The room ID.
151    room_id: OwnedRoomId,
152
153    /// Internal state of `Self`.
154    state: RwLock<SlidingSyncRoomState>,
155
156    /// The token for back-pagination.
157    prev_batch: RwLock<Option<String>>,
158
159    /// A queue of received events, used to build a
160    /// [`Timeline`][crate::Timeline].
161    ///
162    /// Given a room, its size is theoretically unbounded: we'll accumulate
163    /// events in this list, until we reach a limited sync, in which case
164    /// we'll clear it.
165    ///
166    /// When persisting the room, this queue is truncated to keep only the last
167    /// N events.
168    timeline_queue: RwLock<Vector<TimelineEvent>>,
169}
170
171/// A “frozen” [`SlidingSyncRoom`], i.e. that can be written into, or read from
172/// a store.
173#[derive(Debug, Serialize, Deserialize)]
174pub(super) struct FrozenSlidingSyncRoom {
175    pub(super) room_id: OwnedRoomId,
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub(super) prev_batch: Option<String>,
178    #[serde(rename = "timeline")]
179    pub(super) timeline_queue: Vector<TimelineEvent>,
180}
181
182/// Number of timeline events to keep when [`SlidingSyncRoom`] is saved in the
183/// cache.
184const NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE: usize = 10;
185
186impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom {
187    fn from(value: &SlidingSyncRoom) -> Self {
188        let timeline_queue = &value.inner.timeline_queue.read().unwrap();
189        let timeline_length = timeline_queue.len();
190
191        // To not overflow the cache, we only freeze the newest N items. On doing
192        // so, we must drop the `prev_batch` key however, as we'd otherwise
193        // create a gap between what we have loaded and where the
194        // prev_batch-key will start loading when paginating backwards.
195        let (timeline_queue, prev_batch) =
196            if timeline_length > NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE {
197                (
198                    (*timeline_queue)
199                        .iter()
200                        .skip(timeline_length - NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE)
201                        .cloned()
202                        .collect::<Vec<_>>()
203                        .into(),
204                    None, // Erase the `prev_batch`.
205                )
206            } else {
207                ((*timeline_queue).clone(), value.prev_batch())
208            };
209
210        Self { room_id: value.inner.room_id.clone(), prev_batch, timeline_queue }
211    }
212}
213
#[cfg(test)]
mod tests {
    use imbl::vector;
    use matrix_sdk_common::deserialized_responses::TimelineEvent;
    use matrix_sdk_test::async_test;
    use ruma::{events::room::message::RoomMessageEventContent, room_id, serde::Raw, RoomId};
    use serde_json::json;

    use super::{http, NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE};
    use crate::sliding_sync::{FrozenSlidingSyncRoom, SlidingSyncRoom, SlidingSyncRoomState};

    /// Build an `http::response::Room` out of a JSON literal.
    macro_rules! room_response {
        ( $( $json:tt )+ ) => {
            serde_json::from_value::<http::response::Room>(
                json!( $( $json )+ )
            ).unwrap()
        };
    }

    /// Create a room with an empty timeline, taking its `prev_batch` from
    /// `inner`.
    fn new_room(room_id: &RoomId, inner: http::response::Room) -> SlidingSyncRoom {
        new_room_with_timeline(room_id, inner, vec![])
    }

    /// Create a room with `timeline` as initial timeline queue, taking its
    /// `prev_batch` from `inner`.
    fn new_room_with_timeline(
        room_id: &RoomId,
        inner: http::response::Room,
        timeline: Vec<TimelineEvent>,
    ) -> SlidingSyncRoom {
        SlidingSyncRoom::new(room_id.to_owned(), inner.prev_batch, timeline)
    }

    /// Build the `nth` text message sent by `@alice:baz.org` in
    /// `!foo:bar.org`: its event ID is `$x{nth}:baz.org`, its timestamp is
    /// `nth`, and its body is `message {nth}`.
    fn message_event(nth: usize) -> TimelineEvent {
        TimelineEvent::new(
            Raw::new(&json!({
                "content": RoomMessageEventContent::text_plain(format!("message {nth}")),
                "type": "m.room.message",
                "event_id": format!("$x{nth}:baz.org"),
                "room_id": "!foo:bar.org",
                "origin_server_ts": nth,
                "sender": "@alice:baz.org",
            }))
            .unwrap()
            .cast(),
        )
    }

    /// Create the room `!foo:bar.org` whose timeline queue holds the messages
    /// `0..count`, taking its `prev_batch` from `inner`.
    fn room_with_messages(count: usize, inner: http::response::Room) -> SlidingSyncRoom {
        new_room_with_timeline(
            room_id!("!foo:bar.org"),
            inner,
            (0..count).map(message_event).collect(),
        )
    }

    /// Collect the event IDs of `events`, in order.
    fn event_ids<'a>(events: impl Iterator<Item = &'a TimelineEvent>) -> Vec<String> {
        events.map(|event| event.raw().deserialize().unwrap().event_id().to_string()).collect()
    }

    /// Collect the IDs of the first `count` messages built by
    /// `message_event`, skipping the first `skipped` ones.
    fn message_event_ids(skipped: usize, count: usize) -> Vec<String> {
        (skipped..skipped + count).map(|nth| format!("$x{nth}:baz.org")).collect()
    }

    #[async_test]
    async fn test_state_from_not_loaded() {
        let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);

        // Update with an empty response, but it doesn't matter.
        room.update(room_response!({}), vec![]);

        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    }

    #[async_test]
    async fn test_state_from_preloaded() {
        let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));

        room.set_state(SlidingSyncRoomState::Preloaded);

        // Update with an empty response, but it doesn't matter.
        room.update(room_response!({}), vec![]);

        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    }

    #[async_test]
    async fn test_room_room_id() {
        let room_id = room_id!("!foo:bar.org");
        let room = new_room(room_id, room_response!({}));

        assert_eq!(room.room_id(), room_id);
    }

    #[async_test]
    async fn test_prev_batch() {
        // Default value.
        {
            let room = new_room(room_id!("!foo:bar.org"), room_response!({}));

            assert_eq!(room.prev_batch(), None);
        }

        // Some value when initializing.
        {
            let room =
                new_room(room_id!("!foo:bar.org"), room_response!({"prev_batch": "t111_222_333"}));

            assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));
        }

        // Some value when updating.
        {
            let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));

            assert_eq!(room.prev_batch(), None);

            room.update(room_response!({"prev_batch": "t111_222_333"}), vec![]);
            assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));

            // A response without token keeps the previous one.
            room.update(room_response!({}), vec![]);
            assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));
        }
    }

    #[async_test]
    async fn test_timeline_queue_initially_empty() {
        let room = new_room(room_id!("!foo:bar.org"), room_response!({}));

        assert!(room.timeline_queue().is_empty());
    }

    #[test]
    fn test_timeline_queue_initially_not_empty() {
        let room = room_with_messages(2, room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);
    }

    #[test]
    fn test_timeline_queue_update_with_empty_timeline() {
        let mut room = room_with_messages(2, room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);

        room.update(room_response!({}), vec![]);

        // The queue is unmodified.
        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);
    }

    #[test]
    fn test_timeline_queue_update_with_empty_timeline_and_with_limited() {
        let mut room = room_with_messages(2, room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);

        room.update(
            room_response!({
                "limited": true
            }),
            vec![],
        );

        // The queue has been emptied.
        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
        assert_eq!(room.timeline_queue().len(), 0);
    }

    #[test]
    fn test_timeline_queue_update_from_preloaded() {
        let mut room = room_with_messages(2, room_response!({}));

        room.set_state(SlidingSyncRoomState::Preloaded);

        assert_eq!(room.state(), SlidingSyncRoomState::Preloaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);

        room.update(room_response!({}), vec![message_event(2), message_event(3)]);

        // The queue is emptied, and new events are appended.
        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x2:baz.org", "$x3:baz.org"]);
    }

    #[test]
    fn test_timeline_queue_update_from_not_loaded() {
        let mut room = room_with_messages(2, room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);

        room.update(room_response!({}), vec![message_event(2), message_event(3)]);

        // New events are appended to the queue.
        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
        assert_eq!(
            event_ids(room.timeline_queue().iter()),
            ["$x0:baz.org", "$x1:baz.org", "$x2:baz.org", "$x3:baz.org"]
        );
    }

    #[test]
    fn test_timeline_queue_update_from_not_loaded_with_limited() {
        let mut room = room_with_messages(2, room_response!({}));

        assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x0:baz.org", "$x1:baz.org"]);

        room.update(
            room_response!({
                "limited": true,
            }),
            vec![message_event(2), message_event(3)],
        );

        // The queue is emptied, and new events are appended.
        assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
        assert_eq!(event_ids(room.timeline_queue().iter()), ["$x2:baz.org", "$x3:baz.org"]);
    }

    #[test]
    fn test_frozen_sliding_sync_room_serialization() {
        let frozen_room = FrozenSlidingSyncRoom {
            room_id: room_id!("!29fhd83h92h0:example.com").to_owned(),
            prev_batch: Some("foo".to_owned()),
            timeline_queue: vector![TimelineEvent::new(
                Raw::new(&json!({
                    "content": RoomMessageEventContent::text_plain("let it gooo!"),
                    "type": "m.room.message",
                    "event_id": "$xxxxx:example.org",
                    "room_id": "!someroom:example.com",
                    "origin_server_ts": 2189,
                    "sender": "@bob:example.com",
                }))
                .unwrap()
                .cast(),
            )],
        };

        let serialized = serde_json::to_value(&frozen_room).unwrap();

        assert_eq!(
            serialized,
            json!({
                "room_id": "!29fhd83h92h0:example.com",
                "prev_batch": "foo",
                "timeline": [
                    {
                        "kind": { "PlainText": { "event": {
                            "content": {
                                "body": "let it gooo!",
                                "msgtype": "m.text"
                            },
                            "event_id": "$xxxxx:example.org",
                            "origin_server_ts": 2189,
                            "room_id": "!someroom:example.com",
                            "sender": "@bob:example.com",
                            "type": "m.room.message"
                        }}}
                    }
                ]
            })
        );

        let deserialized = serde_json::from_value::<FrozenSlidingSyncRoom>(serialized).unwrap();

        // The round-trip preserves every field.
        assert_eq!(deserialized.room_id, frozen_room.room_id);
        assert_eq!(deserialized.prev_batch, frozen_room.prev_batch);
        assert_eq!(
            event_ids(deserialized.timeline_queue.iter()),
            event_ids(frozen_room.timeline_queue.iter())
        );
    }

    #[test]
    fn test_frozen_sliding_sync_room_has_a_capped_version_of_the_timeline() {
        const LIMIT: usize = NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE;

        /// Freeze a room that holds `count` events and a `prev_batch` token.
        fn freeze_room_with_messages(count: usize) -> FrozenSlidingSyncRoom {
            let room = room_with_messages(count, room_response!({"prev_batch": "t111_222_333"}));

            FrozenSlidingSyncRoom::from(&room)
        }

        // Exactly at the limit: nothing is dropped.
        {
            let frozen_room = freeze_room_with_messages(LIMIT);

            assert_eq!(frozen_room.timeline_queue.len(), LIMIT);
            assert_eq!(event_ids(frozen_room.timeline_queue.iter()), message_event_ids(0, LIMIT));
            // No event has been dropped, so the token is still valid.
            assert_eq!(frozen_room.prev_batch, Some("t111_222_333".to_owned()));
        }

        // Above the limit: the timeline is capped.
        {
            let frozen_room = freeze_room_with_messages(LIMIT + 3);

            assert_eq!(frozen_room.timeline_queue.len(), LIMIT);
            // Check that the last event is the last event of the timeline, i.e. we only
            // keep the _latest_ events, not the _first_ events.
            assert_eq!(event_ids(frozen_room.timeline_queue.iter()), message_event_ids(3, LIMIT));
            // Events have been dropped, so the token has been erased to avoid
            // a gap when paginating backwards.
            assert_eq!(frozen_room.prev_batch, None);
        }
    }
}