matrix_sdk_ui/room_list_service/mod.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//! list. Its goal is to load all the user' rooms. It starts with a
37//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//! set of rooms) to load the first rooms quickly, and then updates to a
39//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//! background”: it will sync the existing rooms and will fetch new rooms, by
41//! a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room_list;
56pub mod sorters;
57mod state;
58
59use std::{sync::Arc, time::Duration};
60
61use async_stream::stream;
62use eyeball::Subscriber;
63use futures_util::{Stream, StreamExt, pin_mut};
64use matrix_sdk::{
65 Client, Error as SlidingSyncError, Room, SlidingSync, SlidingSyncList, SlidingSyncMode,
66 event_cache::EventCacheError, sliding_sync::PollTimeout, timeout::timeout,
67};
68pub use room_list::*;
69use ruma::{
70 OwnedRoomId, RoomId, UInt,
71 api::{FeatureFlag, client::sync::sync_events::v5 as http},
72 assign,
73 events::StateEventType,
74};
75pub use state::*;
76use thiserror::Error;
77use tracing::{debug, error, warn};
78
79/// The default `required_state` constant value for sliding sync lists and
80/// sliding sync room subscriptions.
81const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
82 (StateEventType::RoomName, ""),
83 (StateEventType::RoomEncryption, ""),
84 (StateEventType::RoomMember, "$LAZY"),
85 (StateEventType::RoomMember, "$ME"),
86 (StateEventType::RoomTopic, ""),
87 // Temporary workaround for https://github.com/matrix-org/matrix-rust-sdk/issues/5285
88 (StateEventType::RoomAvatar, ""),
89 (StateEventType::RoomCanonicalAlias, ""),
90 (StateEventType::RoomPowerLevels, ""),
91 (StateEventType::CallMember, "*"),
92 (StateEventType::RoomJoinRules, ""),
93 (StateEventType::RoomTombstone, ""),
94 // Those two events are required to properly compute room previews.
95 // `StateEventType::RoomCreate` is also necessary to compute the room
96 // version, and thus handling the tombstoned room correctly.
97 (StateEventType::RoomCreate, ""),
98 (StateEventType::RoomHistoryVisibility, ""),
99 // Required to correctly calculate the room display name.
100 (StateEventType::MemberHints, ""),
101 (StateEventType::SpaceParent, "*"),
102 (StateEventType::SpaceChild, "*"),
103];
104
105/// The default `required_state` constant value for sliding sync room
106/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
107const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
108 &[(StateEventType::RoomPinnedEvents, "")];
109
110/// The default `timeline_limit` value when used with room subscriptions.
111const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
112
113/// The [`RoomListService`] type. See the module's documentation to learn more.
114#[derive(Debug)]
115pub struct RoomListService {
116 /// Client that has created this [`RoomListService`].
117 client: Client,
118
119 /// The Sliding Sync instance.
120 sliding_sync: Arc<SlidingSync>,
121
122 /// The current state of the `RoomListService`.
123 ///
124 /// `RoomListService` is a simple state-machine.
125 state_machine: StateMachine,
126}
127
128impl RoomListService {
129 /// Create a new `RoomList`.
130 ///
131 /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
132 /// already pre-configured.
133 ///
134 /// This won't start an encryption sync, and it's the user's responsibility
135 /// to create one in this case using
136 /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
137 pub async fn new(client: Client) -> Result<Self, Error> {
138 Self::new_with_share_pos(client, true).await
139 }
140
141 /// Like [`RoomListService::new`] but with a flag to turn the
142 /// [`SlidingSyncBuilder::share_pos`] on and off.
143 ///
144 /// [`SlidingSyncBuilder::share_pos`]: matrix_sdk::sliding_sync::SlidingSyncBuilder::share_pos
145 pub async fn new_with_share_pos(client: Client, share_pos: bool) -> Result<Self, Error> {
146 let mut builder = client
147 .sliding_sync("room-list")
148 .map_err(Error::SlidingSync)?
149 .with_account_data_extension(
150 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
151 )
152 .with_receipt_extension(assign!(http::request::Receipts::default(), {
153 enabled: Some(true),
154 rooms: Some(vec![http::request::ExtensionRoomConfig::AllSubscribed])
155 }))
156 .with_typing_extension(assign!(http::request::Typing::default(), {
157 enabled: Some(true),
158 }));
159
160 if client.enabled_thread_subscriptions() {
161 let server_features = if let Some(cached) = client
162 .supported_versions_cached()
163 .await
164 .map_err(|e| Error::SlidingSync(e.into()))?
165 {
166 cached.features
167 } else {
168 client
169 .fetch_server_versions(None)
170 .await
171 .map_err(|e| Error::SlidingSync(e.into()))?
172 .as_supported_versions()
173 .features
174 };
175
176 if !server_features.contains(&FeatureFlag::from("org.matrix.msc4306")) {
177 warn!(
178 "Thread subscriptions extension is requested on the client, but the server doesn't advertise support for it: not enabling."
179 );
180 } else {
181 debug!("Enabling the thread subscriptions extension");
182 builder = builder.with_thread_subscriptions_extension(
183 assign!(http::request::ThreadSubscriptions::default(), {
184 enabled: Some(true),
185 limit: Some(ruma::uint!(10))
186 }),
187 );
188 }
189 }
190
191 if share_pos {
192 // The e2ee extensions aren't enabled in this sliding sync instance, and this is
193 // the only one that could be used from a different process. So it's
194 // fine to enable position sharing (i.e. reloading it from disk),
195 // since it's always exclusively owned by the current process.
196 debug!("Enabling `share_pos` for the room list sliding sync");
197 builder = builder.share_pos();
198 }
199
200 let state_machine = StateMachine::new();
201 let observable_state = state_machine.cloned_state();
202
203 let sliding_sync = builder
204 .add_cached_list(
205 SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
206 .sync_mode(
207 SlidingSyncMode::new_selective()
208 .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
209 )
210 .timeline_limit(1)
211 .required_state(
212 DEFAULT_REQUIRED_STATE
213 .iter()
214 .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
215 .collect(),
216 )
217 .filters(Some(assign!(http::request::ListFilters::default(), {
218 // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
219 // If unset, both invited and joined rooms are returned. If false, no invited rooms are
220 // returned. If true, only invited rooms are returned.
221 is_invite: None,
222 })))
223 .requires_timeout(move |request_generator| {
224 // We want Sliding Sync to apply the poll + network timeout —i.e. to do the
225 // long-polling— in some particular cases. Let's define them.
226 match observable_state.get() {
227 // These are the states where we want an immediate response from the
228 // server, with no long-polling.
229 State::Init
230 | State::SettingUp
231 | State::Recovering
232 | State::Error { .. }
233 | State::Terminated { .. } => PollTimeout::Some(0),
234
235 // Otherwise we want long-polling if the list is fully-loaded.
236 State::Running => {
237 if request_generator.is_fully_loaded() {
238 // Long-polling.
239 PollTimeout::Default
240 } else {
241 // No long-polling yet.
242 PollTimeout::Some(0)
243 }
244 }
245 }
246 }),
247 )
248 .await
249 .map_err(Error::SlidingSync)?
250 .build()
251 .await
252 .map(Arc::new)
253 .map_err(Error::SlidingSync)?;
254
255 // Eagerly subscribe the event cache to sync responses.
256 client.event_cache().subscribe()?;
257
258 Ok(Self { client, sliding_sync, state_machine })
259 }
260
261 /// Start to sync the room list.
262 ///
263 /// It's the main method of this entire API. Calling `sync` allows to
264 /// receive updates on the room list: new rooms, rooms updates etc. Those
265 /// updates can be read with `RoomList::entries` for example. This method
266 /// returns a [`Stream`] where produced items only hold an empty value
267 /// in case of a sync success, otherwise an error.
268 ///
269 /// The `RoomListService`' state machine is run by this method.
270 ///
271 /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
272 /// calling [`Self::sync`] again will resume from the previous state of
273 /// the state machine.
274 ///
275 /// This should be used only for testing. In practice, most users should be
276 /// using the [`SyncService`](crate::sync_service::SyncService) instead.
277 #[doc(hidden)]
278 pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
279 stream! {
280 let sync = self.sliding_sync.sync();
281 pin_mut!(sync);
282
283 // This is a state machine implementation.
284 // Things happen in this order:
285 //
286 // 1. The next state is calculated,
287 // 2. The actions associated to the next state are run,
288 // 3. A sync is done,
289 // 4. The next state is stored.
290 loop {
291 debug!("Run a sync iteration");
292
293 // Calculate the next state, and run the associated actions.
294 let next_state = self.state_machine.next(&self.sliding_sync).await?;
295
296 // Do the sync.
297 match sync.next().await {
298 // Got a successful result while syncing.
299 Some(Ok(_update_summary)) => {
300 debug!(state = ?next_state, "New state");
301
302 // Update the state.
303 self.state_machine.set(next_state);
304
305 yield Ok(());
306 }
307
308 // Got an error while syncing.
309 Some(Err(error)) => {
310 debug!(expected_state = ?next_state, "New state is an error");
311
312 let next_state = State::Error { from: Box::new(next_state) };
313 self.state_machine.set(next_state);
314
315 yield Err(Error::SlidingSync(error));
316
317 break;
318 }
319
320 // Sync loop has terminated.
321 None => {
322 debug!(expected_state = ?next_state, "New state is a termination");
323
324 let next_state = State::Terminated { from: Box::new(next_state) };
325 self.state_machine.set(next_state);
326
327 break;
328 }
329 }
330 }
331 }
332 }
333
334 /// Force to stop the sync of the `RoomListService` started by
335 /// [`Self::sync`].
336 ///
337 /// It's of utter importance to call this method rather than stop polling
338 /// the `Stream` returned by [`Self::sync`] because it will force the
339 /// cancellation and exit the sync loop, i.e. it will cancel any
340 /// in-flight HTTP requests, cancel any pending futures etc. and put the
341 /// service into a termination state.
342 ///
343 /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
344 /// until it returns `None`, because of [`Self::stop_sync`], so that it
345 /// ensures the states are correctly placed.
346 ///
347 /// Stopping the sync of the room list via this method will put the
348 /// state-machine into the [`State::Terminated`] state.
349 ///
350 /// This should be used only for testing. In practice, most users should be
351 /// using the [`SyncService`](crate::sync_service::SyncService) instead.
352 #[doc(hidden)]
353 pub fn stop_sync(&self) -> Result<(), Error> {
354 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
355 }
356
357 /// Force the sliding sync session to expire.
358 ///
359 /// This is used by [`SyncService`](crate::sync_service::SyncService).
360 ///
361 /// **Warning**: This method **must not** be called while the sync loop is
362 /// running!
363 pub(crate) async fn expire_sync_session(&self) {
364 self.sliding_sync.expire_session().await;
365
366 // Usually, when the session expires, it leads the state to be `Error`,
367 // thus some actions (like refreshing the lists) are executed. However,
368 // if the sync loop has been stopped manually, the state is `Terminated`, and
369 // when the session is forced to expire, the state remains `Terminated`, thus
370 // the actions aren't executed as expected. Consequently, let's update the
371 // state.
372 if let State::Terminated { from } = self.state_machine.get() {
373 self.state_machine.set(State::Error { from });
374 }
375 }
376
377 /// Get a [`Stream`] of [`SyncIndicator`].
378 ///
379 /// Read the documentation of [`SyncIndicator`] to learn more about it.
380 pub fn sync_indicator(
381 &self,
382 delay_before_showing: Duration,
383 delay_before_hiding: Duration,
384 ) -> impl Stream<Item = SyncIndicator> + use<> {
385 let mut state = self.state();
386
387 stream! {
388 // Ensure the `SyncIndicator` is always hidden to start with.
389 yield SyncIndicator::Hide;
390
391 // Let's not wait for an update to happen. The `SyncIndicator` must be
392 // computed as fast as possible.
393 let mut current_state = state.next_now();
394
395 loop {
396 let (sync_indicator, yield_delay) = match current_state {
397 State::SettingUp | State::Error { .. } => {
398 (SyncIndicator::Show, delay_before_showing)
399 }
400
401 State::Init | State::Recovering | State::Running | State::Terminated { .. } => {
402 (SyncIndicator::Hide, delay_before_hiding)
403 }
404 };
405
406 // `state.next().await` has a maximum of `yield_delay` time to execute…
407 let next_state = match timeout(state.next(), yield_delay).await {
408 // A new state has been received before `yield_delay` time. The new
409 // `sync_indicator` value won't be yielded.
410 Ok(next_state) => next_state,
411
412 // No new state has been received before `yield_delay` time. The
413 // `sync_indicator` value can be yielded.
414 Err(_) => {
415 yield sync_indicator;
416
417 // Now that `sync_indicator` has been yielded, let's wait on
418 // the next state again.
419 state.next().await
420 }
421 };
422
423 if let Some(next_state) = next_state {
424 // Update the `current_state`.
425 current_state = next_state;
426 } else {
427 // Something is broken with the state. Let's stop this stream too.
428 break;
429 }
430 }
431 }
432 }
433
434 /// Get the [`Client`] that has been used to create [`Self`].
435 pub fn client(&self) -> &Client {
436 &self.client
437 }
438
439 /// Get a subscriber to the state.
440 pub fn state(&self) -> Subscriber<State> {
441 self.state_machine.subscribe()
442 }
443
444 async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
445 RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
446 }
447
448 /// Get a [`RoomList`] for all rooms.
449 pub async fn all_rooms(&self) -> Result<RoomList, Error> {
450 self.list_for(ALL_ROOMS_LIST_NAME).await
451 }
452
453 /// Get a [`Room`] if it exists.
454 pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
455 self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))
456 }
457
458 /// Subscribe to rooms.
459 ///
460 /// It means that all events from these rooms will be received every time,
461 /// no matter how the `RoomList` is configured.
462 ///
463 /// [`LatestEvents::listen_to_room`][listen_to_room] will be called for each
464 /// room in `room_ids`, so that the [`LatestEventValue`] will automatically
465 /// be calculated and updated for these rooms, for free.
466 ///
467 /// All previous room subscriptions will be forgotten.
468 ///
469 /// [listen_to_room]: matrix_sdk::latest_events::LatestEvents::listen_to_room
470 /// [`LatestEventValue`]: matrix_sdk::latest_events::LatestEventValue
471 pub async fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
472 // Calculate the settings for the room subscriptions.
473 let settings = assign!(http::request::RoomSubscription::default(), {
474 required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
475 (state_event.clone(), (*value).to_owned())
476 })
477 .chain(
478 DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
479 (state_event.clone(), (*value).to_owned())
480 })
481 )
482 .collect(),
483 timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
484 });
485
486 // Decide whether the in-flight request (if any) should be cancelled if needed.
487 let cancel_in_flight_request = match self.state_machine.get() {
488 State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
489 false
490 }
491 State::SettingUp | State::Running => true,
492 };
493
494 // Before subscribing, let's listen these rooms to calculate their latest
495 // events.
496 if self.client.event_cache().has_subscribed() {
497 let latest_events = self.client.latest_events().await;
498
499 for room_id in room_ids {
500 if let Err(error) = latest_events.listen_to_room(room_id).await {
501 // Let's not fail the room subscription. Instead, emit a log because it's very
502 // unlikely to happen.
503 error!(?error, ?room_id, "Failed to listen to the latest event for this room");
504 }
505 }
506 }
507
508 // Subscribe to the rooms.
509 self.sliding_sync.clear_and_subscribe_to_rooms(
510 room_ids,
511 Some(settings),
512 cancel_in_flight_request,
513 )
514 }
515
516 #[cfg(test)]
517 pub fn sliding_sync(&self) -> &SlidingSync {
518 &self.sliding_sync
519 }
520}
521
522/// [`RoomList`]'s errors.
523#[derive(Debug, Error)]
524pub enum Error {
525 /// Error from [`matrix_sdk::SlidingSync`].
526 #[error(transparent)]
527 SlidingSync(SlidingSyncError),
528
529 /// An operation has been requested on an unknown list.
530 #[error("Unknown list `{0}`")]
531 UnknownList(String),
532
533 /// The requested room doesn't exist.
534 #[error("Room `{0}` not found")]
535 RoomNotFound(OwnedRoomId),
536
537 #[error(transparent)]
538 EventCache(#[from] EventCacheError),
539}
540
541/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
542/// user, indicating that the [`RoomListService`] is syncing.
543///
544/// This is entirely arbitrary and optinionated. Of course, once
545/// [`RoomListService::sync`] has been called, it's going to be constantly
546/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
547/// happened. But in some cases, it's better for the user experience to prompt
548/// to the user that a sync is happening. It's usually the first sync, or the
549/// recovering sync. However, the sync indicator must be prompted if the
550/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
551/// pretty fast, which can be very confusing. It's also common to indicate to
552/// the user that a syncing is happening in case of a network error, that
553/// something is catching up etc.
554#[derive(Debug, Eq, PartialEq)]
555pub enum SyncIndicator {
556 /// Show the sync indicator.
557 Show,
558
559 /// Hide the sync indicator.
560 Hide,
561}
562
563#[cfg(test)]
564mod tests {
565 use std::future::ready;
566
567 use futures_util::{StreamExt, pin_mut};
568 use matrix_sdk::{
569 Client, SlidingSyncMode, config::RequestConfig, test_utils::client::mock_matrix_session,
570 };
571 use matrix_sdk_test::async_test;
572 use ruma::api::MatrixVersion;
573 use serde_json::json;
574 use wiremock::{Match, Mock, MockServer, Request, ResponseTemplate, http::Method};
575
576 use super::{ALL_ROOMS_LIST_NAME, Error, RoomListService, State};
577
578 async fn new_client() -> (Client, MockServer) {
579 let session = mock_matrix_session();
580
581 let server = MockServer::start().await;
582 let client = Client::builder()
583 .homeserver_url(server.uri())
584 .server_versions([MatrixVersion::V1_0])
585 .request_config(RequestConfig::new().disable_retry())
586 .build()
587 .await
588 .unwrap();
589 client.restore_session(session).await.unwrap();
590
591 (client, server)
592 }
593
594 pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
595 let (client, _) = new_client().await;
596
597 RoomListService::new(client).await
598 }
599
600 struct SlidingSyncMatcher;
601
602 impl Match for SlidingSyncMatcher {
603 fn matches(&self, request: &Request) -> bool {
604 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
605 && request.method == Method::POST
606 }
607 }
608
609 #[async_test]
610 async fn test_all_rooms_are_declared() -> Result<(), Error> {
611 let room_list = new_room_list().await?;
612 let sliding_sync = room_list.sliding_sync();
613
614 // List is present, in Selective mode.
615 assert_eq!(
616 sliding_sync
617 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
618 list.sync_mode(),
619 SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
620 )))
621 .await,
622 Some(true)
623 );
624
625 Ok(())
626 }
627
628 #[async_test]
629 async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
630 let (client, server) = new_client().await;
631
632 let room_list = RoomListService::new(client).await?;
633
634 let sync = room_list.sync();
635 pin_mut!(sync);
636
637 // Run a first sync.
638 {
639 let _mock_guard = Mock::given(SlidingSyncMatcher)
640 .respond_with(move |_request: &Request| {
641 ResponseTemplate::new(200).set_body_json(json!({
642 "pos": "0",
643 "lists": {
644 ALL_ROOMS_LIST_NAME: {
645 "count": 0,
646 "ops": [],
647 },
648 },
649 "rooms": {},
650 }))
651 })
652 .mount_as_scoped(&server)
653 .await;
654
655 let _ = sync.next().await;
656 }
657
658 assert_eq!(room_list.state().get(), State::SettingUp);
659
660 // Stop the sync.
661 room_list.stop_sync()?;
662
663 // Do another sync.
664 let _ = sync.next().await;
665
666 // State is `Terminated`, as expected!
667 assert_eq!(
668 room_list.state_machine.get(),
669 State::Terminated { from: Box::new(State::Running) }
670 );
671
672 // Now, let's make the sliding sync session to expire.
673 room_list.expire_sync_session().await;
674
675 // State is `Error`, as a regular session expiration would generate!
676 assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
677
678 Ok(())
679 }
680}