matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5    RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9    OwnedRoomId,
10    api::{
11        FeatureFlag, SupportedVersions,
12        client::sync::sync_events::v5::{self as http, response},
13    },
14    events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
/// The flavour of sliding sync in use.
#[derive(Debug, Clone)]
pub enum Version {
    /// No version at all. Handy to express that sliding sync is disabled, or
    /// that the version is not known.
    None,

    /// The sliding sync implementation that lives inside Synapse, i.e.
    /// MSC4186.
    Native,
}

impl Version {
    /// Tell whether this version is [`Version::Native`].
    #[cfg(test)]
    pub(crate) fn is_native(&self) -> bool {
        match self {
            Self::Native => true,
            Self::None => false,
        }
    }
}
39
40/// An error when building a version.
41#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43    /// The `.well-known` response is not set.
44    #[error("`.well-known` is not set")]
45    WellKnownNotSet,
46
47    /// The `/versions` response is not set.
48    #[error("The `/versions` response is not set")]
49    MissingVersionsResponse,
50
51    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
52    /// `unstable_features`, or it's not set to true.
53    #[error(
54        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55         or it's not set to true."
56    )]
57    NativeVersionIsUnset,
58}
59
/// Describes how a [`Version`] must be obtained.
#[derive(Debug, Clone)]
pub enum VersionBuilder {
    /// Produce a [`Version::None`].
    None,

    /// Produce a [`Version::Native`].
    Native,

    /// Produce a [`Version::Native`], but only after having discovered it.
    ///
    /// The version is available when the server advertises it via
    /// `/versions`.
    DiscoverNative,
}
74
75impl VersionBuilder {
76    pub(crate) fn needs_get_supported_versions(&self) -> bool {
77        matches!(self, Self::DiscoverNative)
78    }
79
80    /// Build a [`Version`].
81    ///
82    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
83    /// invalid data.
84    pub fn build(
85        self,
86        supported: Option<&SupportedVersions>,
87    ) -> Result<Version, VersionBuilderError> {
88        Ok(match self {
89            Self::None => Version::None,
90
91            Self::Native => Version::Native,
92
93            Self::DiscoverNative => {
94                let Some(supported) = supported else {
95                    return Err(VersionBuilderError::MissingVersionsResponse);
96                };
97
98                if supported.features.contains(&FeatureFlag::Msc4186) {
99                    Version::Native
100                } else {
101                    return Err(VersionBuilderError::NativeVersionIsUnset);
102                }
103            }
104        })
105    }
106}
107
108impl Client {
109    /// Find all sliding sync versions that are available.
110    ///
111    /// Be careful: This method may hit the store and will send new requests for
112    /// each call. It can be costly to call it repeatedly.
113    ///
114    /// If `.well-known` or `/versions` is unreachable, it will simply move
115    /// potential sliding sync versions aside. No error will be reported.
116    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117        let supported_versions = self.supported_versions().await.ok();
118
119        [VersionBuilder::DiscoverNative]
120            .into_iter()
121            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122            .collect()
123    }
124
125    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
126    /// identifier.
127    ///
128    /// Note: the identifier must not be more than 16 chars long!
129    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130        Ok(SlidingSync::builder(id.into(), self.clone())?)
131    }
132
133    /// Handle all the information provided in a sliding sync response, except
134    /// for the e2ee bits.
135    ///
136    /// If you need to handle encryption too, use the internal
137    /// `SlidingSyncResponseProcessor` instead.
138    #[cfg(any(test, feature = "testing"))]
139    #[tracing::instrument(skip(self, response))]
140    pub async fn process_sliding_sync_test_helper(
141        &self,
142        response: &http::Response,
143        requested_required_states: &RequestedRequiredStates,
144    ) -> Result<SyncResponse> {
145        let response =
146            self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148        tracing::debug!("done processing on base_client");
149        self.call_sync_response_handlers(&response).await?;
150
151        Ok(response)
152    }
153}
154
155/// Small helper to handle a `SlidingSync` response's sub parts.
156///
157/// This will properly handle the encryption and the room response
158/// independently, if needs be, making sure that both are properly processed by
159/// event handlers.
160#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162    client: Client,
163    to_device_events: Vec<ProcessedToDeviceEvent>,
164    response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168    pub fn new(client: Client) -> Self {
169        Self { client, to_device_events: Vec::new(), response: None }
170    }
171
172    #[cfg(feature = "e2e-encryption")]
173    pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174        // This is an internal API misuse if this is triggered (calling
175        // `handle_room_response` before this function), so panic is fine.
176        assert!(self.response.is_none());
177
178        self.to_device_events = if let Some(to_device_events) = self
179            .client
180            .base_client()
181            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182            .await?
183        {
184            // Some new keys might have been received, so trigger a backup if needed.
185            self.client.encryption().backups().maybe_trigger_backup();
186
187            to_device_events
188        } else {
189            Vec::new()
190        };
191
192        Ok(())
193    }
194
195    pub async fn handle_room_response(
196        &mut self,
197        response: &http::Response,
198        requested_required_states: &RequestedRequiredStates,
199    ) -> Result<()> {
200        subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202        let previously_joined_rooms = self
203            .client
204            .joined_rooms()
205            .into_iter()
206            .map(|r| r.room_id().to_owned())
207            .collect::<BTreeSet<_>>();
208
209        let mut sync_response = self
210            .client
211            .base_client()
212            .process_sliding_sync(response, requested_required_states)
213            .await?;
214
215        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217        update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219        self.response = Some(sync_response);
220
221        Ok(())
222    }
223
224    pub async fn handle_thread_subscriptions(
225        &mut self,
226        previous_pos: Option<&str>,
227        thread_subs: response::ThreadSubscriptions,
228    ) -> Result<()> {
229        let catchup_token =
230            thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231                from: prev_batch,
232                to: previous_pos.map(|s| s.to_owned()),
233            });
234
235        self.client
236            .thread_subscription_catchup()
237            .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238            .await?;
239
240        Ok(())
241    }
242
243    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244        let mut response = self.response.take().unwrap_or_default();
245
246        response.to_device.extend(self.to_device_events);
247
248        self.client.call_sync_response_handlers(&response).await?;
249
250        Ok(response)
251    }
252}
253
254/// Update the caches for the rooms that received updates.
255///
256/// This will only fill the in-memory caches, not save the info on disk.
257async fn update_in_memory_caches(
258    client: &Client,
259    previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260    response: &SyncResponse,
261) {
262    let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264    // If the push rules have changed, update the cached notification mode for *all*
265    // the joined rooms.
266    if response.account_data.iter().any(|event| {
267        event
268            .get_field::<GlobalAccountDataEventType>("type")
269            .ok()
270            .flatten()
271            .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272    }) {
273        let notification_settings = client.notification_settings().await;
274        let rules = notification_settings.rules().await;
275
276        // Update all joined rooms.
277        for room in client.joined_rooms() {
278            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279                room.update_cached_user_defined_notification_mode(mode);
280            } else {
281                room.clear_user_defined_notification_mode();
282            }
283        }
284    } else {
285        // Otherwise, precompute the cached user-defined notification mode only for the
286        // newly joined rooms.
287
288        // We'll compute the rules only once, lazily, if needs be.
289        let mut rules = None;
290
291        for room_id in response
292            .rooms
293            .joined
294            .keys()
295            .filter(|room_id| !previously_joined_rooms.contains(*room_id))
296        {
297            let Some(room) = client.get_room(room_id) else {
298                error!(?room_id, "The room must exist since it has been joined");
299                continue;
300            };
301
302            // Reuse the previous `Rules` instance, or compute it once and for all.
303            let rules = if let Some(rules) = &mut rules {
304                rules
305            } else {
306                rules.insert(client.notification_settings().await.rules().await.clone())
307            };
308
309            // Define an initial value for the cached user-defined notification mode.
310            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
311                room.update_cached_user_defined_notification_mode(mode);
312            }
313        }
314    }
315}
316
317/// Update the receipts extension and compute the read receipt accordingly.
318async fn handle_receipts_extension(
319    client: &Client,
320    response: &http::Response,
321    sync_response: &mut SyncResponse,
322) -> Result<()> {
323    let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
324
325    // We need to compute read receipts for each joined room that has received an
326    // update, or from each room that has received a receipt ephemeral event.
327    let room_ids = BTreeSet::from_iter(
328        sync_response
329            .rooms
330            .joined
331            .keys()
332            .cloned()
333            .chain(response.extensions.receipts.rooms.keys().cloned()),
334    );
335
336    // Process each room concurrently.
337    let futures = room_ids.into_iter().map(|room_id| {
338        let new_sync_events = sync_response
339            .rooms
340            .joined
341            .entry(room_id.to_owned())
342            .or_default()
343            .timeline
344            .events
345            .clone();
346
347        async {
348            let Ok((room_event_cache, _drop_handle)) =
349                client.event_cache().for_room(&room_id).await
350            else {
351                tracing::info!(
352                    ?room_id,
353                    "Failed to fetch the `RoomEventCache` when computing unread counts"
354                );
355                return Ok::<_, crate::Error>(None);
356            };
357
358            let previous_events = room_event_cache.events().await;
359
360            let receipt_event = client
361                .base_client()
362                .process_sliding_sync_receipts_extension_for_room(
363                    &room_id,
364                    response,
365                    new_sync_events,
366                    previous_events,
367                )
368                .await?;
369
370            Ok(Some((room_id, receipt_event)))
371        }
372    });
373
374    let updates = try_join_all(futures).await?;
375
376    for (room_id, receipt_event_content) in updates.into_iter().flatten() {
377        if let Some(event) = receipt_event_content {
378            sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
379        }
380    }
381
382    Ok(())
383}
384
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{collections::BTreeMap, ops::Not};

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
        notification_settings::RoomNotificationMode,
    };
    use matrix_sdk_test::async_test;
    use ruma::{
        api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
        room_id, serde::Raw,
    };
    use serde_json::json;

    use super::{Version, VersionBuilder};
    use crate::{
        SlidingSyncList, SlidingSyncMode,
        error::Result,
        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
    };

    /// Build a `/versions` response with no versions, whose unstable features
    /// only contain `org.matrix.simplified_msc3575`, set to `enabled`.
    fn versions_response(enabled: bool) -> get_supported_versions::Response {
        let mut response = get_supported_versions::Response::new(Vec::new());
        response.unstable_features =
            [("org.matrix.simplified_msc3575".to_owned(), enabled)].into();

        response
    }

    /// Build response extensions whose global account data holds a single
    /// `m.push_rules` event, made of one enabled, non-default `room` rule for
    /// `room_id` with the given `actions`.
    fn push_rules_extensions(room_id: &str, actions: &[&str]) -> http::response::Extensions {
        let push_rules = json!({
            "type": "m.push_rules",
            "content": {
                "global": {
                    "room": [
                        {
                            "actions": actions,
                            "rule_id": room_id,
                            "default": false,
                            "enabled": true,
                        },
                    ],
                },
            },
        });

        assign!(http::response::Extensions::default(), {
            account_data: assign!(http::response::AccountData::default(), {
                global: vec![Raw::from_json_string(push_rules.to_string()).unwrap()]
            })
        })
    }

    /// Build a raw timeline event of type `event_type` and ID `id`.
    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
        let event = json!({
            "type": event_type,
            "event_id": id,
            "content": { "msgtype": "m.text", "body": "my msg" },
            "sender": "@u:h.uk",
            "origin_server_ts": 12344445,
        });

        Raw::from_json_string(event.to_string()).unwrap()
    }

    #[test]
    fn test_version_builder_none() {
        let version = VersionBuilder::None.build(None);

        assert_matches!(version, Ok(Version::None));
    }

    #[test]
    fn test_version_builder_native() {
        let version = VersionBuilder::Native.build(None);

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native() {
        let supported_versions = versions_response(true).as_supported_versions();
        let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        let version = VersionBuilder::DiscoverNative.build(None);

        assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let supported_versions = versions_response(false).as_supported_versions();
        let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

        assert_matches!(version, Err(VersionBuilderError::NativeVersionIsUnset));
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let client = MockClientBuilder::new(None).build().await;

        // Neither `.well-known` nor `/versions` is available: no version can
        // be found.
        let available_versions = client.available_sliding_sync_versions().await;
        assert!(available_versions.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().no_server_versions().build().await;

        // Make `/versions` available.
        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;

        let available_versions = client.available_sliding_sync_versions().await;

        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0:matrix.org");

        let list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));
        let account_data =
            assign!(http::request::AccountData::default(), { enabled: Some(true) });

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(account_data)
            .add_list(list)
            .build()
            .await?;

        // First sync: the room comes along with an `m.push_rules` holding a
        // `room` rule, which gets cached during the sync.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(room_id.to_owned(), http::response::Room::default())]),
                extensions: push_rules_extensions(room_id.as_str(), &["notify"])
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room has been synced, so it must exist.
        let room = client.get_room(room_id).unwrap();

        // Its user-defined notification mode is cached.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Second sync: a new `m.push_rules` with a `room` rule overwrites the
        // previous cache.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(room_id.to_owned(), http::response::Room::default())]),
                extensions: push_rules_extensions(room_id.as_str(), &[])
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The cached user-defined notification mode has been updated.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        // Third sync: the room is absent from the response, but a new
        // `m.push_rules` still updates its notification mode immediately.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                extensions: push_rules_extensions(room_id.as_str(), &["notify"])
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room notification mode has been updated once more.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        Ok(())
    }

    #[async_test]
    async fn test_auto_listen_to_latest_events() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0");

        // The room exists before any sync happens.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // The latest events require the event cache to be enabled.
        client.event_cache().subscribe()?;

        // So far, nothing listens to the latest event of this room.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());

        let list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));
        let sliding_sync = client.sliding_sync("test")?.add_list(list).build().await?;

        // Handle a (mocked) response that contains the room.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(room_id.to_owned(), http::response::Room::default())])
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room is still there…
        assert!(client.get_room(room_id).is_some());

        // … and its latest event is now listened to.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await);

        Ok(())
    }

    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        use ruma::api::client::sync::sync_events::v5 as http;

        // Given a logged-in client.
        let client = MockClientBuilder::new(None).build().await;
        client.event_cache().subscribe().unwrap();

        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();

        let room_id = room_id!("!r:e.uk");

        // When I send sliding sync response containing a new room.
        {
            let mut response = http::Response::new("5".to_owned());
            response.rooms.insert(room_id.to_owned(), http::response::Room::new());

            let mut processor = SlidingSyncResponseProcessor::new(client.clone());
            processor
                .handle_room_response(&response, &RequestedRequiredStates::default())
                .await
                .expect("Failed to process sync");
            processor.process_and_take_response().await.expect("Failed to finish processing sync");
        }

        // Then room info notable updates are received.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(
                    !received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT),
                    "{received_reasons:?}"
                );
            }
        );
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(
                    received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME),
                    "{received_reasons:?}"
                );
            }
        );
        assert!(room_info_notable_update_stream.is_empty());

        // When I send sliding sync response containing a couple of events with no read
        // receipt.
        {
            let timeline = vec![
                make_raw_event("m.room.message", "$3"),
                make_raw_event("m.room.message", "$4"),
                make_raw_event("m.read", "$5"),
            ];

            let mut response = http::Response::new("5".to_owned());
            response
                .rooms
                .insert(room_id.to_owned(), assign!(http::response::Room::new(), { timeline }));

            let mut processor = SlidingSyncResponseProcessor::new(client.clone());
            processor
                .handle_room_response(&response, &RequestedRequiredStates::default())
                .await
                .expect("Failed to process sync");
            processor.process_and_take_response().await.expect("Failed to finish processing sync");
        }

        // Then room info notable updates are received.
        //
        // `NONE` because the regular sync process ends up updating nothing.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(
                    received_reasons.contains(RoomInfoNotableUpdateReasons::NONE),
                    "{received_reasons:?}"
                );
            }
        );
        // `READ_RECEIPT` because this is what we expect.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(
                    received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT),
                    "{received_reasons:?}"
                );
            }
        );
        assert!(room_info_notable_update_stream.is_empty());
    }
}