matrix_sdk_ui/room_list_service/mod.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
//! states are defined by [`State`]. Actions are attached to each state
//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
//! list. Its goal is to load all the user's rooms. It starts with a
37//! [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//! set of rooms) to load the first rooms quickly, and then updates to a
39//! [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//! background”: it will sync the existing rooms and will fetch new rooms, by
41//! a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room;
56mod room_list;
57pub mod sorters;
58mod state;
59
60use std::{sync::Arc, time::Duration};
61
62use async_stream::stream;
63use eyeball::Subscriber;
64use futures_util::{pin_mut, Stream, StreamExt};
65use matrix_sdk::{
66 event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, SlidingSync,
67 SlidingSyncList, SlidingSyncMode,
68};
69pub use room::*;
70pub use room_list::*;
71use ruma::{
72 api::client::sync::sync_events::v5 as http, assign, directory::RoomTypeFilter,
73 events::StateEventType, OwnedRoomId, RoomId, UInt,
74};
75pub use state::*;
76use thiserror::Error;
77use tracing::debug;
78
79use crate::timeline;
80
/// The default `required_state` constant value for sliding sync lists and
/// sliding sync room subscriptions.
///
/// Each entry is a pair made of a state event type and a string value
/// (presumably the state key — confirm against the sliding sync
/// specification). The table is used as-is for the `all_rooms` list created
/// by [`RoomListService::new`], and is chained with
/// [`DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE`] in
/// [`RoomListService::subscribe_to_rooms`].
const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
    (StateEventType::RoomName, ""),
    (StateEventType::RoomEncryption, ""),
    // NOTE(review): `$LAZY`, `$ME` and `*` look like special sliding sync
    // placeholders rather than literal state keys — confirm their exact
    // meaning against the sliding sync specification.
    (StateEventType::RoomMember, "$LAZY"),
    (StateEventType::RoomMember, "$ME"),
    (StateEventType::RoomTopic, ""),
    (StateEventType::RoomCanonicalAlias, ""),
    (StateEventType::RoomPowerLevels, ""),
    (StateEventType::CallMember, "*"),
    (StateEventType::RoomJoinRules, ""),
    // Those two events are required to properly compute room previews.
    (StateEventType::RoomCreate, ""),
    (StateEventType::RoomHistoryVisibility, ""),
    // Required to correctly calculate the room display name.
    (StateEventType::MemberHints, ""),
];
99
/// The default `required_state` constant value for sliding sync room
/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
///
/// These entries are only requested for explicit room subscriptions (see
/// [`RoomListService::subscribe_to_rooms`]); they are not part of the
/// `required_state` of the `all_rooms` list.
const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
    &[(StateEventType::RoomPinnedEvents, "")];
104
/// The default `timeline_limit` value when used with room subscriptions.
///
/// It is converted to a [`UInt`] in [`RoomListService::subscribe_to_rooms`].
/// Note that the `all_rooms` list itself is configured with a
/// `timeline_limit` of 1 in [`RoomListService::new`].
const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
107
/// The [`RoomListService`] type. See the module's documentation to learn more.
///
/// It bundles the [`Client`], the [`SlidingSync`] instance that performs the
/// actual syncing, and the state machine driving the sync loop.
#[derive(Debug)]
pub struct RoomListService {
    /// Client that has created this [`RoomListService`].
    client: Client,

    /// The Sliding Sync instance.
    ///
    /// It is built once in [`RoomListService::new`] and kept behind an
    /// [`Arc`].
    sliding_sync: Arc<SlidingSync>,

    /// The current state of the `RoomListService`.
    ///
    /// `RoomListService` is a simple state-machine. The state is read and
    /// updated by the sync loop (see [`RoomListService::sync`]) and can be
    /// observed with [`RoomListService::state`].
    state_machine: StateMachine,
}
122
123impl RoomListService {
124 /// Create a new `RoomList`.
125 ///
126 /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
127 /// already pre-configured.
128 ///
129 /// This won't start an encryption sync, and it's the user's responsibility
130 /// to create one in this case using
131 /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
132 pub async fn new(client: Client) -> Result<Self, Error> {
133 let builder = client
134 .sliding_sync("room-list")
135 .map_err(Error::SlidingSync)?
136 .with_account_data_extension(
137 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
138 )
139 .with_receipt_extension(assign!(http::request::Receipts::default(), {
140 enabled: Some(true),
141 rooms: Some(vec![http::request::ReceiptsRoom::AllSubscribed])
142 }))
143 .with_typing_extension(assign!(http::request::Typing::default(), {
144 enabled: Some(true),
145 }));
146 // TODO: Re-enable once we know it creates slowness.
147 // // We don't deal with encryption device messages here so this is safe
148 // .share_pos();
149
150 let sliding_sync = builder
151 .add_cached_list(
152 SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
153 .sync_mode(
154 SlidingSyncMode::new_selective()
155 .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
156 )
157 .timeline_limit(1)
158 .required_state(
159 DEFAULT_REQUIRED_STATE
160 .iter()
161 .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
162 .collect(),
163 )
164 .include_heroes(Some(true))
165 .filters(Some(assign!(http::request::ListFilters::default(), {
166 // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
167 // If unset, both invited and joined rooms are returned. If false, no invited rooms are
168 // returned. If true, only invited rooms are returned.
169 is_invite: None,
170 not_room_types: vec![RoomTypeFilter::Space],
171 }))),
172 )
173 .await
174 .map_err(Error::SlidingSync)?
175 .build()
176 .await
177 .map(Arc::new)
178 .map_err(Error::SlidingSync)?;
179
180 // Eagerly subscribe the event cache to sync responses.
181 client.event_cache().subscribe()?;
182
183 Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
184 }
185
186 /// Start to sync the room list.
187 ///
188 /// It's the main method of this entire API. Calling `sync` allows to
189 /// receive updates on the room list: new rooms, rooms updates etc. Those
190 /// updates can be read with [`RoomList::entries`] for example. This method
191 /// returns a [`Stream`] where produced items only hold an empty value
192 /// in case of a sync success, otherwise an error.
193 ///
194 /// The `RoomListService`' state machine is run by this method.
195 ///
196 /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
197 /// calling [`Self::sync`] again will resume from the previous state of
198 /// the state machine.
199 ///
200 /// This should be used only for testing. In practice, most users should be
201 /// using the [`SyncService`] instead.
202 #[doc(hidden)]
203 pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
204 stream! {
205 let sync = self.sliding_sync.sync();
206 pin_mut!(sync);
207
208 // This is a state machine implementation.
209 // Things happen in this order:
210 //
211 // 1. The next state is calculated,
212 // 2. The actions associated to the next state are run,
213 // 3. A sync is done,
214 // 4. The next state is stored.
215 loop {
216 debug!("Run a sync iteration");
217
218 // Calculate the next state, and run the associated actions.
219 let next_state = self.state_machine.next(&self.sliding_sync).await?;
220
221 // Do the sync.
222 match sync.next().await {
223 // Got a successful result while syncing.
224 Some(Ok(_update_summary)) => {
225 debug!(state = ?next_state, "New state");
226
227 // Update the state.
228 self.state_machine.set(next_state);
229
230 yield Ok(());
231 }
232
233 // Got an error while syncing.
234 Some(Err(error)) => {
235 debug!(expected_state = ?next_state, "New state is an error");
236
237 let next_state = State::Error { from: Box::new(next_state) };
238 self.state_machine.set(next_state);
239
240 yield Err(Error::SlidingSync(error));
241
242 break;
243 }
244
245 // Sync loop has terminated.
246 None => {
247 debug!(expected_state = ?next_state, "New state is a termination");
248
249 let next_state = State::Terminated { from: Box::new(next_state) };
250 self.state_machine.set(next_state);
251
252 break;
253 }
254 }
255 }
256 }
257 }
258
259 /// Force to stop the sync of the `RoomListService` started by
260 /// [`Self::sync`].
261 ///
262 /// It's of utter importance to call this method rather than stop polling
263 /// the `Stream` returned by [`Self::sync`] because it will force the
264 /// cancellation and exit the sync loop, i.e. it will cancel any
265 /// in-flight HTTP requests, cancel any pending futures etc. and put the
266 /// service into a termination state.
267 ///
268 /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
269 /// until it returns `None`, because of [`Self::stop_sync`], so that it
270 /// ensures the states are correctly placed.
271 ///
272 /// Stopping the sync of the room list via this method will put the
273 /// state-machine into the [`State::Terminated`] state.
274 ///
275 /// This should be used only for testing. In practice, most users should be
276 /// using the [`SyncService`] instead.
277 #[doc(hidden)]
278 pub fn stop_sync(&self) -> Result<(), Error> {
279 self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
280 }
281
282 /// Force the sliding sync session to expire.
283 ///
284 /// This is used by [`SyncService`][crate::SyncService].
285 ///
286 /// **Warning**: This method **must not** be called while the sync loop is
287 /// running!
288 pub(crate) async fn expire_sync_session(&self) {
289 self.sliding_sync.expire_session().await;
290
291 // Usually, when the session expires, it leads the state to be `Error`,
292 // thus some actions (like refreshing the lists) are executed. However,
293 // if the sync loop has been stopped manually, the state is `Terminated`, and
294 // when the session is forced to expire, the state remains `Terminated`, thus
295 // the actions aren't executed as expected. Consequently, let's update the
296 // state.
297 if let State::Terminated { from } = self.state_machine.get() {
298 self.state_machine.set(State::Error { from });
299 }
300 }
301
302 /// Get a [`Stream`] of [`SyncIndicator`].
303 ///
304 /// Read the documentation of [`SyncIndicator`] to learn more about it.
305 pub fn sync_indicator(
306 &self,
307 delay_before_showing: Duration,
308 delay_before_hiding: Duration,
309 ) -> impl Stream<Item = SyncIndicator> {
310 let mut state = self.state();
311
312 stream! {
313 // Ensure the `SyncIndicator` is always hidden to start with.
314 yield SyncIndicator::Hide;
315
316 // Let's not wait for an update to happen. The `SyncIndicator` must be
317 // computed as fast as possible.
318 let mut current_state = state.next_now();
319
320 loop {
321 let (sync_indicator, yield_delay) = match current_state {
322 State::Init | State::Error { .. } => {
323 (SyncIndicator::Show, delay_before_showing)
324 }
325
326 State::SettingUp | State::Recovering | State::Running | State::Terminated { .. } => {
327 (SyncIndicator::Hide, delay_before_hiding)
328 }
329 };
330
331 // `state.next().await` has a maximum of `yield_delay` time to execute…
332 let next_state = match timeout(state.next(), yield_delay).await {
333 // A new state has been received before `yield_delay` time. The new
334 // `sync_indicator` value won't be yielded.
335 Ok(next_state) => next_state,
336
337 // No new state has been received before `yield_delay` time. The
338 // `sync_indicator` value can be yielded.
339 Err(_) => {
340 yield sync_indicator;
341
342 // Now that `sync_indicator` has been yielded, let's wait on
343 // the next state again.
344 state.next().await
345 }
346 };
347
348 if let Some(next_state) = next_state {
349 // Update the `current_state`.
350 current_state = next_state;
351 } else {
352 // Something is broken with the state. Let's stop this stream too.
353 break;
354 }
355 }
356 }
357 }
358
    /// Get the [`Client`] that has been used to create [`Self`].
    ///
    /// This is the client that was passed to [`Self::new`].
    pub fn client(&self) -> &Client {
        &self.client
    }
363
    /// Get a subscriber to the state.
    ///
    /// The returned [`Subscriber`] observes the [`State`] of the state
    /// machine, which is updated by the sync loop (see [`Self::sync`]).
    pub fn state(&self) -> Subscriber<State> {
        self.state_machine.subscribe()
    }
368
369 async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
370 RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
371 }
372
    /// Get a [`RoomList`] for all rooms.
    ///
    /// The [`RoomList`] is backed by the sliding sync list named
    /// [`ALL_ROOMS_LIST_NAME`].
    pub async fn all_rooms(&self) -> Result<RoomList, Error> {
        self.list_for(ALL_ROOMS_LIST_NAME).await
    }
377
378 /// Get a [`Room`] if it exists.
379 pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
380 Ok(Room::new(
381 self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))?,
382 &self.sliding_sync,
383 ))
384 }
385
386 /// Subscribe to rooms.
387 ///
388 /// It means that all events from these rooms will be received every time,
389 /// no matter how the `RoomList` is configured.
390 pub fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
391 let settings = assign!(http::request::RoomSubscription::default(), {
392 required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
393 (state_event.clone(), (*value).to_owned())
394 })
395 .chain(
396 DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
397 (state_event.clone(), (*value).to_owned())
398 })
399 )
400 .collect(),
401 timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
402 });
403
404 let cancel_in_flight_request = match self.state_machine.get() {
405 State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
406 false
407 }
408 State::SettingUp | State::Running => true,
409 };
410
411 self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
412 }
413
    /// Get the underlying [`SlidingSync`] instance.
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub fn sliding_sync(&self) -> &SlidingSync {
        &self.sliding_sync
    }
418}
419
/// Errors of the [`RoomListService`] API.
#[derive(Debug, Error)]
pub enum Error {
    /// Error from [`matrix_sdk::SlidingSync`].
    #[error(transparent)]
    SlidingSync(SlidingSyncError),

    /// An operation has been requested on an unknown list.
    #[error("Unknown list `{0}`")]
    UnknownList(String),

    /// The requested room doesn't exist.
    #[error("Room `{0}` not found")]
    RoomNotFound(OwnedRoomId),

    /// A timeline instance already exists for the given room.
    #[error("A timeline instance already exists for room {0}")]
    TimelineAlreadyExists(OwnedRoomId),

    /// A [`timeline::Error`], converted automatically with `From`. Judging by
    /// the variant name, it is raised while initializing a timeline.
    #[error(transparent)]
    InitializingTimeline(#[from] timeline::Error),

    /// An error from the event cache, converted automatically with `From`
    /// (e.g. when subscribing the event cache in [`RoomListService::new`]).
    #[error(transparent)]
    EventCache(#[from] EventCacheError),
}
444
/// A hint whether a _sync spinner/loader/toaster_ should be prompted to the
/// user, indicating that the [`RoomListService`] is syncing.
///
/// This is entirely arbitrary and opinionated. Of course, once
/// [`RoomListService::sync`] has been called, it's going to be constantly
/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
/// happened. But in some cases, it's better for the user experience to prompt
/// to the user that a sync is happening. It's usually the first sync, or the
/// recovering sync. However, the sync indicator should only be prompted if
/// the aforementioned sync is “slow”, otherwise the indicator is likely to
/// “blink” pretty fast, which can be very confusing. It's also common to
/// indicate to the user that a syncing is happening in case of a network
/// error, that something is catching up etc.
///
/// Values of this type are produced by [`RoomListService::sync_indicator`].
#[derive(Debug, Eq, PartialEq)]
pub enum SyncIndicator {
    /// Show the sync indicator.
    Show,

    /// Hide the sync indicator.
    Hide,
}
466
467#[cfg(test)]
468mod tests {
469 use std::future::ready;
470
471 use futures_util::{pin_mut, StreamExt};
472 use matrix_sdk::{
473 authentication::matrix::{MatrixSession, MatrixSessionTokens},
474 config::RequestConfig,
475 Client, SlidingSyncMode,
476 };
477 use matrix_sdk_base::SessionMeta;
478 use matrix_sdk_test::async_test;
479 use ruma::{api::MatrixVersion, device_id, user_id};
480 use serde_json::json;
481 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
482
483 use super::{Error, RoomListService, State, ALL_ROOMS_LIST_NAME};
484
485 async fn new_client() -> (Client, MockServer) {
486 let session = MatrixSession {
487 meta: SessionMeta {
488 user_id: user_id!("@example:localhost").to_owned(),
489 device_id: device_id!("DEVICEID").to_owned(),
490 },
491 tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
492 };
493
494 let server = MockServer::start().await;
495 let client = Client::builder()
496 .homeserver_url(server.uri())
497 .server_versions([MatrixVersion::V1_0])
498 .request_config(RequestConfig::new().disable_retry())
499 .build()
500 .await
501 .unwrap();
502 client.restore_session(session).await.unwrap();
503
504 (client, server)
505 }
506
507 pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
508 let (client, _) = new_client().await;
509
510 RoomListService::new(client).await
511 }
512
513 struct SlidingSyncMatcher;
514
515 impl Match for SlidingSyncMatcher {
516 fn matches(&self, request: &Request) -> bool {
517 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
518 && request.method == Method::POST
519 }
520 }
521
522 #[async_test]
523 async fn test_all_rooms_are_declared() -> Result<(), Error> {
524 let room_list = new_room_list().await?;
525 let sliding_sync = room_list.sliding_sync();
526
527 // List is present, in Selective mode.
528 assert_eq!(
529 sliding_sync
530 .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
531 list.sync_mode(),
532 SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
533 )))
534 .await,
535 Some(true)
536 );
537
538 Ok(())
539 }
540
541 #[async_test]
542 async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
543 let (client, server) = new_client().await;
544
545 let room_list = RoomListService::new(client).await?;
546
547 let sync = room_list.sync();
548 pin_mut!(sync);
549
550 // Run a first sync.
551 {
552 let _mock_guard = Mock::given(SlidingSyncMatcher)
553 .respond_with(move |_request: &Request| {
554 ResponseTemplate::new(200).set_body_json(json!({
555 "pos": "0",
556 "lists": {
557 ALL_ROOMS_LIST_NAME: {
558 "count": 0,
559 "ops": [],
560 },
561 },
562 "rooms": {},
563 }))
564 })
565 .mount_as_scoped(&server)
566 .await;
567
568 let _ = sync.next().await;
569 }
570
571 assert_eq!(room_list.state().get(), State::SettingUp);
572
573 // Stop the sync.
574 room_list.stop_sync()?;
575
576 // Do another sync.
577 let _ = sync.next().await;
578
579 // State is `Terminated`, as expected!
580 assert_eq!(
581 room_list.state_machine.get(),
582 State::Terminated { from: Box::new(State::Running) }
583 );
584
585 // Now, let's make the sliding sync session to expire.
586 room_list.expire_sync_session().await;
587
588 // State is `Error`, as a regular session expiration would generate!
589 assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
590
591 Ok(())
592 }
593}