Skip to main content

matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5    RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9    OwnedRoomId,
10    api::{
11        FeatureFlag, SupportedVersions,
12        client::sync::sync_events::v5::{self as http, response},
13    },
14    events::GlobalAccountDataEventType,
15};
16use tokio::sync::MutexGuard;
17use tracing::error;
18
19use super::{SlidingSync, SlidingSyncBuilder};
20use crate::{Client, Result, sync::subscribe_to_room_latest_events};
21
/// The sliding sync flavour a client can speak with its homeserver.
#[derive(Debug, Clone)]
pub enum Version {
    /// No sliding sync version at all.
    ///
    /// Handy to express, for example, that sliding sync is disabled or that
    /// the version has not been determined.
    None,

    /// The sliding sync implementation that lives natively inside Synapse,
    /// i.e. MSC4186.
    Native,
}
33
34impl Version {
35    #[cfg(test)]
36    pub(crate) fn is_native(&self) -> bool {
37        matches!(self, Self::Native)
38    }
39}
40
41/// An error when building a version.
42#[derive(thiserror::Error, Debug)]
43pub enum VersionBuilderError {
44    /// The `.well-known` response is not set.
45    #[error("`.well-known` is not set")]
46    WellKnownNotSet,
47
48    /// The `/versions` response is not set.
49    #[error("The `/versions` response is not set")]
50    MissingVersionsResponse,
51
52    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
53    /// `unstable_features`, or it's not set to true.
54    #[error(
55        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
56         or it's not set to true."
57    )]
58    NativeVersionIsUnset,
59}
60
/// Describes how a [`Version`] must be obtained.
#[derive(Debug, Clone)]
pub enum VersionBuilder {
    /// Produce a [`Version::None`].
    None,

    /// Produce a [`Version::Native`] unconditionally.
    Native,

    /// Produce a [`Version::Native`] through auto-discovery.
    ///
    /// The version is only available when the server advertises it via
    /// `/versions`.
    DiscoverNative,
}
75
76impl VersionBuilder {
77    pub(crate) fn needs_get_supported_versions(&self) -> bool {
78        matches!(self, Self::DiscoverNative)
79    }
80
81    /// Build a [`Version`].
82    ///
83    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
84    /// invalid data.
85    pub fn build(
86        self,
87        supported: Option<&SupportedVersions>,
88    ) -> Result<Version, VersionBuilderError> {
89        Ok(match self {
90            Self::None => Version::None,
91
92            Self::Native => Version::Native,
93
94            Self::DiscoverNative => {
95                let Some(supported) = supported else {
96                    return Err(VersionBuilderError::MissingVersionsResponse);
97                };
98
99                if supported.features.contains(&FeatureFlag::Msc4186) {
100                    Version::Native
101                } else {
102                    return Err(VersionBuilderError::NativeVersionIsUnset);
103                }
104            }
105        })
106    }
107}
108
109impl Client {
110    /// Find all sliding sync versions that are available.
111    ///
112    /// Be careful: This method may hit the store and will send new requests for
113    /// each call. It can be costly to call it repeatedly.
114    ///
115    /// If `.well-known` or `/versions` is unreachable, it will simply move
116    /// potential sliding sync versions aside. No error will be reported.
117    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
118        let supported_versions = self.supported_versions().await.ok();
119
120        [VersionBuilder::DiscoverNative]
121            .into_iter()
122            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
123            .collect()
124    }
125
126    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
127    /// identifier.
128    ///
129    /// Note: the identifier must not be more than 16 chars long!
130    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
131        Ok(SlidingSync::builder(id.into(), self.clone())?)
132    }
133
134    /// Handle all the information provided in a sliding sync response, except
135    /// for the e2ee bits.
136    ///
137    /// If you need to handle encryption too, use the internal
138    /// `SlidingSyncResponseProcessor` instead.
139    #[cfg(any(test, feature = "testing"))]
140    #[tracing::instrument(skip(self, response))]
141    pub async fn process_sliding_sync_test_helper(
142        &self,
143        response: &http::Response,
144        requested_required_states: &RequestedRequiredStates,
145    ) -> Result<SyncResponse> {
146        let response = self
147            .base_client()
148            .process_sliding_sync(
149                response,
150                requested_required_states,
151                &self.base_client().state_store_lock().lock().await,
152            )
153            .await?;
154
155        tracing::debug!("done processing on base_client");
156        self.call_sync_response_handlers(&response).await?;
157
158        Ok(response)
159    }
160}
161
162/// Small helper to handle a `SlidingSync` response's sub parts.
163///
164/// This will properly handle the encryption and the room response
165/// independently, if needs be, making sure that both are properly processed by
166/// event handlers.
167#[must_use]
168pub(crate) struct SlidingSyncResponseProcessor {
169    client: Client,
170    to_device_events: Vec<ProcessedToDeviceEvent>,
171    response: Option<SyncResponse>,
172}
173
174impl SlidingSyncResponseProcessor {
175    pub fn new(client: Client) -> Self {
176        Self { client, to_device_events: Vec::new(), response: None }
177    }
178
179    #[cfg(feature = "e2e-encryption")]
180    pub async fn handle_encryption(
181        &mut self,
182        extensions: &response::Extensions,
183        state_store_guard: &MutexGuard<'_, ()>,
184    ) -> Result<()> {
185        // This is an internal API misuse if this is triggered (calling
186        // `handle_room_response` before this function), so panic is fine.
187        assert!(self.response.is_none());
188
189        self.to_device_events = if let Some(to_device_events) = self
190            .client
191            .base_client()
192            .process_sliding_sync_e2ee(
193                extensions.to_device.as_ref(),
194                &extensions.e2ee,
195                state_store_guard,
196            )
197            .await?
198        {
199            // Some new keys might have been received, so trigger a backup if needed.
200            self.client.encryption().backups().maybe_trigger_backup();
201
202            to_device_events
203        } else {
204            Vec::new()
205        };
206
207        Ok(())
208    }
209
210    pub async fn handle_room_response(
211        &mut self,
212        response: &http::Response,
213        requested_required_states: &RequestedRequiredStates,
214        state_store_guard: &MutexGuard<'_, ()>,
215    ) -> Result<()> {
216        subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
217
218        let previously_joined_rooms = self
219            .client
220            .joined_rooms()
221            .into_iter()
222            .map(|r| r.room_id().to_owned())
223            .collect::<BTreeSet<_>>();
224
225        let mut sync_response = self
226            .client
227            .base_client()
228            .process_sliding_sync(response, requested_required_states, state_store_guard)
229            .await?;
230
231        handle_receipts_extension(&self.client, response, &mut sync_response, state_store_guard)
232            .await?;
233
234        update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
235
236        self.response = Some(sync_response);
237
238        Ok(())
239    }
240
241    pub async fn handle_thread_subscriptions(
242        &mut self,
243        previous_pos: Option<&str>,
244        thread_subs: response::ThreadSubscriptions,
245    ) -> Result<()> {
246        let catchup_token =
247            thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
248                from: prev_batch,
249                to: previous_pos.map(|s| s.to_owned()),
250            });
251
252        self.client
253            .thread_subscription_catchup()
254            .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
255            .await?;
256
257        Ok(())
258    }
259
260    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
261        let mut response = self.response.take().unwrap_or_default();
262
263        response.to_device.extend(self.to_device_events);
264
265        self.client.call_sync_response_handlers(&response).await?;
266
267        Ok(response)
268    }
269}
270
271/// Update the caches for the rooms that received updates.
272///
273/// This will only fill the in-memory caches, not save the info on disk.
274async fn update_in_memory_caches(
275    client: &Client,
276    previously_joined_rooms: &BTreeSet<OwnedRoomId>,
277    response: &SyncResponse,
278) {
279    let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
280
281    // If the push rules have changed, update the cached notification mode for *all*
282    // the joined rooms.
283    if response.account_data.iter().any(|event| {
284        event
285            .get_field::<GlobalAccountDataEventType>("type")
286            .ok()
287            .flatten()
288            .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
289    }) {
290        let notification_settings = client.notification_settings().await;
291        let rules = notification_settings.rules().await;
292
293        // Update all joined rooms.
294        for room in client.joined_rooms() {
295            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
296                room.update_cached_user_defined_notification_mode(mode);
297            } else {
298                room.clear_user_defined_notification_mode();
299            }
300        }
301    } else {
302        // Otherwise, precompute the cached user-defined notification mode only for the
303        // newly joined rooms.
304
305        // We'll compute the rules only once, lazily, if needs be.
306        let mut rules = None;
307
308        for room_id in response
309            .rooms
310            .joined
311            .keys()
312            .filter(|room_id| !previously_joined_rooms.contains(*room_id))
313        {
314            let Some(room) = client.get_room(room_id) else {
315                error!(?room_id, "The room must exist since it has been joined");
316                continue;
317            };
318
319            // Reuse the previous `Rules` instance, or compute it once and for all.
320            let rules = if let Some(rules) = &mut rules {
321                rules
322            } else {
323                rules.insert(client.notification_settings().await.rules().await.clone())
324            };
325
326            // Define an initial value for the cached user-defined notification mode.
327            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
328                room.update_cached_user_defined_notification_mode(mode);
329            }
330        }
331    }
332}
333
334/// Update the receipts extension and compute the read receipt accordingly.
335async fn handle_receipts_extension(
336    client: &Client,
337    response: &http::Response,
338    sync_response: &mut SyncResponse,
339    state_store_guard: &MutexGuard<'_, ()>,
340) -> Result<()> {
341    let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
342
343    // We need to compute read receipts for each joined room that has received an
344    // update, or from each room that has received a receipt ephemeral event.
345    let room_ids = BTreeSet::from_iter(
346        sync_response
347            .rooms
348            .joined
349            .keys()
350            .cloned()
351            .chain(response.extensions.receipts.rooms.keys().cloned()),
352    );
353
354    // Process each room concurrently.
355    let futures = room_ids.into_iter().map(|room_id| async {
356        let receipt_event = client
357            .base_client()
358            .process_sliding_sync_receipts_extension_for_room(&room_id, response, state_store_guard)
359            .await?;
360
361        Result::<_, crate::Error>::Ok(Some((room_id, receipt_event)))
362    });
363
364    let updates = try_join_all(futures).await?;
365
366    for (room_id, receipt_event_content) in updates.into_iter().flatten() {
367        if let Some(event) = receipt_event_content {
368            sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
369        }
370    }
371
372    Ok(())
373}
374
375#[cfg(all(test, not(target_family = "wasm")))]
376mod tests {
377    use std::{collections::BTreeMap, ops::Not};
378
379    use assert_matches::assert_matches;
380    use matrix_sdk_base::{
381        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
382        notification_settings::RoomNotificationMode,
383    };
384    use matrix_sdk_test::{async_test, event_factory::EventFactory};
385    use ruma::{
386        api::client::discovery::get_supported_versions, assign, event_id, room_id, serde::Raw,
387        user_id,
388    };
389    use serde_json::json;
390    use tokio::task::yield_now;
391
392    use super::{Version, VersionBuilder};
393    use crate::{
394        SlidingSyncList, SlidingSyncMode,
395        error::Result,
396        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
397        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
398    };
399
400    #[test]
401    fn test_version_builder_none() {
402        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
403    }
404
405    #[test]
406    fn test_version_builder_native() {
407        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
408    }
409
410    #[test]
411    fn test_version_builder_discover_native() {
412        let mut response = get_supported_versions::Response::new(vec![]);
413        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
414
415        assert_matches!(
416            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
417            Ok(Version::Native)
418        );
419    }
420
421    #[test]
422    fn test_version_builder_discover_native_no_supported_versions() {
423        assert_matches!(
424            VersionBuilder::DiscoverNative.build(None),
425            Err(VersionBuilderError::MissingVersionsResponse)
426        );
427    }
428
429    #[test]
430    fn test_version_builder_discover_native_unstable_features_is_disabled() {
431        let mut response = get_supported_versions::Response::new(vec![]);
432        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
433
434        assert_matches!(
435            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
436            Err(VersionBuilderError::NativeVersionIsUnset)
437        );
438    }
439
440    #[async_test]
441    async fn test_available_sliding_sync_versions_none() {
442        let client = MockClientBuilder::new(None).build().await;
443        let available_versions = client.available_sliding_sync_versions().await;
444
445        // `.well-known` and `/versions` aren't available. It's impossible to find any
446        // versions.
447        assert!(available_versions.is_empty());
448    }
449
450    #[async_test]
451    async fn test_available_sliding_sync_versions_native() {
452        let server = MatrixMockServer::new().await;
453        let client = server.client_builder().no_server_versions().build().await;
454
455        server.mock_versions().with_simplified_sliding_sync().ok().mock_once().mount().await;
456
457        let available_versions = client.available_sliding_sync_versions().await;
458
459        // `/versions` is available.
460        assert_eq!(available_versions.len(), 1);
461        assert_matches!(available_versions[0], Version::Native);
462    }
463
464    #[async_test]
465    async fn test_cache_user_defined_notification_mode() -> Result<()> {
466        let client = MockClientBuilder::new(None).build().await;
467        let room_id = room_id!("!r0:matrix.org");
468
469        let sliding_sync = client
470            .sliding_sync("test")?
471            .with_account_data_extension(
472                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
473            )
474            .add_list(
475                SlidingSyncList::builder("all")
476                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
477            )
478            .build()
479            .await?;
480
481        // Mock a sync response.
482        // A `m.push_rules` with `room` is cached during the sync.
483        {
484            let server_response = assign!(http::Response::new("0".to_owned()), {
485                rooms: BTreeMap::from([(
486                    room_id.to_owned(),
487                    http::response::Room::default(),
488                )]),
489                extensions: assign!(http::response::Extensions::default(), {
490                    account_data: assign!(http::response::AccountData::default(), {
491                        global: vec![
492                            Raw::from_json_string(
493                                json!({
494                                    "type": "m.push_rules",
495                                    "content": {
496                                        "global": {
497                                            "room": [
498                                                {
499                                                    "actions": ["notify"],
500                                                    "rule_id": room_id,
501                                                    "default": false,
502                                                    "enabled": true,
503                                                },
504                                            ],
505                                        },
506                                    },
507                                })
508                                .to_string(),
509                            ).unwrap()
510                        ]
511                    })
512                })
513            });
514
515            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
516            sliding_sync
517                .handle_response(
518                    server_response.clone(),
519                    &mut pos_guard,
520                    RequestedRequiredStates::default(),
521                )
522                .await?;
523        }
524
525        // The room must exist, since it's been synced.
526        let room = client.get_room(room_id).unwrap();
527
528        // The room has a cached user-defined notification mode.
529        assert_eq!(
530            room.cached_user_defined_notification_mode(),
531            Some(RoomNotificationMode::AllMessages),
532        );
533
534        // Mock a sync response.
535        // A `m.push_rules` with `room` is cached during the sync.
536        // It overwrites the previous cache.
537        {
538            let server_response = assign!(http::Response::new("0".to_owned()), {
539                rooms: BTreeMap::from([(
540                    room_id.to_owned(),
541                    http::response::Room::default(),
542                )]),
543                extensions: assign!(http::response::Extensions::default(), {
544                    account_data: assign!(http::response::AccountData::default(), {
545                        global: vec![
546                            Raw::from_json_string(
547                                json!({
548                                    "type": "m.push_rules",
549                                    "content": {
550                                        "global": {
551                                            "room": [
552                                                {
553                                                    "actions": [],
554                                                    "rule_id": room_id,
555                                                    "default": false,
556                                                    "enabled": true,
557                                                },
558                                            ],
559                                        },
560                                    },
561                                })
562                                .to_string(),
563                            ).unwrap()
564                        ]
565                    })
566                })
567            });
568
569            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
570            sliding_sync
571                .handle_response(
572                    server_response.clone(),
573                    &mut pos_guard,
574                    RequestedRequiredStates::default(),
575                )
576                .await?;
577        }
578
579        // The room has an updated cached user-defined notification mode.
580        assert_eq!(
581            room.cached_user_defined_notification_mode(),
582            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
583        );
584
585        // Mock a sync response.
586        // Even if the room doesn't appear in the response, its notification mode will
587        // be updated immediately if a new `m.push_rules` is received.
588        {
589            let server_response = assign!(http::Response::new("0".to_owned()), {
590                extensions: assign!(http::response::Extensions::default(), {
591                    account_data: assign!(http::response::AccountData::default(), {
592                        global: vec![
593                            Raw::from_json_string(
594                                json!({
595                                    "type": "m.push_rules",
596                                    "content": {
597                                        "global": {
598                                            "room": [
599                                                {
600                                                    "actions": ["notify"],
601                                                    "rule_id": room_id,
602                                                    "default": false,
603                                                    "enabled": true,
604                                                },
605                                            ],
606                                        },
607                                    },
608                                })
609                                .to_string(),
610                            ).unwrap()
611                        ]
612                    })
613                })
614            });
615
616            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
617            sliding_sync
618                .handle_response(
619                    server_response.clone(),
620                    &mut pos_guard,
621                    RequestedRequiredStates::default(),
622                )
623                .await?;
624        }
625
626        // The room notification mode has been updated again!
627        assert_eq!(
628            room.cached_user_defined_notification_mode(),
629            Some(RoomNotificationMode::AllMessages),
630        );
631
632        Ok(())
633    }
634
635    #[async_test]
636    async fn test_auto_listen_to_latest_events() -> Result<()> {
637        let client = MockClientBuilder::new(None).build().await;
638        let room_id = room_id!("!r0");
639
640        // Create the room beforehand.
641        client.base_client().get_or_create_room(room_id, RoomState::Joined);
642
643        // Enable the event cache (required for the latest events).
644        client.event_cache().subscribe()?;
645
646        // The latest event “listener” for this room has NOT been enabled.
647        assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());
648
649        // Create the sliding sync client.
650        let sliding_sync = client
651            .sliding_sync("test")?
652            .add_list(
653                SlidingSyncList::builder("all")
654                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
655            )
656            .build()
657            .await?;
658
659        // Receive a (mocked) response.
660        {
661            let server_response = assign!(http::Response::new("0".to_owned()), {
662                rooms: BTreeMap::from([(
663                    room_id.to_owned(),
664                    http::response::Room::default(),
665                )]),
666            });
667
668            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
669
670            sliding_sync
671                .handle_response(
672                    server_response.clone(),
673                    &mut pos_guard,
674                    RequestedRequiredStates::default(),
675                )
676                .await?;
677        }
678
679        // The room still exists.
680        assert!(client.get_room(room_id).is_some());
681
682        // The latest event “listener” for this room has been enabled.
683        assert!(client.latest_events().await.is_listening_to_room(room_id).await);
684
685        Ok(())
686    }
687
688    #[async_test]
689    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
690        use ruma::api::client::sync::sync_events::v5 as http;
691
692        // Given a logged-in client.
693        let client = MockClientBuilder::new(None).build().await;
694        client.event_cache().subscribe().unwrap();
695
696        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
697
698        // When I send sliding sync response containing a new room.
699        let room_id = room_id!("!r:e.uk");
700        let room = http::response::Room::new();
701        let mut response = http::Response::new("5".to_owned());
702        response.rooms.insert(room_id.to_owned(), room);
703
704        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
705        {
706            let state_store_guard = client.base_client().state_store_lock().lock().await;
707            processor
708                .handle_room_response(
709                    &response,
710                    &RequestedRequiredStates::default(),
711                    &state_store_guard,
712                )
713                .await
714                .expect("Failed to process sync");
715        }
716        processor.process_and_take_response().await.expect("Failed to finish processing sync");
717
718        // Then room info notable updates are received.
719        assert_matches!(
720            room_info_notable_update_stream.recv().await,
721            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
722                assert_eq!(received_room_id, room_id);
723                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
724            }
725        );
726        assert_matches!(
727            room_info_notable_update_stream.recv().await,
728            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
729                assert_eq!(received_room_id, room_id);
730                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
731            }
732        );
733        assert!(room_info_notable_update_stream.is_empty());
734
735        // When I send sliding sync response containing a couple of events with no read
736        // receipt.
737        let room_id = room_id!("!r:e.uk");
738        let f = EventFactory::new().room(room_id).sender(user_id!("@u:h.uk"));
739        let events = vec![
740            f.text_msg("hi").event_id(event_id!("$3")).into_raw_sync(),
741            f.text_msg("hi").event_id(event_id!("$4")).into_raw_sync(),
742        ];
743        let room = assign!(http::response::Room::new(), {
744            timeline: events,
745        });
746        let mut response = http::Response::new("5".to_owned());
747        response.rooms.insert(room_id.to_owned(), room);
748
749        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
750        {
751            let state_store_guard = client.base_client().state_store_lock().lock().await;
752            processor
753                .handle_room_response(
754                    &response,
755                    &RequestedRequiredStates::default(),
756                    &state_store_guard,
757                )
758                .await
759                .expect("Failed to process sync");
760        }
761        processor.process_and_take_response().await.expect("Failed to finish processing sync");
762
763        // Then room info notable updates are received.
764        //
765        // `NONE` because the regular sync process ends up to updating nothing.
766        assert_matches!(
767            room_info_notable_update_stream.recv().await,
768            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
769                assert_eq!(received_room_id, room_id);
770                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
771            }
772        );
773        // `READ_RECEIPT` because this is what we expect.
774        assert_matches!(
775            room_info_notable_update_stream.recv().await,
776            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
777                assert_eq!(received_room_id, room_id);
778                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
779            }
780        );
781
782        // At some point, we receive an update for the `LATEST_EVENT` too, since this is
783        // enabled by default.
784        assert_matches!(
785            room_info_notable_update_stream.recv().await,
786            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
787                assert_eq!(received_room_id, room_id);
788                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
789            }
790        );
791
792        yield_now().await;
793
794        // Then the stream gets quiet.
795        assert!(room_info_notable_update_stream.is_empty());
796    }
797}