// matrix_sdk_ui/room_list_service/room_list.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for that specific language governing permissions and
// limitations under the License.
14
15use std::{future::ready, ops::Deref, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{Stream, StreamExt as _, pin_mut, stream};
24use matrix_sdk::{
25    Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
26    task_monitor::BackgroundTaskHandle,
27};
28use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons};
29use ruma::MilliSecondsSinceUnixEpoch;
30use tokio::{
31    select,
32    sync::broadcast::{self, error::RecvError},
33};
34use tracing::{error, trace};
35
36use super::{
37    Error, State,
38    filters::BoxedFilterFn,
39    sorters::{
40        new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name, new_sorter_recency,
41    },
42};
43
44/// A `RoomList` represents a list of rooms, from a
45/// [`RoomListService`](super::RoomListService).
46#[derive(Debug)]
47pub struct RoomList {
48    client: Client,
49    sliding_sync_list: SlidingSyncList,
50    loading_state: SharedObservable<RoomListLoadingState>,
51    _loading_state_task: BackgroundTaskHandle,
52}
53
54impl RoomList {
55    pub(super) async fn new(
56        client: &Client,
57        sliding_sync: &Arc<SlidingSync>,
58        sliding_sync_list_name: &str,
59        room_list_service_state: Subscriber<State>,
60    ) -> Result<Self, Error> {
61        let sliding_sync_list = sliding_sync
62            .on_list(sliding_sync_list_name, |list| ready(list.clone()))
63            .await
64            .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
65
66        let loading_state =
67            SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
68                Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
69                    maximum_number_of_rooms: Some(maximum_number_of_rooms),
70                },
71                None => RoomListLoadingState::NotLoaded,
72            });
73
74        Ok(Self {
75            client: client.clone(),
76            sliding_sync_list: sliding_sync_list.clone(),
77            loading_state: loading_state.clone(),
78            _loading_state_task: client
79                .task_monitor()
80                .spawn_infinite_task("room_list::loading_state_task", async move {
81                    pin_mut!(room_list_service_state);
82
83                    // As soon as `RoomListService` changes its state, if it isn't
84                    // `Terminated` nor `Error`, we know we have fetched something,
85                    // so the room list is loaded.
86                    while let Some(state) = room_list_service_state.next().await {
87                        use State::*;
88
89                        match state {
90                            Terminated { .. } | Error { .. } | Init => (),
91                            SettingUp | Recovering | Running => break,
92                        }
93                    }
94
95                    // Let's jump from `NotLoaded` to `Loaded`.
96                    let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
97
98                    loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
99
100                    // Wait for updates on the maximum number of rooms to update again.
101                    let mut maximum_number_of_rooms_stream =
102                        sliding_sync_list.maximum_number_of_rooms_stream();
103
104                    while let Some(maximum_number_of_rooms) =
105                        maximum_number_of_rooms_stream.next().await
106                    {
107                        loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
108                    }
109                })
110                .abort_on_drop(),
111        })
112    }
113
114    /// Get a subscriber to the room list loading state.
115    ///
116    /// This method will send out the current loading state as the first update.
117    ///
118    /// See [`RoomListLoadingState`].
119    pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
120        self.loading_state.subscribe_reset()
121    }
122
123    /// Get a stream of rooms.
124    fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
125        self.client.rooms_stream()
126    }
127
128    /// Get a configurable stream of rooms.
129    ///
130    /// It's possible to provide a filter that will filter out room list
131    /// entries, and that it's also possible to “paginate” over the entries by
132    /// `page_size`. The rooms are also sorted.
133    ///
134    /// The returned stream will only start yielding diffs once a filter is set
135    /// through the returned [`RoomListDynamicEntriesController`]. For every
136    /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
137    /// will yield a [`VectorDiff::Reset`] followed by any updates of the
138    /// room list under that filter (until the next reset).
139    pub fn entries_with_dynamic_adapters(
140        &self,
141        page_size: usize,
142    ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
143    {
144        let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
145        let list = self.sliding_sync_list.clone();
146
147        let filter_fn_cell = AsyncCell::shared();
148
149        let limit = SharedObservable::<usize>::new(page_size);
150        let limit_stream = limit.subscribe();
151
152        let dynamic_entries_controller = RoomListDynamicEntriesController::new(
153            filter_fn_cell.clone(),
154            page_size,
155            limit,
156            list.maximum_number_of_rooms_stream(),
157        );
158
159        let stream = stream! {
160            loop {
161                let filter_fn = filter_fn_cell.take().await;
162
163                let (raw_values, raw_stream) = self.entries();
164                let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
165
166                // Combine normal stream events with other updates from rooms
167                let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
168
169                let (values, stream) = (values, stream)
170                    .filter(filter_fn)
171                    .sort_by(new_sorter_lexicographic(vec![
172                        // Sort by latest event's kind, i.e. put the rooms with a
173                        // **local** latest event first.
174                        Box::new(new_sorter_latest_event()),
175                        // Sort rooms by their recency (either by looking
176                        // at their latest event's timestamp, or their
177                        // `recency_stamp`).
178                        Box::new(new_sorter_recency()),
179                        // Finally, sort by name.
180                        Box::new(new_sorter_name()),
181                    ]))
182                    .dynamic_head_with_initial_value(page_size, limit_stream.clone());
183
184                // Clearing the stream before chaining with the real stream.
185                yield stream::once(ready(vec![VectorDiff::Reset { values }]))
186                    .chain(stream);
187            }
188        }
189        .fuse()
190        .switch();
191
192        (stream, dynamic_entries_controller)
193    }
194}
195
196/// This function remembers the current state of the unfiltered room list, so it
197/// knows where all rooms are. When the receiver is triggered, a Set operation
198/// for the room position is inserted to the stream.
199fn merge_stream_and_receiver(
200    mut current_values: Vector<RoomListItem>,
201    raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
202    mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
203) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
204    stream! {
205        pin_mut!(raw_stream);
206
207        loop {
208            select! {
209                // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
210                biased;
211
212                diffs = raw_stream.next() => {
213                    if let Some(diffs) = diffs {
214                        let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
215
216                        for diff in &diffs {
217                            diff.clone().map(|room| {
218                                trace!(room = %room.room_id(), "updated in response");
219                                room
220                            }).apply(&mut current_values);
221                        }
222
223                        yield diffs;
224                    } else {
225                        // Restart immediately, don't keep on waiting for the receiver
226                        break;
227                    }
228                }
229
230                update = room_info_notable_update_receiver.recv() => {
231                    match update {
232                        Ok(update) => {
233                            // Filter which _reason_ can trigger an update of
234                            // the room list.
235                            //
236                            // If the update is strictly about the
237                            // `RECENCY_STAMP`, let's ignore it, because the
238                            // Latest Event type is used to sort the room list
239                            // by recency already. We don't want to trigger an
240                            // update because of `RECENCY_STAMP`.
241                            //
242                            // If the update contains more reasons than
243                            // `RECENCY_STAMP`, then it's fine. That's why we
244                            // are using `==` instead of `contains`.
245                            if update.reasons == RoomInfoNotableUpdateReasons::RECENCY_STAMP {
246                                continue;
247                            }
248
249                            // Emit a `VectorDiff::Set` for the specific rooms.
250                            if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
251                                let mut room = current_values[index].clone();
252                                room.refresh_cached_data();
253
254                                yield vec![VectorDiff::Set { index, value: room }];
255                            }
256                        }
257
258                        Err(RecvError::Closed) => {
259                            error!("Cannot receive room info notable updates because the sender has been closed");
260
261                            break;
262                        }
263
264                        Err(RecvError::Lagged(n)) => {
265                            error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
266                        }
267                    }
268                }
269            }
270        }
271    }
272}
273
/// How far a [`RoomList`] is in its loading.
///
/// A [`RoomList`] displayed to the user can be in several states; this enum
/// models them at a level of abstraction suited to a user interface.
///
/// See [`RoomList::loading_state`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
    /// Nothing has been loaded yet: a sync may or may not be running, and
    /// there is nothing to show in this [`RoomList`]. It is a good moment to
    /// display a placeholder.
    ///
    /// The only possible transition from [`Self::NotLoaded`] is to
    /// [`Self::Loaded`].
    NotLoaded,

    /// The [`RoomList`] has been loaded: a sync has run, or more syncs are
    /// running, and there is probably something to show. Either the user has
    /// no room, which is a good moment to display a dedicated screen, or the
    /// user has rooms and the regular room list can be displayed.
    ///
    /// The number of rooms is given by `maximum_number_of_rooms`.
    ///
    /// Once in [`Self::Loaded`], it is not possible to go back to
    /// [`Self::NotLoaded`].
    Loaded {
        /// The maximum number of rooms the [`RoomList`] contains.
        ///
        /// This is not the number of rooms ready to be displayed right now.
        /// The entries themselves are [`RoomListItem`]s, which may or may
        /// not have been synced yet; this value is the amount of rooms the
        /// server announces for the list once it is complete.
        ///
        /// It is an `Option` because the server may fail to send the value.
        /// The default to adopt for `None` is up to the caller.
        maximum_number_of_rooms: Option<u32>,
    },
}
316
317/// Controller for the [`RoomList`] dynamic entries.
318///
319/// To get one value of this type, use
320/// [`RoomList::entries_with_dynamic_adapters`]
321pub struct RoomListDynamicEntriesController {
322    filter: Arc<AsyncCell<BoxedFilterFn>>,
323    page_size: usize,
324    limit: SharedObservable<usize>,
325    maximum_number_of_rooms: Subscriber<Option<u32>>,
326}
327
328impl RoomListDynamicEntriesController {
329    fn new(
330        filter: Arc<AsyncCell<BoxedFilterFn>>,
331        page_size: usize,
332        limit_stream: SharedObservable<usize>,
333        maximum_number_of_rooms: Subscriber<Option<u32>>,
334    ) -> Self {
335        Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
336    }
337
338    /// Set the filter.
339    ///
340    /// If the associated stream has been dropped, returns `false` to indicate
341    /// the operation didn't have an effect.
342    pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
343        if Arc::strong_count(&self.filter) == 1 {
344            // there is no other reference to the boxed filter fn, setting it
345            // would be pointless (no new references can be created from self,
346            // either)
347            false
348        } else {
349            self.filter.set(filter);
350            true
351        }
352    }
353
354    /// Add one page, i.e. view `page_size` more entries in the room list if
355    /// any.
356    pub fn add_one_page(&self) {
357        let Some(max) = self.maximum_number_of_rooms.get() else {
358            return;
359        };
360
361        let max: usize = max.try_into().unwrap();
362        let limit = self.limit.get();
363
364        if limit < max {
365            // With this logic, it is possible that `limit` becomes greater than `max` if
366            // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
367            // `limit` greater than `max`, but it's not OK to increase the limit
368            // indefinitely.
369            self.limit.set_if_not_eq(limit + self.page_size);
370        }
371    }
372
373    /// Reset the one page, i.e. forget all pages and move back to the first
374    /// page.
375    pub fn reset_to_one_page(&self) {
376        self.limit.set_if_not_eq(self.page_size);
377    }
378}
379
380/// A facade type that derefs to [`Room`] and that caches data from
381/// [`RoomInfo`].
382///
383/// Why caching data? [`RoomInfo`] is behind a lock. Every time a filter or a
384/// sorter calls a method on [`Room`], it's likely to hit the lock in front of
385/// [`RoomInfo`]. It creates a big contention. By caching the data, it avoids
386/// hitting the lock, improving the performance greatly.
387///
388/// Data are refreshed in `merge_stream_and_receiver` (private function).
389///
390/// [`RoomInfo`]: matrix_sdk::RoomInfo
391#[derive(Clone, Debug)]
392pub struct RoomListItem {
393    /// The inner room.
394    inner: Room,
395
396    /// Cache of `Room::latest_event_timestamp`.
397    pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
398
399    /// Cache of `Room::latest_event_is_unsent`.
400    pub(super) cached_latest_event_is_unsent: bool,
401
402    /// Cache of `Room::recency_stamp`.
403    pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,
404
405    /// Cache of `Room::cached_display_name`, already as a string.
406    pub(super) cached_display_name: Option<String>,
407
408    /// Cache of `Room::is_space`.
409    pub(super) cached_is_space: bool,
410
411    // Cache of `Room::state`.
412    pub(super) cached_state: RoomState,
413}
414
415impl RoomListItem {
416    /// Deconstruct to the inner room value.
417    pub fn into_inner(self) -> Room {
418        self.inner
419    }
420
421    /// Refresh the cached data.
422    pub(super) fn refresh_cached_data(&mut self) {
423        self.cached_latest_event_timestamp = self.inner.latest_event_timestamp();
424        self.cached_latest_event_is_unsent = self.inner.latest_event_is_unsent();
425        self.cached_recency_stamp = self.inner.recency_stamp();
426        self.cached_display_name = self.inner.cached_display_name().map(|name| name.to_string());
427        self.cached_is_space = self.inner.is_space();
428        self.cached_state = self.inner.state();
429    }
430}
431
432impl From<Room> for RoomListItem {
433    fn from(inner: Room) -> Self {
434        let cached_latest_event_timestamp = inner.latest_event_timestamp();
435        let cached_latest_event_is_unsent = inner.latest_event_is_unsent();
436        let cached_recency_stamp = inner.recency_stamp();
437        let cached_display_name = inner.cached_display_name().map(|name| name.to_string());
438        let cached_is_space = inner.is_space();
439        let cached_state = inner.state();
440
441        Self {
442            inner,
443            cached_latest_event_timestamp,
444            cached_latest_event_is_unsent,
445            cached_recency_stamp,
446            cached_display_name,
447            cached_is_space,
448            cached_state,
449        }
450    }
451}
452
453impl Deref for RoomListItem {
454    type Target = Room;
455
456    fn deref(&self) -> &Self::Target {
457        &self.inner
458    }
459}