1use std::{
2 fmt::Debug,
3 sync::{Arc, RwLock},
4};
5
6use eyeball_im::Vector;
7use matrix_sdk_base::deserialized_responses::TimelineEvent;
8use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId, RoomId};
9use serde::{Deserialize, Serialize};
10
/// The loading state of a `SlidingSyncRoom`.
///
/// The state only describes how the room data got here: built from scratch,
/// restored from a frozen copy, or refreshed by an update.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum SlidingSyncRoomState {
    /// The room has not received any update yet.
    ///
    /// This is the default state, and the one `SlidingSyncRoom::new` assigns.
    #[default]
    NotLoaded,

    /// The room was rebuilt from a `FrozenSlidingSyncRoom` (see
    /// `SlidingSyncRoom::from_frozen`) and has not been updated since.
    Preloaded,

    /// The room has been updated at least once through
    /// `SlidingSyncRoom::update`.
    Loaded,
}
25
/// A handle to the data kept for a single room.
///
/// The data lives behind an [`Arc`], so cloning a `SlidingSyncRoom` yields
/// another handle to the same underlying data rather than an independent copy.
#[derive(Debug, Clone)]
pub struct SlidingSyncRoom {
    /// The shared room data.
    inner: Arc<SlidingSyncRoomInner>,
}
37
38impl SlidingSyncRoom {
39 pub fn new(
41 room_id: OwnedRoomId,
42 prev_batch: Option<String>,
43 timeline: Vec<TimelineEvent>,
44 ) -> Self {
45 Self {
46 inner: Arc::new(SlidingSyncRoomInner {
47 room_id,
48 state: RwLock::new(SlidingSyncRoomState::NotLoaded),
49 prev_batch: RwLock::new(prev_batch),
50 timeline_queue: RwLock::new(timeline.into()),
51 }),
52 }
53 }
54
55 pub fn room_id(&self) -> &RoomId {
57 &self.inner.room_id
58 }
59
60 pub fn prev_batch(&self) -> Option<String> {
62 self.inner.prev_batch.read().unwrap().clone()
63 }
64
65 pub fn timeline_queue(&self) -> Vector<TimelineEvent> {
70 self.inner.timeline_queue.read().unwrap().clone()
71 }
72
73 pub(super) fn update(
74 &mut self,
75 room_data: http::response::Room,
76 timeline_updates: Vec<TimelineEvent>,
77 ) {
78 let http::response::Room { prev_batch, limited, .. } = room_data;
79
80 {
81 if let Some(prev_batch) = &prev_batch {
82 let mut lock = self.inner.prev_batch.write().unwrap();
83 let _ = lock.replace(prev_batch.clone());
84 }
85 }
86
87 let mut state = self.inner.state.write().unwrap();
88
89 {
90 let mut timeline_queue = self.inner.timeline_queue.write().unwrap();
91
92 if !timeline_updates.is_empty() {
94 if let SlidingSyncRoomState::Preloaded = *state {
95 timeline_queue.clear();
99 timeline_queue.extend(timeline_updates);
100 } else if limited {
101 timeline_queue.clear();
104 timeline_queue.extend(timeline_updates);
105 } else {
106 timeline_queue.extend(timeline_updates);
110 }
111 } else if limited {
112 timeline_queue.clear();
117 }
118 }
119
120 *state = SlidingSyncRoomState::Loaded;
121 }
122
123 pub(super) fn from_frozen(frozen_room: FrozenSlidingSyncRoom) -> Self {
124 let FrozenSlidingSyncRoom { room_id, prev_batch, timeline_queue } = frozen_room;
125
126 Self {
127 inner: Arc::new(SlidingSyncRoomInner {
128 room_id,
129 prev_batch: RwLock::new(prev_batch),
130 state: RwLock::new(SlidingSyncRoomState::Preloaded),
131 timeline_queue: RwLock::new(timeline_queue),
132 }),
133 }
134 }
135}
136
#[cfg(test)]
impl SlidingSyncRoom {
    /// Test helper: reads the current state of the room.
    fn state(&self) -> SlidingSyncRoomState {
        let current = self.inner.state.read().unwrap();

        *current
    }

    /// Test helper: forces the room into the given `state`.
    fn set_state(&mut self, state: SlidingSyncRoomState) {
        let mut current = self.inner.state.write().unwrap();

        *current = state;
    }
}
147
/// The data shared by every clone of a `SlidingSyncRoom`.
#[derive(Debug)]
struct SlidingSyncRoomInner {
    /// The ID of the room. Stored without a lock: nothing in this file
    /// mutates it after construction.
    room_id: OwnedRoomId,

    /// The loading state of the room.
    state: RwLock<SlidingSyncRoomState>,

    /// The most recently stored `prev_batch` token, if any.
    prev_batch: RwLock<Option<String>>,

    /// The timeline events currently held for this room.
    timeline_queue: RwLock<Vector<TimelineEvent>>,
}
170
/// A serializable snapshot of a `SlidingSyncRoom`.
///
/// It is built from a room with the `From<&SlidingSyncRoom>` implementation
/// and turned back into a room with `SlidingSyncRoom::from_frozen`.
#[derive(Debug, Serialize, Deserialize)]
pub(super) struct FrozenSlidingSyncRoom {
    /// The ID of the room.
    pub(super) room_id: OwnedRoomId,
    /// The `prev_batch` token; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) prev_batch: Option<String>,
    /// The saved timeline events; serialized under the `timeline` key.
    #[serde(rename = "timeline")]
    pub(super) timeline_queue: Vector<TimelineEvent>,
}
181
/// Maximum number of timeline events kept when a `SlidingSyncRoom` is
/// converted into a `FrozenSlidingSyncRoom`; only the most recent ones survive.
const NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE: usize = 10;
185
186impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom {
187 fn from(value: &SlidingSyncRoom) -> Self {
188 let timeline_queue = &value.inner.timeline_queue.read().unwrap();
189 let timeline_length = timeline_queue.len();
190
191 let (timeline_queue, prev_batch) =
196 if timeline_length > NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE {
197 (
198 (*timeline_queue)
199 .iter()
200 .skip(timeline_length - NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE)
201 .cloned()
202 .collect::<Vec<_>>()
203 .into(),
204 None, )
206 } else {
207 ((*timeline_queue).clone(), value.prev_batch())
208 };
209
210 Self { room_id: value.inner.room_id.clone(), prev_batch, timeline_queue }
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use imbl::vector;
217 use matrix_sdk_common::deserialized_responses::TimelineEvent;
218 use matrix_sdk_test::async_test;
219 use ruma::{events::room::message::RoomMessageEventContent, room_id, serde::Raw, RoomId};
220 use serde_json::json;
221
222 use super::{http, NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE};
223 use crate::sliding_sync::{FrozenSlidingSyncRoom, SlidingSyncRoom, SlidingSyncRoomState};
224
// Builds an `http::response::Room` by deserializing the given JSON literal.
//
// Panics if the JSON does not deserialize into a room response.
macro_rules! room_response {
    ( $( $json:tt )+ ) => {
        serde_json::from_value::<http::response::Room>(
            json!( $( $json )+ )
        ).unwrap()
    };
}
232
233 fn new_room(room_id: &RoomId, inner: http::response::Room) -> SlidingSyncRoom {
234 new_room_with_timeline(room_id, inner, vec![])
235 }
236
237 fn new_room_with_timeline(
238 room_id: &RoomId,
239 inner: http::response::Room,
240 timeline: Vec<TimelineEvent>,
241 ) -> SlidingSyncRoom {
242 SlidingSyncRoom::new(room_id.to_owned(), inner.prev_batch, timeline)
243 }
244
245 #[async_test]
246 async fn test_state_from_not_loaded() {
247 let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));
248
249 assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
250
251 room.update(room_response!({}), vec![]);
253
254 assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
255 }
256
257 #[async_test]
258 async fn test_state_from_preloaded() {
259 let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));
260
261 room.set_state(SlidingSyncRoomState::Preloaded);
262
263 room.update(room_response!({}), vec![]);
265
266 assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
267 }
268
269 #[async_test]
270 async fn test_room_room_id() {
271 let room_id = room_id!("!foo:bar.org");
272 let room = new_room(room_id, room_response!({}));
273
274 assert_eq!(room.room_id(), room_id);
275 }
276
277 #[async_test]
278 async fn test_prev_batch() {
279 {
281 let room = new_room(room_id!("!foo:bar.org"), room_response!({}));
282
283 assert_eq!(room.prev_batch(), None);
284 }
285
286 {
288 let room =
289 new_room(room_id!("!foo:bar.org"), room_response!({"prev_batch": "t111_222_333"}));
290
291 assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));
292 }
293
294 {
296 let mut room = new_room(room_id!("!foo:bar.org"), room_response!({}));
297
298 assert_eq!(room.prev_batch(), None);
299
300 room.update(room_response!({"prev_batch": "t111_222_333"}), vec![]);
301 assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));
302
303 room.update(room_response!({}), vec![]);
304 assert_eq!(room.prev_batch(), Some("t111_222_333".to_owned()));
305 }
306 }
307
308 #[async_test]
309 async fn test_timeline_queue_initially_empty() {
310 let room = new_room(room_id!("!foo:bar.org"), room_response!({}));
311
312 assert!(room.timeline_queue().is_empty());
313 }
314
// Builds a `TimelineEvent` wrapping an `m.room.message` event with a plain
// text body, for the room `!foo:bar.org`.
//
// Usage: `timeline_event!(from <sender> with id <event_id> at <ts>: <message>)`,
// where every argument is a literal.
macro_rules! timeline_event {
    (from $sender:literal with id $event_id:literal at $ts:literal: $message:literal) => {
        TimelineEvent::new(
            Raw::new(&json!({
                "content": RoomMessageEventContent::text_plain($message),
                "type": "m.room.message",
                "event_id": $event_id,
                "room_id": "!foo:bar.org",
                "origin_server_ts": $ts,
                "sender": $sender,
            }))
            .unwrap()
            .cast()
        )
    };
}
331
// Asserts that the events found at the given indices of a timeline queue have
// the expected event IDs.
//
// The queue is named by a dotted identifier path after `with`; each
// `<index> => <event_id>` pair takes literals. Every listed event is
// deserialized, so the macro panics on an out-of-bounds index or on an event
// that fails to deserialize.
macro_rules! assert_timeline_queue_event_ids {
    (
        with $( $timeline_queue:ident ).* {
            $(
                $nth:literal => $event_id:literal
            ),*
            $(,)*
        }
    ) => {
        let timeline = & $( $timeline_queue ).*;

        $(
            assert_eq!(timeline[ $nth ].raw().deserialize().unwrap().event_id(), $event_id);
        )*
    };
}
348
#[test]
fn test_timeline_queue_initially_not_empty() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // The events given at creation are in the queue, in the same order.
    let queue = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
    assert_eq!(queue.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );
}
373
#[test]
fn test_timeline_queue_update_with_empty_timeline() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let mut room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // Before the update: the two initial events, room not loaded.
    let queue_before = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
    assert_eq!(queue_before.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_before {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );

    room.update(room_response!({}), Vec::new());

    // After an update without events: the queue is untouched, but the room is
    // now loaded.
    let queue_after = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    assert_eq!(queue_after.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_after {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );
}
414
#[test]
fn test_timeline_queue_update_with_empty_timeline_and_with_limited() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let mut room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // Before the update: the two initial events, room not loaded.
    let queue_before = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
    assert_eq!(queue_before.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_before {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );

    let limited_response = room_response!({
        "limited": true
    });
    room.update(limited_response, Vec::new());

    // A `limited` update empties the queue, even without new events.
    let queue_after = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    assert_eq!(queue_after.len(), 0);
}
454
#[test]
fn test_timeline_queue_update_from_preloaded() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let mut room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // Pretend the room was restored from a frozen copy.
    room.set_state(SlidingSyncRoomState::Preloaded);

    let queue_before = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Preloaded);
    assert_eq!(queue_before.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_before {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );

    let new_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x2:baz.org" at 2: "message 2"),
        timeline_event!(from "@alice:baz.org" with id "$x3:baz.org" at 3: "message 3"),
    ];
    room.update(room_response!({}), new_events);

    // New events replace the preloaded ones instead of being appended.
    let queue_after = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    assert_eq!(queue_after.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_after {
            0 => "$x2:baz.org",
            1 => "$x3:baz.org",
        }
    );
}
503
#[test]
fn test_timeline_queue_update_from_not_loaded() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let mut room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // Before the update: the two initial events, room not loaded.
    let queue_before = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
    assert_eq!(queue_before.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_before {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );

    let new_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x2:baz.org" at 2: "message 2"),
        timeline_event!(from "@alice:baz.org" with id "$x3:baz.org" at 3: "message 3"),
    ];
    room.update(room_response!({}), new_events);

    // A non-limited update on a non-preloaded room appends the new events.
    let queue_after = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    assert_eq!(queue_after.len(), 4);
    assert_timeline_queue_event_ids!(
        with queue_after {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
            2 => "$x2:baz.org",
            3 => "$x3:baz.org",
        }
    );
}
552
#[test]
fn test_timeline_queue_update_from_not_loaded_with_limited() {
    let initial_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x0:baz.org" at 0: "message 0"),
        timeline_event!(from "@alice:baz.org" with id "$x1:baz.org" at 1: "message 1"),
    ];
    let mut room = new_room_with_timeline(
        room_id!("!foo:bar.org"),
        room_response!({}),
        initial_events,
    );

    // Before the update: the two initial events, room not loaded.
    let queue_before = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::NotLoaded);
    assert_eq!(queue_before.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_before {
            0 => "$x0:baz.org",
            1 => "$x1:baz.org",
        }
    );

    let limited_response = room_response!({
        "limited": true,
    });
    let new_events = vec![
        timeline_event!(from "@alice:baz.org" with id "$x2:baz.org" at 2: "message 2"),
        timeline_event!(from "@alice:baz.org" with id "$x3:baz.org" at 3: "message 3"),
    ];
    room.update(limited_response, new_events);

    // A `limited` update discards the previous events and keeps the new ones.
    let queue_after = room.timeline_queue();

    assert_eq!(room.state(), SlidingSyncRoomState::Loaded);
    assert_eq!(queue_after.len(), 2);
    assert_timeline_queue_event_ids!(
        with queue_after {
            0 => "$x2:baz.org",
            1 => "$x3:baz.org",
        }
    );
}
602
#[test]
fn test_frozen_sliding_sync_room_serialization() {
    let event = TimelineEvent::new(
        Raw::new(&json!({
            "content": RoomMessageEventContent::text_plain("let it gooo!"),
            "type": "m.room.message",
            "event_id": "$xxxxx:example.org",
            "room_id": "!someroom:example.com",
            "origin_server_ts": 2189,
            "sender": "@bob:example.com",
        }))
        .unwrap()
        .cast(),
    );

    let frozen_room = FrozenSlidingSyncRoom {
        room_id: room_id!("!29fhd83h92h0:example.com").to_owned(),
        prev_batch: Some("foo".to_owned()),
        timeline_queue: vector![event],
    };

    // The timeline queue must be serialized under the `timeline` key.
    let expected = json!({
        "room_id": "!29fhd83h92h0:example.com",
        "prev_batch": "foo",
        "timeline": [
            {
                "kind": { "PlainText": { "event": {
                    "content": {
                        "body": "let it gooo!",
                        "msgtype": "m.text"
                    },
                    "event_id": "$xxxxx:example.org",
                    "origin_server_ts": 2189,
                    "room_id": "!someroom:example.com",
                    "sender": "@bob:example.com",
                    "type": "m.room.message"
                }}}
            }
        ]
    });

    let serialized = serde_json::to_value(&frozen_room).unwrap();

    assert_eq!(serialized, expected);

    // The serialized form must deserialize back into a frozen room.
    let deserialized: FrozenSlidingSyncRoom = serde_json::from_value(serialized).unwrap();

    assert_eq!(deserialized.room_id, frozen_room.room_id);
}
651
#[test]
fn test_frozen_sliding_sync_room_has_a_capped_version_of_the_timeline() {
    /// Builds a room whose timeline holds the events `$x0` to `$x{max}`,
    /// both included.
    fn room_with_events_up_to(max: usize) -> SlidingSyncRoom {
        let timeline_events = (0..=max)
            .map(|nth| {
                TimelineEvent::new(
                    Raw::new(&json!({
                        "content": RoomMessageEventContent::text_plain(format!("message {nth}")),
                        "type": "m.room.message",
                        "event_id": format!("$x{nth}:baz.org"),
                        "room_id": "!foo:bar.org",
                        "origin_server_ts": nth,
                        "sender": "@alice:baz.org",
                    }))
                    .unwrap()
                    .cast(),
                )
            })
            .collect::<Vec<_>>();

        new_room_with_timeline(room_id!("!foo:bar.org"), room_response!({}), timeline_events)
    }

    // Exactly as many events as the cache limit: nothing is dropped.
    {
        let max = NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE - 1;
        let room = room_with_events_up_to(max);

        let frozen_room = FrozenSlidingSyncRoom::from(&room);
        assert_eq!(frozen_room.timeline_queue.len(), max + 1);

        // The last event of the room is still the last event of the snapshot.
        let last_event =
            frozen_room.timeline_queue.last().unwrap().raw().deserialize().unwrap();
        assert_eq!(last_event.event_id(), &format!("$x{max}:baz.org"));
    }

    // More events than the cache limit: only the most recent ones are kept.
    {
        let max = NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE + 2;
        let room = room_with_events_up_to(max);

        let frozen_room = FrozenSlidingSyncRoom::from(&room);
        assert_eq!(
            frozen_room.timeline_queue.len(),
            NUMBER_OF_TIMELINE_EVENTS_TO_KEEP_FOR_THE_CACHE
        );

        // Truncation removes the oldest events, so the last one is unchanged.
        let last_event =
            frozen_room.timeline_queue.last().unwrap().raw().deserialize().unwrap();
        assert_eq!(last_event.event_id(), &format!("$x{max}:baz.org"));
    }
}
729}