matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5    RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9    OwnedRoomId,
10    api::{
11        FeatureFlag, SupportedVersions,
12        client::sync::sync_events::v5::{self as http, response},
13    },
14    events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
21/// A sliding sync version.
22#[derive(Clone, Debug)]
23pub enum Version {
24    /// No version. Useful to represent that sliding sync is disabled for
25    /// example, and that the version is unknown.
26    None,
27
28    /// Use the version of the sliding sync implementation inside Synapse, i.e.
29    /// MSC4186.
30    Native,
31}
32
33impl Version {
34    #[cfg(test)]
35    pub(crate) fn is_native(&self) -> bool {
36        matches!(self, Self::Native)
37    }
38}
39
40/// An error when building a version.
41#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43    /// The `.well-known` response is not set.
44    #[error("`.well-known` is not set")]
45    WellKnownNotSet,
46
47    /// The `/versions` response is not set.
48    #[error("The `/versions` response is not set")]
49    MissingVersionsResponse,
50
51    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
52    /// `unstable_features`, or it's not set to true.
53    #[error(
54        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55         or it's not set to true."
56    )]
57    NativeVersionIsUnset,
58}
59
60/// A builder for [`Version`].
61#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63    /// Build a [`Version::None`].
64    None,
65
66    /// Build a [`Version::Native`].
67    Native,
68
69    /// Build a [`Version::Native`] by auto-discovering it.
70    ///
71    /// It is available if the server enables it via `/versions`.
72    DiscoverNative,
73}
74
75impl VersionBuilder {
76    pub(crate) fn needs_get_supported_versions(&self) -> bool {
77        matches!(self, Self::DiscoverNative)
78    }
79
80    /// Build a [`Version`].
81    ///
82    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
83    /// invalid data.
84    pub fn build(
85        self,
86        supported: Option<&SupportedVersions>,
87    ) -> Result<Version, VersionBuilderError> {
88        Ok(match self {
89            Self::None => Version::None,
90
91            Self::Native => Version::Native,
92
93            Self::DiscoverNative => {
94                let Some(supported) = supported else {
95                    return Err(VersionBuilderError::MissingVersionsResponse);
96                };
97
98                if supported.features.contains(&FeatureFlag::Msc4186) {
99                    Version::Native
100                } else {
101                    return Err(VersionBuilderError::NativeVersionIsUnset);
102                }
103            }
104        })
105    }
106}
107
108impl Client {
109    /// Find all sliding sync versions that are available.
110    ///
111    /// Be careful: This method may hit the store and will send new requests for
112    /// each call. It can be costly to call it repeatedly.
113    ///
114    /// If `.well-known` or `/versions` is unreachable, it will simply move
115    /// potential sliding sync versions aside. No error will be reported.
116    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117        let supported_versions = self.supported_versions().await.ok();
118
119        [VersionBuilder::DiscoverNative]
120            .into_iter()
121            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122            .collect()
123    }
124
125    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
126    /// identifier.
127    ///
128    /// Note: the identifier must not be more than 16 chars long!
129    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130        Ok(SlidingSync::builder(id.into(), self.clone())?)
131    }
132
133    /// Handle all the information provided in a sliding sync response, except
134    /// for the e2ee bits.
135    ///
136    /// If you need to handle encryption too, use the internal
137    /// `SlidingSyncResponseProcessor` instead.
138    #[cfg(any(test, feature = "testing"))]
139    #[tracing::instrument(skip(self, response))]
140    pub async fn process_sliding_sync_test_helper(
141        &self,
142        response: &http::Response,
143        requested_required_states: &RequestedRequiredStates,
144    ) -> Result<SyncResponse> {
145        let response =
146            self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148        tracing::debug!("done processing on base_client");
149        self.call_sync_response_handlers(&response).await?;
150
151        Ok(response)
152    }
153}
154
155/// Small helper to handle a `SlidingSync` response's sub parts.
156///
157/// This will properly handle the encryption and the room response
158/// independently, if needs be, making sure that both are properly processed by
159/// event handlers.
160#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162    client: Client,
163    to_device_events: Vec<ProcessedToDeviceEvent>,
164    response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168    pub fn new(client: Client) -> Self {
169        Self { client, to_device_events: Vec::new(), response: None }
170    }
171
172    #[cfg(feature = "e2e-encryption")]
173    pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174        // This is an internal API misuse if this is triggered (calling
175        // `handle_room_response` before this function), so panic is fine.
176        assert!(self.response.is_none());
177
178        self.to_device_events = if let Some(to_device_events) = self
179            .client
180            .base_client()
181            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182            .await?
183        {
184            // Some new keys might have been received, so trigger a backup if needed.
185            self.client.encryption().backups().maybe_trigger_backup();
186
187            to_device_events
188        } else {
189            Vec::new()
190        };
191
192        Ok(())
193    }
194
195    pub async fn handle_room_response(
196        &mut self,
197        response: &http::Response,
198        requested_required_states: &RequestedRequiredStates,
199    ) -> Result<()> {
200        subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202        let previously_joined_rooms = self
203            .client
204            .joined_rooms()
205            .into_iter()
206            .map(|r| r.room_id().to_owned())
207            .collect::<BTreeSet<_>>();
208
209        let mut sync_response = self
210            .client
211            .base_client()
212            .process_sliding_sync(response, requested_required_states)
213            .await?;
214
215        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217        update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219        self.response = Some(sync_response);
220
221        Ok(())
222    }
223
224    pub async fn handle_thread_subscriptions(
225        &mut self,
226        previous_pos: Option<&str>,
227        thread_subs: response::ThreadSubscriptions,
228    ) -> Result<()> {
229        let catchup_token =
230            thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231                from: prev_batch,
232                to: previous_pos.map(|s| s.to_owned()),
233            });
234
235        self.client
236            .thread_subscription_catchup()
237            .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238            .await?;
239
240        Ok(())
241    }
242
243    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244        let mut response = self.response.take().unwrap_or_default();
245
246        response.to_device.extend(self.to_device_events);
247
248        self.client.call_sync_response_handlers(&response).await?;
249
250        Ok(response)
251    }
252}
253
254/// Update the caches for the rooms that received updates.
255///
256/// This will only fill the in-memory caches, not save the info on disk.
257async fn update_in_memory_caches(
258    client: &Client,
259    previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260    response: &SyncResponse,
261) {
262    let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264    // If the push rules have changed, update the cached notification mode for *all*
265    // the joined rooms.
266    if response.account_data.iter().any(|event| {
267        event
268            .get_field::<GlobalAccountDataEventType>("type")
269            .ok()
270            .flatten()
271            .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272    }) {
273        let notification_settings = client.notification_settings().await;
274        let rules = notification_settings.rules().await;
275
276        // Update all joined rooms.
277        for room in client.joined_rooms() {
278            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279                room.update_cached_user_defined_notification_mode(mode);
280            }
281        }
282    } else {
283        // Otherwise, precompute the cached user-defined notification mode only for the
284        // newly joined rooms.
285
286        // We'll compute the rules only once, lazily, if needs be.
287        let mut rules = None;
288
289        for room_id in response
290            .rooms
291            .joined
292            .keys()
293            .filter(|room_id| !previously_joined_rooms.contains(*room_id))
294        {
295            let Some(room) = client.get_room(room_id) else {
296                error!(?room_id, "The room must exist since it has been joined");
297                continue;
298            };
299
300            // Reuse the previous `Rules` instance, or compute it once and for all.
301            let rules = if let Some(rules) = &mut rules {
302                rules
303            } else {
304                rules.insert(client.notification_settings().await.rules().await.clone())
305            };
306
307            // Define an initial value for the cached user-defined notification mode.
308            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
309                room.update_cached_user_defined_notification_mode(mode);
310            }
311        }
312    }
313}
314
315/// Update the receipts extension and compute the read receipt accordingly.
316async fn handle_receipts_extension(
317    client: &Client,
318    response: &http::Response,
319    sync_response: &mut SyncResponse,
320) -> Result<()> {
321    let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
322
323    // We need to compute read receipts for each joined room that has received an
324    // update, or from each room that has received a receipt ephemeral event.
325    let room_ids = BTreeSet::from_iter(
326        sync_response
327            .rooms
328            .joined
329            .keys()
330            .cloned()
331            .chain(response.extensions.receipts.rooms.keys().cloned()),
332    );
333
334    // Process each room concurrently.
335    let futures = room_ids.into_iter().map(|room_id| {
336        let new_sync_events = sync_response
337            .rooms
338            .joined
339            .entry(room_id.to_owned())
340            .or_default()
341            .timeline
342            .events
343            .clone();
344
345        async {
346            let Ok((room_event_cache, _drop_handle)) =
347                client.event_cache().for_room(&room_id).await
348            else {
349                tracing::info!(
350                    ?room_id,
351                    "Failed to fetch the `RoomEventCache` when computing unread counts"
352                );
353                return Ok::<_, crate::Error>(None);
354            };
355
356            let previous_events = room_event_cache.events().await;
357
358            let receipt_event = client
359                .base_client()
360                .process_sliding_sync_receipts_extension_for_room(
361                    &room_id,
362                    response,
363                    new_sync_events,
364                    previous_events,
365                )
366                .await?;
367
368            Ok(Some((room_id, receipt_event)))
369        }
370    });
371
372    let updates = try_join_all(futures).await?;
373
374    for (room_id, receipt_event_content) in updates.into_iter().flatten() {
375        if let Some(event) = receipt_event_content {
376            sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
377        }
378    }
379
380    Ok(())
381}
382
383#[cfg(all(test, not(target_family = "wasm")))]
384mod tests {
385    use std::{collections::BTreeMap, ops::Not};
386
387    use assert_matches::assert_matches;
388    use matrix_sdk_base::{
389        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
390        notification_settings::RoomNotificationMode,
391    };
392    use matrix_sdk_test::async_test;
393    use ruma::{
394        api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
395        room_id, serde::Raw,
396    };
397    use serde_json::json;
398
399    use super::{Version, VersionBuilder};
400    use crate::{
401        SlidingSyncList, SlidingSyncMode,
402        error::Result,
403        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
404        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
405    };
406
407    #[test]
408    fn test_version_builder_none() {
409        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
410    }
411
412    #[test]
413    fn test_version_builder_native() {
414        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
415    }
416
417    #[test]
418    fn test_version_builder_discover_native() {
419        let mut response = get_supported_versions::Response::new(vec![]);
420        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
421
422        assert_matches!(
423            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
424            Ok(Version::Native)
425        );
426    }
427
428    #[test]
429    fn test_version_builder_discover_native_no_supported_versions() {
430        assert_matches!(
431            VersionBuilder::DiscoverNative.build(None),
432            Err(VersionBuilderError::MissingVersionsResponse)
433        );
434    }
435
436    #[test]
437    fn test_version_builder_discover_native_unstable_features_is_disabled() {
438        let mut response = get_supported_versions::Response::new(vec![]);
439        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
440
441        assert_matches!(
442            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
443            Err(VersionBuilderError::NativeVersionIsUnset)
444        );
445    }
446
447    #[async_test]
448    async fn test_available_sliding_sync_versions_none() {
449        let client = MockClientBuilder::new(None).build().await;
450        let available_versions = client.available_sliding_sync_versions().await;
451
452        // `.well-known` and `/versions` aren't available. It's impossible to find any
453        // versions.
454        assert!(available_versions.is_empty());
455    }
456
457    #[async_test]
458    async fn test_available_sliding_sync_versions_native() {
459        let server = MatrixMockServer::new().await;
460        let client = server.client_builder().no_server_versions().build().await;
461
462        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
463
464        let available_versions = client.available_sliding_sync_versions().await;
465
466        // `/versions` is available.
467        assert_eq!(available_versions.len(), 1);
468        assert_matches!(available_versions[0], Version::Native);
469    }
470
471    #[async_test]
472    async fn test_cache_user_defined_notification_mode() -> Result<()> {
473        let client = MockClientBuilder::new(None).build().await;
474        let room_id = room_id!("!r0:matrix.org");
475
476        let sliding_sync = client
477            .sliding_sync("test")?
478            .with_account_data_extension(
479                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
480            )
481            .add_list(
482                SlidingSyncList::builder("all")
483                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
484            )
485            .build()
486            .await?;
487
488        // Mock a sync response.
489        // A `m.push_rules` with `room` is cached during the sync.
490        {
491            let server_response = assign!(http::Response::new("0".to_owned()), {
492                rooms: BTreeMap::from([(
493                    room_id.to_owned(),
494                    http::response::Room::default(),
495                )]),
496                extensions: assign!(http::response::Extensions::default(), {
497                    account_data: assign!(http::response::AccountData::default(), {
498                        global: vec![
499                            Raw::from_json_string(
500                                json!({
501                                    "type": "m.push_rules",
502                                    "content": {
503                                        "global": {
504                                            "room": [
505                                                {
506                                                    "actions": ["notify"],
507                                                    "rule_id": room_id,
508                                                    "default": false,
509                                                    "enabled": true,
510                                                },
511                                            ],
512                                        },
513                                    },
514                                })
515                                .to_string(),
516                            ).unwrap()
517                        ]
518                    })
519                })
520            });
521
522            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
523            sliding_sync
524                .handle_response(
525                    server_response.clone(),
526                    &mut pos_guard,
527                    RequestedRequiredStates::default(),
528                )
529                .await?;
530        }
531
532        // The room must exist, since it's been synced.
533        let room = client.get_room(room_id).unwrap();
534
535        // The room has a cached user-defined notification mode.
536        assert_eq!(
537            room.cached_user_defined_notification_mode(),
538            Some(RoomNotificationMode::AllMessages),
539        );
540
541        // Mock a sync response.
542        // A `m.push_rules` with `room` is cached during the sync.
543        // It overwrites the previous cache.
544        {
545            let server_response = assign!(http::Response::new("0".to_owned()), {
546                rooms: BTreeMap::from([(
547                    room_id.to_owned(),
548                    http::response::Room::default(),
549                )]),
550                extensions: assign!(http::response::Extensions::default(), {
551                    account_data: assign!(http::response::AccountData::default(), {
552                        global: vec![
553                            Raw::from_json_string(
554                                json!({
555                                    "type": "m.push_rules",
556                                    "content": {
557                                        "global": {
558                                            "room": [
559                                                {
560                                                    "actions": [],
561                                                    "rule_id": room_id,
562                                                    "default": false,
563                                                    "enabled": true,
564                                                },
565                                            ],
566                                        },
567                                    },
568                                })
569                                .to_string(),
570                            ).unwrap()
571                        ]
572                    })
573                })
574            });
575
576            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
577            sliding_sync
578                .handle_response(
579                    server_response.clone(),
580                    &mut pos_guard,
581                    RequestedRequiredStates::default(),
582                )
583                .await?;
584        }
585
586        // The room has an updated cached user-defined notification mode.
587        assert_eq!(
588            room.cached_user_defined_notification_mode(),
589            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
590        );
591
592        // Mock a sync response.
593        // Even if the room doesn't appear in the response, its notification mode will
594        // be updated immediately if a new `m.push_rules` is received.
595        {
596            let server_response = assign!(http::Response::new("0".to_owned()), {
597                extensions: assign!(http::response::Extensions::default(), {
598                    account_data: assign!(http::response::AccountData::default(), {
599                        global: vec![
600                            Raw::from_json_string(
601                                json!({
602                                    "type": "m.push_rules",
603                                    "content": {
604                                        "global": {
605                                            "room": [
606                                                {
607                                                    "actions": ["notify"],
608                                                    "rule_id": room_id,
609                                                    "default": false,
610                                                    "enabled": true,
611                                                },
612                                            ],
613                                        },
614                                    },
615                                })
616                                .to_string(),
617                            ).unwrap()
618                        ]
619                    })
620                })
621            });
622
623            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
624            sliding_sync
625                .handle_response(
626                    server_response.clone(),
627                    &mut pos_guard,
628                    RequestedRequiredStates::default(),
629                )
630                .await?;
631        }
632
633        // The room notification mode has been updated again!
634        assert_eq!(
635            room.cached_user_defined_notification_mode(),
636            Some(RoomNotificationMode::AllMessages),
637        );
638
639        Ok(())
640    }
641
642    #[async_test]
643    async fn test_auto_listen_to_latest_events() -> Result<()> {
644        let client = MockClientBuilder::new(None).build().await;
645        let room_id = room_id!("!r0");
646
647        // Create the room beforehand.
648        client.base_client().get_or_create_room(room_id, RoomState::Joined);
649
650        // Enable the event cache (required for the latest events).
651        client.event_cache().subscribe()?;
652
653        // The latest event “listener” for this room has NOT been enabled.
654        assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());
655
656        // Create the sliding sync client.
657        let sliding_sync = client
658            .sliding_sync("test")?
659            .add_list(
660                SlidingSyncList::builder("all")
661                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
662            )
663            .build()
664            .await?;
665
666        // Receive a (mocked) response.
667        {
668            let server_response = assign!(http::Response::new("0".to_owned()), {
669                rooms: BTreeMap::from([(
670                    room_id.to_owned(),
671                    http::response::Room::default(),
672                )]),
673            });
674
675            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
676
677            sliding_sync
678                .handle_response(
679                    server_response.clone(),
680                    &mut pos_guard,
681                    RequestedRequiredStates::default(),
682                )
683                .await?;
684        }
685
686        // The room still exists.
687        assert!(client.get_room(room_id).is_some());
688
689        // The latest event “listener” for this room has been enabled.
690        assert!(client.latest_events().await.is_listening_to_room(room_id).await);
691
692        Ok(())
693    }
694
695    #[async_test]
696    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
697        use ruma::api::client::sync::sync_events::v5 as http;
698
699        // Given a logged-in client.
700        let client = MockClientBuilder::new(None).build().await;
701        client.event_cache().subscribe().unwrap();
702
703        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
704
705        // When I send sliding sync response containing a new room.
706        let room_id = room_id!("!r:e.uk");
707        let room = http::response::Room::new();
708        let mut response = http::Response::new("5".to_owned());
709        response.rooms.insert(room_id.to_owned(), room);
710
711        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
712        processor
713            .handle_room_response(&response, &RequestedRequiredStates::default())
714            .await
715            .expect("Failed to process sync");
716        processor.process_and_take_response().await.expect("Failed to finish processing sync");
717
718        // Then room info notable updates are received.
719        assert_matches!(
720            room_info_notable_update_stream.recv().await,
721            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
722                assert_eq!(received_room_id, room_id);
723                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
724            }
725        );
726        assert_matches!(
727            room_info_notable_update_stream.recv().await,
728            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
729                assert_eq!(received_room_id, room_id);
730                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
731            }
732        );
733        assert!(room_info_notable_update_stream.is_empty());
734
735        // When I send sliding sync response containing a couple of events with no read
736        // receipt.
737        let room_id = room_id!("!r:e.uk");
738        let events = vec![
739            make_raw_event("m.room.message", "$3"),
740            make_raw_event("m.room.message", "$4"),
741            make_raw_event("m.read", "$5"),
742        ];
743        let room = assign!(http::response::Room::new(), {
744            timeline: events,
745        });
746        let mut response = http::Response::new("5".to_owned());
747        response.rooms.insert(room_id.to_owned(), room);
748
749        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
750        processor
751            .handle_room_response(&response, &RequestedRequiredStates::default())
752            .await
753            .expect("Failed to process sync");
754        processor.process_and_take_response().await.expect("Failed to finish processing sync");
755
756        // Then room info notable updates are received.
757        //
758        // `NONE` because the regular sync process ends up to updating nothing.
759        assert_matches!(
760            room_info_notable_update_stream.recv().await,
761            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
762                assert_eq!(received_room_id, room_id);
763                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
764            }
765        );
766        // `READ_RECEIPT` because this is what we expect.
767        assert_matches!(
768            room_info_notable_update_stream.recv().await,
769            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
770                assert_eq!(received_room_id, room_id);
771                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
772            }
773        );
774        assert!(room_info_notable_update_stream.is_empty());
775    }
776
777    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
778        Raw::from_json_string(
779            json!({
780                "type": event_type,
781                "event_id": id,
782                "content": { "msgtype": "m.text", "body": "my msg" },
783                "sender": "@u:h.uk",
784                "origin_server_ts": 12344445,
785            })
786            .to_string(),
787        )
788        .unwrap()
789    }
790}