Skip to main content

matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5    RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9    OwnedRoomId,
10    api::{
11        FeatureFlag, SupportedVersions,
12        client::sync::sync_events::v5::{self as http, response},
13    },
14    events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
/// The sliding sync flavour a client can talk to a homeserver with.
#[derive(Clone, Debug)]
pub enum Version {
    /// No version at all.
    ///
    /// Handy, for example, to express that sliding sync is disabled, or that
    /// the version is not known.
    None,

    /// The sliding sync implementation living inside Synapse, i.e. MSC4186.
    Native,
}

impl Version {
    /// Tell whether this version is [`Version::Native`].
    ///
    /// Only compiled in test builds.
    #[cfg(test)]
    pub(crate) fn is_native(&self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::Native => true,
            Self::None => false,
        }
    }
}
39
/// An error when building a version.
///
/// This is the error type returned by [`VersionBuilder::build`].
#[derive(thiserror::Error, Debug)]
pub enum VersionBuilderError {
    /// The `.well-known` response is not set.
    ///
    /// NOTE(review): [`VersionBuilder::build`], as written in this module,
    /// never produces this variant — confirm whether other code still needs it.
    #[error("`.well-known` is not set")]
    WellKnownNotSet,

    /// The `/versions` response is not set.
    ///
    /// Produced by [`VersionBuilder::DiscoverNative`] when no supported
    /// versions are handed to [`VersionBuilder::build`].
    #[error("The `/versions` response is not set")]
    MissingVersionsResponse,

    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
    /// `unstable_features`, or it's not set to true.
    ///
    /// Produced by [`VersionBuilder::DiscoverNative`] when the supported
    /// features do not contain `FeatureFlag::Msc4186`.
    #[error(
        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
         or it's not set to true."
    )]
    NativeVersionIsUnset,
}
59
60/// A builder for [`Version`].
61#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63    /// Build a [`Version::None`].
64    None,
65
66    /// Build a [`Version::Native`].
67    Native,
68
69    /// Build a [`Version::Native`] by auto-discovering it.
70    ///
71    /// It is available if the server enables it via `/versions`.
72    DiscoverNative,
73}
74
75impl VersionBuilder {
76    pub(crate) fn needs_get_supported_versions(&self) -> bool {
77        matches!(self, Self::DiscoverNative)
78    }
79
80    /// Build a [`Version`].
81    ///
82    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
83    /// invalid data.
84    pub fn build(
85        self,
86        supported: Option<&SupportedVersions>,
87    ) -> Result<Version, VersionBuilderError> {
88        Ok(match self {
89            Self::None => Version::None,
90
91            Self::Native => Version::Native,
92
93            Self::DiscoverNative => {
94                let Some(supported) = supported else {
95                    return Err(VersionBuilderError::MissingVersionsResponse);
96                };
97
98                if supported.features.contains(&FeatureFlag::Msc4186) {
99                    Version::Native
100                } else {
101                    return Err(VersionBuilderError::NativeVersionIsUnset);
102                }
103            }
104        })
105    }
106}
107
108impl Client {
109    /// Find all sliding sync versions that are available.
110    ///
111    /// Be careful: This method may hit the store and will send new requests for
112    /// each call. It can be costly to call it repeatedly.
113    ///
114    /// If `.well-known` or `/versions` is unreachable, it will simply move
115    /// potential sliding sync versions aside. No error will be reported.
116    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117        let supported_versions = self.supported_versions().await.ok();
118
119        [VersionBuilder::DiscoverNative]
120            .into_iter()
121            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122            .collect()
123    }
124
125    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
126    /// identifier.
127    ///
128    /// Note: the identifier must not be more than 16 chars long!
129    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130        Ok(SlidingSync::builder(id.into(), self.clone())?)
131    }
132
133    /// Handle all the information provided in a sliding sync response, except
134    /// for the e2ee bits.
135    ///
136    /// If you need to handle encryption too, use the internal
137    /// `SlidingSyncResponseProcessor` instead.
138    #[cfg(any(test, feature = "testing"))]
139    #[tracing::instrument(skip(self, response))]
140    pub async fn process_sliding_sync_test_helper(
141        &self,
142        response: &http::Response,
143        requested_required_states: &RequestedRequiredStates,
144    ) -> Result<SyncResponse> {
145        let response =
146            self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148        tracing::debug!("done processing on base_client");
149        self.call_sync_response_handlers(&response).await?;
150
151        Ok(response)
152    }
153}
154
155/// Small helper to handle a `SlidingSync` response's sub parts.
156///
157/// This will properly handle the encryption and the room response
158/// independently, if needs be, making sure that both are properly processed by
159/// event handlers.
160#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162    client: Client,
163    to_device_events: Vec<ProcessedToDeviceEvent>,
164    response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168    pub fn new(client: Client) -> Self {
169        Self { client, to_device_events: Vec::new(), response: None }
170    }
171
172    #[cfg(feature = "e2e-encryption")]
173    pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174        // This is an internal API misuse if this is triggered (calling
175        // `handle_room_response` before this function), so panic is fine.
176        assert!(self.response.is_none());
177
178        self.to_device_events = if let Some(to_device_events) = self
179            .client
180            .base_client()
181            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182            .await?
183        {
184            // Some new keys might have been received, so trigger a backup if needed.
185            self.client.encryption().backups().maybe_trigger_backup();
186
187            to_device_events
188        } else {
189            Vec::new()
190        };
191
192        Ok(())
193    }
194
195    pub async fn handle_room_response(
196        &mut self,
197        response: &http::Response,
198        requested_required_states: &RequestedRequiredStates,
199    ) -> Result<()> {
200        subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202        let previously_joined_rooms = self
203            .client
204            .joined_rooms()
205            .into_iter()
206            .map(|r| r.room_id().to_owned())
207            .collect::<BTreeSet<_>>();
208
209        let mut sync_response = self
210            .client
211            .base_client()
212            .process_sliding_sync(response, requested_required_states)
213            .await?;
214
215        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217        update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219        self.response = Some(sync_response);
220
221        Ok(())
222    }
223
224    pub async fn handle_thread_subscriptions(
225        &mut self,
226        previous_pos: Option<&str>,
227        thread_subs: response::ThreadSubscriptions,
228    ) -> Result<()> {
229        let catchup_token =
230            thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231                from: prev_batch,
232                to: previous_pos.map(|s| s.to_owned()),
233            });
234
235        self.client
236            .thread_subscription_catchup()
237            .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238            .await?;
239
240        Ok(())
241    }
242
243    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244        let mut response = self.response.take().unwrap_or_default();
245
246        response.to_device.extend(self.to_device_events);
247
248        self.client.call_sync_response_handlers(&response).await?;
249
250        Ok(response)
251    }
252}
253
254/// Update the caches for the rooms that received updates.
255///
256/// This will only fill the in-memory caches, not save the info on disk.
257async fn update_in_memory_caches(
258    client: &Client,
259    previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260    response: &SyncResponse,
261) {
262    let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264    // If the push rules have changed, update the cached notification mode for *all*
265    // the joined rooms.
266    if response.account_data.iter().any(|event| {
267        event
268            .get_field::<GlobalAccountDataEventType>("type")
269            .ok()
270            .flatten()
271            .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272    }) {
273        let notification_settings = client.notification_settings().await;
274        let rules = notification_settings.rules().await;
275
276        // Update all joined rooms.
277        for room in client.joined_rooms() {
278            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279                room.update_cached_user_defined_notification_mode(mode);
280            } else {
281                room.clear_user_defined_notification_mode();
282            }
283        }
284    } else {
285        // Otherwise, precompute the cached user-defined notification mode only for the
286        // newly joined rooms.
287
288        // We'll compute the rules only once, lazily, if needs be.
289        let mut rules = None;
290
291        for room_id in response
292            .rooms
293            .joined
294            .keys()
295            .filter(|room_id| !previously_joined_rooms.contains(*room_id))
296        {
297            let Some(room) = client.get_room(room_id) else {
298                error!(?room_id, "The room must exist since it has been joined");
299                continue;
300            };
301
302            // Reuse the previous `Rules` instance, or compute it once and for all.
303            let rules = if let Some(rules) = &mut rules {
304                rules
305            } else {
306                rules.insert(client.notification_settings().await.rules().await.clone())
307            };
308
309            // Define an initial value for the cached user-defined notification mode.
310            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
311                room.update_cached_user_defined_notification_mode(mode);
312            }
313        }
314    }
315}
316
317/// Update the receipts extension and compute the read receipt accordingly.
318async fn handle_receipts_extension(
319    client: &Client,
320    response: &http::Response,
321    sync_response: &mut SyncResponse,
322) -> Result<()> {
323    let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
324
325    // We need to compute read receipts for each joined room that has received an
326    // update, or from each room that has received a receipt ephemeral event.
327    let room_ids = BTreeSet::from_iter(
328        sync_response
329            .rooms
330            .joined
331            .keys()
332            .cloned()
333            .chain(response.extensions.receipts.rooms.keys().cloned()),
334    );
335
336    // Process each room concurrently.
337    let futures = room_ids.into_iter().map(|room_id| async {
338        let receipt_event = client
339            .base_client()
340            .process_sliding_sync_receipts_extension_for_room(&room_id, response)
341            .await?;
342
343        Result::<_, crate::Error>::Ok(Some((room_id, receipt_event)))
344    });
345
346    let updates = try_join_all(futures).await?;
347
348    for (room_id, receipt_event_content) in updates.into_iter().flatten() {
349        if let Some(event) = receipt_event_content {
350            sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
351        }
352    }
353
354    Ok(())
355}
356
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{collections::BTreeMap, ops::Not};

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
        notification_settings::RoomNotificationMode,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, api::client::discovery::get_supported_versions, assign, event_id, room_id,
        serde::Raw, user_id,
    };
    use serde_json::json;
    use tokio::task::yield_now;

    use super::{SlidingSync, Version, VersionBuilder};
    use crate::{
        SlidingSyncList, SlidingSyncMode,
        error::Result,
        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
    };

    /// Build a sliding sync response carrying a single `m.push_rules` global
    /// account data event, with one `room` rule for `room_id` using the given
    /// `actions`.
    ///
    /// When `include_room` is `true`, the response also contains a default
    /// room entry for `room_id`.
    fn push_rules_response(
        room_id: &RoomId,
        actions: &[&str],
        include_room: bool,
    ) -> http::Response {
        let push_rules = json!({
            "type": "m.push_rules",
            "content": {
                "global": {
                    "room": [
                        {
                            "actions": actions,
                            "rule_id": room_id,
                            "default": false,
                            "enabled": true,
                        },
                    ],
                },
            },
        });

        let mut response = assign!(http::Response::new("0".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    global: vec![Raw::from_json_string(push_rules.to_string()).unwrap()],
                }),
            }),
        });

        if include_room {
            response.rooms =
                BTreeMap::from([(room_id.to_owned(), http::response::Room::default())]);
        }

        response
    }

    /// Make `sliding_sync` handle `response`, as if it had just been received
    /// from the server.
    async fn feed_response(sliding_sync: &SlidingSync, response: http::Response) -> Result<()> {
        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(response, &mut pos_guard, RequestedRequiredStates::default())
            .await?;

        Ok(())
    }

    #[test]
    fn test_version_builder_none() {
        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
    }

    #[test]
    fn test_version_builder_native() {
        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Ok(Version::Native)
        );
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        assert_matches!(
            VersionBuilder::DiscoverNative.build(None),
            Err(VersionBuilderError::MissingVersionsResponse)
        );
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Err(VersionBuilderError::NativeVersionIsUnset)
        );
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let client = MockClientBuilder::new(None).build().await;
        let available_versions = client.available_sliding_sync_versions().await;

        // `/versions` isn't available. It's impossible to find any versions.
        assert!(available_versions.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().no_server_versions().build().await;

        server.mock_versions().with_simplified_sliding_sync().ok().mock_once().mount().await;

        let available_versions = client.available_sliding_sync_versions().await;

        // `/versions` is available.
        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0:matrix.org");

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        feed_response(&sliding_sync, push_rules_response(room_id, &["notify"], true)).await?;

        // The room must exist, since it's been synced.
        let room = client.get_room(room_id).unwrap();

        // The room has a cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        // It overwrites the previous cache.
        feed_response(&sliding_sync, push_rules_response(room_id, &[], true)).await?;

        // The room has an updated cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        // Mock a sync response.
        // Even if the room doesn't appear in the response, its notification mode will
        // be updated immediately if a new `m.push_rules` is received.
        feed_response(&sliding_sync, push_rules_response(room_id, &["notify"], false)).await?;

        // The room notification mode has been updated again!
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        Ok(())
    }

    #[async_test]
    async fn test_auto_listen_to_latest_events() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0");

        // Create the room beforehand.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Enable the event cache (required for the latest events).
        client.event_cache().subscribe()?;

        // The latest event “listener” for this room has NOT been enabled.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());

        // Create the sliding sync client.
        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // Receive a (mocked) response.
        let server_response = assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room_id.to_owned(),
                http::response::Room::default(),
            )]),
        });

        feed_response(&sliding_sync, server_response).await?;

        // The room still exists.
        assert!(client.get_room(room_id).is_some());

        // The latest event “listener” for this room has been enabled.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await);

        Ok(())
    }

    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        use ruma::api::client::sync::sync_events::v5 as http;

        // Given a logged-in client.
        let client = MockClientBuilder::new(None).build().await;
        client.event_cache().subscribe().unwrap();

        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();

        // When I send sliding sync response containing a new room.
        let room_id = room_id!("!r:e.uk");
        let room = http::response::Room::new();
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then room info notable updates are received.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
            }
        );
        assert!(room_info_notable_update_stream.is_empty());

        // When I send sliding sync response containing a couple of events with no read
        // receipt, for the same room.
        let f = EventFactory::new().room(room_id).sender(user_id!("@u:h.uk"));
        let events = vec![
            f.text_msg("hi").event_id(event_id!("$3")).into_raw_sync(),
            f.text_msg("hi").event_id(event_id!("$4")).into_raw_sync(),
        ];
        let room = assign!(http::response::Room::new(), {
            timeline: events,
        });
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then room info notable updates are received.
        //
        // `NONE` because the regular sync process ends up updating nothing.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
            }
        );
        // `READ_RECEIPT` because this is what we expect.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );

        // At some point, we receive an update for the `LATEST_EVENT` too, since this is
        // enabled by default.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
            }
        );

        // And another one.
        // TODO: maybe something to investigate why we receive two in a row?
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
            }
        );

        yield_now().await;

        // Then the stream gets quiet.
        assert!(room_info_notable_update_stream.is_empty());
    }
}