matrix_sdk_ui/room_list_service/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15//! `RoomListService` API.
16//!
17//! The `RoomListService` is a UI API dedicated to present a list of Matrix
18//! rooms to the user. The syncing is handled by [`SlidingSync`]. The idea is to
19//! expose a simple API to handle most of the client app use cases, like:
20//! Showing and updating a list of rooms, filtering a list of rooms, handling
21//! particular updates of a range of rooms (the ones the client app is showing
22//! to the view, i.e. the rooms present in the viewport) etc.
23//!
24//! As such, the `RoomListService` works as an opinionated state machine. The
25//! states are defined by [`State`]. Actions are attached to the each state
26//! transition.
27//!
28//! The API is purposely small. Sliding Sync is versatile. `RoomListService` is
29//! _one_ specific usage of Sliding Sync.
30//!
31//! # Basic principle
32//!
33//! `RoomListService` works with 1 Sliding Sync List:
34//!
35//! * `all_rooms` (referred by the constant [`ALL_ROOMS_LIST_NAME`]) is the only
36//!   list. Its goal is to load all the user' rooms. It starts with a
37//!   [`SlidingSyncMode::Selective`] sync-mode with a small range (i.e. a small
38//!   set of rooms) to load the first rooms quickly, and then updates to a
39//!   [`SlidingSyncMode::Growing`] sync-mode to load the remaining rooms “in the
40//!   background”: it will sync the existing rooms and will fetch new rooms, by
41//!   a certain batch size.
42//!
43//! This behavior has proven to be empirically satisfying to provide a fast and
44//! fluid user experience for a Matrix client.
45//!
46//! [`RoomListService::all_rooms`] provides a way to get a [`RoomList`] for all
47//! the rooms. From that, calling [`RoomList::entries_with_dynamic_adapters`]
48//! provides a way to get a stream of rooms. This stream is sorted, can be
49//! filtered, and the filter can be changed over time.
50//!
51//! [`RoomListService::state`] provides a way to get a stream of the state
52//! machine's state, which can be pretty helpful for the client app.
53
54pub mod filters;
55mod room;
56mod room_list;
57pub mod sorters;
58mod state;
59
60use std::{sync::Arc, time::Duration};
61
62use async_stream::stream;
63use eyeball::Subscriber;
64use futures_util::{pin_mut, Stream, StreamExt};
65use matrix_sdk::{
66    event_cache::EventCacheError, timeout::timeout, Client, Error as SlidingSyncError, SlidingSync,
67    SlidingSyncList, SlidingSyncMode,
68};
69pub use room::*;
70pub use room_list::*;
71use ruma::{
72    api::client::sync::sync_events::v5 as http, assign, directory::RoomTypeFilter,
73    events::StateEventType, OwnedRoomId, RoomId, UInt,
74};
75pub use state::*;
76use thiserror::Error;
77use tracing::debug;
78
79use crate::timeline;
80
81/// The default `required_state` constant value for sliding sync lists and
82/// sliding sync room subscriptions.
83const DEFAULT_REQUIRED_STATE: &[(StateEventType, &str)] = &[
84    (StateEventType::RoomName, ""),
85    (StateEventType::RoomEncryption, ""),
86    (StateEventType::RoomMember, "$LAZY"),
87    (StateEventType::RoomMember, "$ME"),
88    (StateEventType::RoomTopic, ""),
89    (StateEventType::RoomCanonicalAlias, ""),
90    (StateEventType::RoomPowerLevels, ""),
91    (StateEventType::CallMember, "*"),
92    (StateEventType::RoomJoinRules, ""),
93    // Those two events are required to properly compute room previews.
94    (StateEventType::RoomCreate, ""),
95    (StateEventType::RoomHistoryVisibility, ""),
96    // Required to correctly calculate the room display name.
97    (StateEventType::MemberHints, ""),
98];
99
100/// The default `required_state` constant value for sliding sync room
101/// subscriptions that must be added to `DEFAULT_REQUIRED_STATE`.
102const DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE: &[(StateEventType, &str)] =
103    &[(StateEventType::RoomPinnedEvents, "")];
104
105/// The default `timeline_limit` value when used with room subscriptions.
106const DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT: u32 = 20;
107
108/// The [`RoomListService`] type. See the module's documentation to learn more.
109#[derive(Debug)]
110pub struct RoomListService {
111    /// Client that has created this [`RoomListService`].
112    client: Client,
113
114    /// The Sliding Sync instance.
115    sliding_sync: Arc<SlidingSync>,
116
117    /// The current state of the `RoomListService`.
118    ///
119    /// `RoomListService` is a simple state-machine.
120    state_machine: StateMachine,
121}
122
123impl RoomListService {
124    /// Create a new `RoomList`.
125    ///
126    /// A [`matrix_sdk::SlidingSync`] client will be created, with a cached list
127    /// already pre-configured.
128    ///
129    /// This won't start an encryption sync, and it's the user's responsibility
130    /// to create one in this case using
131    /// [`EncryptionSyncService`][crate::encryption_sync_service::EncryptionSyncService].
132    pub async fn new(client: Client) -> Result<Self, Error> {
133        let builder = client
134            .sliding_sync("room-list")
135            .map_err(Error::SlidingSync)?
136            .with_account_data_extension(
137                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
138            )
139            .with_receipt_extension(assign!(http::request::Receipts::default(), {
140                enabled: Some(true),
141                rooms: Some(vec![http::request::ReceiptsRoom::AllSubscribed])
142            }))
143            .with_typing_extension(assign!(http::request::Typing::default(), {
144                enabled: Some(true),
145            }));
146        // TODO: Re-enable once we know it creates slowness.
147        // // We don't deal with encryption device messages here so this is safe
148        // .share_pos();
149
150        let sliding_sync = builder
151            .add_cached_list(
152                SlidingSyncList::builder(ALL_ROOMS_LIST_NAME)
153                    .sync_mode(
154                        SlidingSyncMode::new_selective()
155                            .add_range(ALL_ROOMS_DEFAULT_SELECTIVE_RANGE),
156                    )
157                    .timeline_limit(1)
158                    .required_state(
159                        DEFAULT_REQUIRED_STATE
160                            .iter()
161                            .map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
162                            .collect(),
163                    )
164                    .include_heroes(Some(true))
165                    .filters(Some(assign!(http::request::ListFilters::default(), {
166                        // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
167                        // If unset, both invited and joined rooms are returned. If false, no invited rooms are
168                        // returned. If true, only invited rooms are returned.
169                        is_invite: None,
170                        not_room_types: vec![RoomTypeFilter::Space],
171                    }))),
172            )
173            .await
174            .map_err(Error::SlidingSync)?
175            .build()
176            .await
177            .map(Arc::new)
178            .map_err(Error::SlidingSync)?;
179
180        // Eagerly subscribe the event cache to sync responses.
181        client.event_cache().subscribe()?;
182
183        Ok(Self { client, sliding_sync, state_machine: StateMachine::new() })
184    }
185
186    /// Start to sync the room list.
187    ///
188    /// It's the main method of this entire API. Calling `sync` allows to
189    /// receive updates on the room list: new rooms, rooms updates etc. Those
190    /// updates can be read with [`RoomList::entries`] for example. This method
191    /// returns a [`Stream`] where produced items only hold an empty value
192    /// in case of a sync success, otherwise an error.
193    ///
194    /// The `RoomListService`' state machine is run by this method.
195    ///
196    /// Stopping the [`Stream`] (i.e. by calling [`Self::stop_sync`]), and
197    /// calling [`Self::sync`] again will resume from the previous state of
198    /// the state machine.
199    ///
200    /// This should be used only for testing. In practice, most users should be
201    /// using the [`SyncService`] instead.
202    #[doc(hidden)]
203    pub fn sync(&self) -> impl Stream<Item = Result<(), Error>> + '_ {
204        stream! {
205            let sync = self.sliding_sync.sync();
206            pin_mut!(sync);
207
208            // This is a state machine implementation.
209            // Things happen in this order:
210            //
211            // 1. The next state is calculated,
212            // 2. The actions associated to the next state are run,
213            // 3. A sync is done,
214            // 4. The next state is stored.
215            loop {
216                debug!("Run a sync iteration");
217
218                // Calculate the next state, and run the associated actions.
219                let next_state = self.state_machine.next(&self.sliding_sync).await?;
220
221                // Do the sync.
222                match sync.next().await {
223                    // Got a successful result while syncing.
224                    Some(Ok(_update_summary)) => {
225                        debug!(state = ?next_state, "New state");
226
227                        // Update the state.
228                        self.state_machine.set(next_state);
229
230                        yield Ok(());
231                    }
232
233                    // Got an error while syncing.
234                    Some(Err(error)) => {
235                        debug!(expected_state = ?next_state, "New state is an error");
236
237                        let next_state = State::Error { from: Box::new(next_state) };
238                        self.state_machine.set(next_state);
239
240                        yield Err(Error::SlidingSync(error));
241
242                        break;
243                    }
244
245                    // Sync loop has terminated.
246                    None => {
247                        debug!(expected_state = ?next_state, "New state is a termination");
248
249                        let next_state = State::Terminated { from: Box::new(next_state) };
250                        self.state_machine.set(next_state);
251
252                        break;
253                    }
254                }
255            }
256        }
257    }
258
259    /// Force to stop the sync of the `RoomListService` started by
260    /// [`Self::sync`].
261    ///
262    /// It's of utter importance to call this method rather than stop polling
263    /// the `Stream` returned by [`Self::sync`] because it will force the
264    /// cancellation and exit the sync loop, i.e. it will cancel any
265    /// in-flight HTTP requests, cancel any pending futures etc. and put the
266    /// service into a termination state.
267    ///
268    /// Ideally, one wants to consume the `Stream` returned by [`Self::sync`]
269    /// until it returns `None`, because of [`Self::stop_sync`], so that it
270    /// ensures the states are correctly placed.
271    ///
272    /// Stopping the sync of the room list via this method will put the
273    /// state-machine into the [`State::Terminated`] state.
274    ///
275    /// This should be used only for testing. In practice, most users should be
276    /// using the [`SyncService`] instead.
277    #[doc(hidden)]
278    pub fn stop_sync(&self) -> Result<(), Error> {
279        self.sliding_sync.stop_sync().map_err(Error::SlidingSync)
280    }
281
282    /// Force the sliding sync session to expire.
283    ///
284    /// This is used by [`SyncService`][crate::SyncService].
285    ///
286    /// **Warning**: This method **must not** be called while the sync loop is
287    /// running!
288    pub(crate) async fn expire_sync_session(&self) {
289        self.sliding_sync.expire_session().await;
290
291        // Usually, when the session expires, it leads the state to be `Error`,
292        // thus some actions (like refreshing the lists) are executed. However,
293        // if the sync loop has been stopped manually, the state is `Terminated`, and
294        // when the session is forced to expire, the state remains `Terminated`, thus
295        // the actions aren't executed as expected. Consequently, let's update the
296        // state.
297        if let State::Terminated { from } = self.state_machine.get() {
298            self.state_machine.set(State::Error { from });
299        }
300    }
301
302    /// Get a [`Stream`] of [`SyncIndicator`].
303    ///
304    /// Read the documentation of [`SyncIndicator`] to learn more about it.
305    pub fn sync_indicator(
306        &self,
307        delay_before_showing: Duration,
308        delay_before_hiding: Duration,
309    ) -> impl Stream<Item = SyncIndicator> {
310        let mut state = self.state();
311
312        stream! {
313            // Ensure the `SyncIndicator` is always hidden to start with.
314            yield SyncIndicator::Hide;
315
316            // Let's not wait for an update to happen. The `SyncIndicator` must be
317            // computed as fast as possible.
318            let mut current_state = state.next_now();
319
320            loop {
321                let (sync_indicator, yield_delay) = match current_state {
322                    State::Init | State::Error { .. } => {
323                        (SyncIndicator::Show, delay_before_showing)
324                    }
325
326                    State::SettingUp | State::Recovering | State::Running | State::Terminated { .. } => {
327                        (SyncIndicator::Hide, delay_before_hiding)
328                    }
329                };
330
331                // `state.next().await` has a maximum of `yield_delay` time to execute…
332                let next_state = match timeout(state.next(), yield_delay).await {
333                    // A new state has been received before `yield_delay` time. The new
334                    // `sync_indicator` value won't be yielded.
335                    Ok(next_state) => next_state,
336
337                    // No new state has been received before `yield_delay` time. The
338                    // `sync_indicator` value can be yielded.
339                    Err(_) => {
340                        yield sync_indicator;
341
342                        // Now that `sync_indicator` has been yielded, let's wait on
343                        // the next state again.
344                        state.next().await
345                    }
346                };
347
348                if let Some(next_state) = next_state {
349                    // Update the `current_state`.
350                    current_state = next_state;
351                } else {
352                    // Something is broken with the state. Let's stop this stream too.
353                    break;
354                }
355            }
356        }
357    }
358
359    /// Get the [`Client`] that has been used to create [`Self`].
360    pub fn client(&self) -> &Client {
361        &self.client
362    }
363
364    /// Get a subscriber to the state.
365    pub fn state(&self) -> Subscriber<State> {
366        self.state_machine.subscribe()
367    }
368
369    async fn list_for(&self, sliding_sync_list_name: &str) -> Result<RoomList, Error> {
370        RoomList::new(&self.client, &self.sliding_sync, sliding_sync_list_name, self.state()).await
371    }
372
373    /// Get a [`RoomList`] for all rooms.
374    pub async fn all_rooms(&self) -> Result<RoomList, Error> {
375        self.list_for(ALL_ROOMS_LIST_NAME).await
376    }
377
378    /// Get a [`Room`] if it exists.
379    pub fn room(&self, room_id: &RoomId) -> Result<Room, Error> {
380        Ok(Room::new(
381            self.client.get_room(room_id).ok_or_else(|| Error::RoomNotFound(room_id.to_owned()))?,
382            &self.sliding_sync,
383        ))
384    }
385
386    /// Subscribe to rooms.
387    ///
388    /// It means that all events from these rooms will be received every time,
389    /// no matter how the `RoomList` is configured.
390    pub fn subscribe_to_rooms(&self, room_ids: &[&RoomId]) {
391        let settings = assign!(http::request::RoomSubscription::default(), {
392            required_state: DEFAULT_REQUIRED_STATE.iter().map(|(state_event, value)| {
393                (state_event.clone(), (*value).to_owned())
394            })
395            .chain(
396                DEFAULT_ROOM_SUBSCRIPTION_EXTRA_REQUIRED_STATE.iter().map(|(state_event, value)| {
397                    (state_event.clone(), (*value).to_owned())
398                })
399            )
400            .collect(),
401            timeline_limit: UInt::from(DEFAULT_ROOM_SUBSCRIPTION_TIMELINE_LIMIT),
402        });
403
404        let cancel_in_flight_request = match self.state_machine.get() {
405            State::Init | State::Recovering | State::Error { .. } | State::Terminated { .. } => {
406                false
407            }
408            State::SettingUp | State::Running => true,
409        };
410
411        self.sliding_sync.subscribe_to_rooms(room_ids, Some(settings), cancel_in_flight_request)
412    }
413
414    #[cfg(test)]
415    pub fn sliding_sync(&self) -> &SlidingSync {
416        &self.sliding_sync
417    }
418}
419
420/// [`RoomList`]'s errors.
421#[derive(Debug, Error)]
422pub enum Error {
423    /// Error from [`matrix_sdk::SlidingSync`].
424    #[error(transparent)]
425    SlidingSync(SlidingSyncError),
426
427    /// An operation has been requested on an unknown list.
428    #[error("Unknown list `{0}`")]
429    UnknownList(String),
430
431    /// The requested room doesn't exist.
432    #[error("Room `{0}` not found")]
433    RoomNotFound(OwnedRoomId),
434
435    #[error("A timeline instance already exists for room {0}")]
436    TimelineAlreadyExists(OwnedRoomId),
437
438    #[error(transparent)]
439    InitializingTimeline(#[from] timeline::Error),
440
441    #[error(transparent)]
442    EventCache(#[from] EventCacheError),
443}
444
445/// An hint whether a _sync spinner/loader/toaster_ should be prompted to the
446/// user, indicating that the [`RoomListService`] is syncing.
447///
448/// This is entirely arbitrary and optinionated. Of course, once
449/// [`RoomListService::sync`] has been called, it's going to be constantly
450/// syncing, until [`RoomListService::stop_sync`] is called, or until an error
451/// happened. But in some cases, it's better for the user experience to prompt
452/// to the user that a sync is happening. It's usually the first sync, or the
453/// recovering sync. However, the sync indicator must be prompted if the
454/// aforementioned sync is “slow”, otherwise the indicator is likely to “blink”
455/// pretty fast, which can be very confusing. It's also common to indicate to
456/// the user that a syncing is happening in case of a network error, that
457/// something is catching up etc.
458#[derive(Debug, Eq, PartialEq)]
459pub enum SyncIndicator {
460    /// Show the sync indicator.
461    Show,
462
463    /// Hide the sync indicator.
464    Hide,
465}
466
467#[cfg(test)]
468mod tests {
469    use std::future::ready;
470
471    use futures_util::{pin_mut, StreamExt};
472    use matrix_sdk::{
473        authentication::matrix::{MatrixSession, MatrixSessionTokens},
474        config::RequestConfig,
475        Client, SlidingSyncMode,
476    };
477    use matrix_sdk_base::SessionMeta;
478    use matrix_sdk_test::async_test;
479    use ruma::{api::MatrixVersion, device_id, user_id};
480    use serde_json::json;
481    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
482
483    use super::{Error, RoomListService, State, ALL_ROOMS_LIST_NAME};
484
485    async fn new_client() -> (Client, MockServer) {
486        let session = MatrixSession {
487            meta: SessionMeta {
488                user_id: user_id!("@example:localhost").to_owned(),
489                device_id: device_id!("DEVICEID").to_owned(),
490            },
491            tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
492        };
493
494        let server = MockServer::start().await;
495        let client = Client::builder()
496            .homeserver_url(server.uri())
497            .server_versions([MatrixVersion::V1_0])
498            .request_config(RequestConfig::new().disable_retry())
499            .build()
500            .await
501            .unwrap();
502        client.restore_session(session).await.unwrap();
503
504        (client, server)
505    }
506
507    pub(super) async fn new_room_list() -> Result<RoomListService, Error> {
508        let (client, _) = new_client().await;
509
510        RoomListService::new(client).await
511    }
512
513    struct SlidingSyncMatcher;
514
515    impl Match for SlidingSyncMatcher {
516        fn matches(&self, request: &Request) -> bool {
517            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
518                && request.method == Method::POST
519        }
520    }
521
522    #[async_test]
523    async fn test_all_rooms_are_declared() -> Result<(), Error> {
524        let room_list = new_room_list().await?;
525        let sliding_sync = room_list.sliding_sync();
526
527        // List is present, in Selective mode.
528        assert_eq!(
529            sliding_sync
530                .on_list(ALL_ROOMS_LIST_NAME, |list| ready(matches!(
531                    list.sync_mode(),
532                    SlidingSyncMode::Selective { ranges } if ranges == vec![0..=19]
533                )))
534                .await,
535            Some(true)
536        );
537
538        Ok(())
539    }
540
541    #[async_test]
542    async fn test_expire_sliding_sync_session_manually() -> Result<(), Error> {
543        let (client, server) = new_client().await;
544
545        let room_list = RoomListService::new(client).await?;
546
547        let sync = room_list.sync();
548        pin_mut!(sync);
549
550        // Run a first sync.
551        {
552            let _mock_guard = Mock::given(SlidingSyncMatcher)
553                .respond_with(move |_request: &Request| {
554                    ResponseTemplate::new(200).set_body_json(json!({
555                        "pos": "0",
556                        "lists": {
557                            ALL_ROOMS_LIST_NAME: {
558                                "count": 0,
559                                "ops": [],
560                            },
561                        },
562                        "rooms": {},
563                    }))
564                })
565                .mount_as_scoped(&server)
566                .await;
567
568            let _ = sync.next().await;
569        }
570
571        assert_eq!(room_list.state().get(), State::SettingUp);
572
573        // Stop the sync.
574        room_list.stop_sync()?;
575
576        // Do another sync.
577        let _ = sync.next().await;
578
579        // State is `Terminated`, as expected!
580        assert_eq!(
581            room_list.state_machine.get(),
582            State::Terminated { from: Box::new(State::Running) }
583        );
584
585        // Now, let's make the sliding sync session to expire.
586        room_list.expire_sync_session().await;
587
588        // State is `Error`, as a regular session expiration would generate!
589        assert_eq!(room_list.state_machine.get(), State::Error { from: Box::new(State::Running) });
590
591        Ok(())
592    }
593}