matrix_sdk_ui/room_list_service/
room_list.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{future::ready, ops::Deref, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{Stream, StreamExt as _, pin_mut, stream};
24use matrix_sdk::{
25    Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
26    executor::{JoinHandle, spawn},
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use ruma::MilliSecondsSinceUnixEpoch;
30use tokio::{
31    select,
32    sync::broadcast::{self, error::RecvError},
33};
34use tracing::{error, trace};
35
36use super::{
37    Error, State,
38    filters::BoxedFilterFn,
39    sorters::{
40        BoxedSorterFn, new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name,
41        new_sorter_recency,
42    },
43};
44
45/// A `RoomList` represents a list of rooms, from a
46/// [`RoomListService`](super::RoomListService).
47#[derive(Debug)]
48pub struct RoomList {
49    client: Client,
50    sliding_sync_list: SlidingSyncList,
51    loading_state: SharedObservable<RoomListLoadingState>,
52    loading_state_task: JoinHandle<()>,
53}
54
55impl Drop for RoomList {
56    fn drop(&mut self) {
57        self.loading_state_task.abort();
58    }
59}
60
61impl RoomList {
62    pub(super) async fn new(
63        client: &Client,
64        sliding_sync: &Arc<SlidingSync>,
65        sliding_sync_list_name: &str,
66        room_list_service_state: Subscriber<State>,
67    ) -> Result<Self, Error> {
68        let sliding_sync_list = sliding_sync
69            .on_list(sliding_sync_list_name, |list| ready(list.clone()))
70            .await
71            .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
72
73        let loading_state =
74            SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
75                Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
76                    maximum_number_of_rooms: Some(maximum_number_of_rooms),
77                },
78                None => RoomListLoadingState::NotLoaded,
79            });
80
81        Ok(Self {
82            client: client.clone(),
83            sliding_sync_list: sliding_sync_list.clone(),
84            loading_state: loading_state.clone(),
85            loading_state_task: spawn(async move {
86                pin_mut!(room_list_service_state);
87
88                // As soon as `RoomListService` changes its state, if it isn't
89                // `Terminated` nor `Error`, we know we have fetched something,
90                // so the room list is loaded.
91                while let Some(state) = room_list_service_state.next().await {
92                    use State::*;
93
94                    match state {
95                        Terminated { .. } | Error { .. } | Init => (),
96                        SettingUp | Recovering | Running => break,
97                    }
98                }
99
100                // Let's jump from `NotLoaded` to `Loaded`.
101                let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
102
103                loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
104
105                // Wait for updates on the maximum number of rooms to update again.
106                let mut maximum_number_of_rooms_stream =
107                    sliding_sync_list.maximum_number_of_rooms_stream();
108
109                while let Some(maximum_number_of_rooms) =
110                    maximum_number_of_rooms_stream.next().await
111                {
112                    loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
113                }
114            }),
115        })
116    }
117
118    /// Get a subscriber to the room list loading state.
119    ///
120    /// This method will send out the current loading state as the first update.
121    ///
122    /// See [`RoomListLoadingState`].
123    pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
124        self.loading_state.subscribe_reset()
125    }
126
127    /// Get a stream of rooms.
128    fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
129        self.client.rooms_stream()
130    }
131
132    /// Get a configurable stream of rooms.
133    ///
134    /// It's possible to provide a filter that will filter out room list
135    /// entries, and that it's also possible to “paginate” over the entries by
136    /// `page_size`. The rooms are also sorted.
137    ///
138    /// The returned stream will only start yielding diffs once a filter is set
139    /// through the returned [`RoomListDynamicEntriesController`]. For every
140    /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
141    /// will yield a [`VectorDiff::Reset`] followed by any updates of the
142    /// room list under that filter (until the next reset).
143    pub fn entries_with_dynamic_adapters(
144        &self,
145        page_size: usize,
146    ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
147    {
148        self.entries_with_dynamic_adapters_impl(page_size, false)
149    }
150
151    #[doc(hidden)]
152    pub fn entries_with_dynamic_adapters_with(
153        &self,
154        page_size: usize,
155        enable_latest_event_sorter: bool,
156    ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
157    {
158        self.entries_with_dynamic_adapters_impl(page_size, enable_latest_event_sorter)
159    }
160
161    fn entries_with_dynamic_adapters_impl(
162        &self,
163        page_size: usize,
164        enable_latest_event_sorter: bool,
165    ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
166    {
167        let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
168        let list = self.sliding_sync_list.clone();
169
170        let filter_fn_cell = AsyncCell::shared();
171
172        let limit = SharedObservable::<usize>::new(page_size);
173        let limit_stream = limit.subscribe();
174
175        let dynamic_entries_controller = RoomListDynamicEntriesController::new(
176            filter_fn_cell.clone(),
177            page_size,
178            limit,
179            list.maximum_number_of_rooms_stream(),
180        );
181
182        let stream = stream! {
183            loop {
184                let filter_fn = filter_fn_cell.take().await;
185
186                let (raw_values, raw_stream) = self.entries();
187                let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
188
189                // Combine normal stream events with other updates from rooms
190                let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
191
192                let mut sorters: Vec<BoxedSorterFn> = Vec::with_capacity(3);
193
194                if enable_latest_event_sorter {
195                    // Sort by latest event's kind, i.e. put the rooms with a
196                    // **local** latest event first.
197                    sorters.push(Box::new(new_sorter_latest_event()));
198                }
199
200                // Sort rooms by their recency (either by looking
201                // at their latest event's timestamp, or their
202                // `recency_stamp`).
203                sorters.push(Box::new(new_sorter_recency()));
204                // Finally, sort by name.
205                sorters.push(Box::new(new_sorter_name()));
206
207                let (values, stream) = (values, stream)
208                    .filter(filter_fn)
209                    .sort_by(new_sorter_lexicographic(sorters))
210                    .dynamic_head_with_initial_value(page_size, limit_stream.clone());
211
212                // Clearing the stream before chaining with the real stream.
213                yield stream::once(ready(vec![VectorDiff::Reset { values }]))
214                    .chain(stream);
215            }
216        }
217        .switch();
218
219        (stream, dynamic_entries_controller)
220    }
221}
222
223/// This function remembers the current state of the unfiltered room list, so it
224/// knows where all rooms are. When the receiver is triggered, a Set operation
225/// for the room position is inserted to the stream.
226fn merge_stream_and_receiver(
227    mut current_values: Vector<RoomListItem>,
228    raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
229    mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
230) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
231    stream! {
232        pin_mut!(raw_stream);
233
234        loop {
235            select! {
236                // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
237                biased;
238
239                diffs = raw_stream.next() => {
240                    if let Some(diffs) = diffs {
241                        let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
242
243                        for diff in &diffs {
244                            diff.clone().map(|room| {
245                                trace!(room = %room.room_id(), "updated in response");
246                                room
247                            }).apply(&mut current_values);
248                        }
249
250                        yield diffs;
251                    } else {
252                        // Restart immediately, don't keep on waiting for the receiver
253                        break;
254                    }
255                }
256
257                update = room_info_notable_update_receiver.recv() => {
258                    match update {
259                        Ok(update) => {
260                            // Emit a `VectorDiff::Set` for the specific rooms.
261                            if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
262                                let mut room = current_values[index].clone();
263                                room.refresh_cached_data();
264
265                                yield vec![VectorDiff::Set { index, value: room }];
266                            }
267                        }
268
269                        Err(RecvError::Closed) => {
270                            error!("Cannot receive room info notable updates because the sender has been closed");
271
272                            break;
273                        }
274
275                        Err(RecvError::Lagged(n)) => {
276                            error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
277                        }
278                    }
279                }
280            }
281        }
282    }
283}
284
285/// The loading state of a [`RoomList`].
286///
287/// When a [`RoomList`] is displayed to the user, it can be in various states.
288/// This enum tries to represent those states with a correct level of
289/// abstraction.
290///
291/// See [`RoomList::loading_state`].
292#[derive(Clone, Debug, PartialEq, Eq)]
293pub enum RoomListLoadingState {
294    /// The [`RoomList`] has not been loaded yet, i.e. a sync might run
295    /// or not run at all, there is nothing to show in this `RoomList` yet.
296    /// It's a good opportunity to show a placeholder to the user.
297    ///
298    /// From [`Self::NotLoaded`], it's only possible to move to
299    /// [`Self::Loaded`].
300    NotLoaded,
301
302    /// The [`RoomList`] has been loaded, i.e. a sync has been run, or more
303    /// syncs are running, there is probably something to show to the user.
304    /// Either the user has 0 room, in this case, it's a good opportunity to
305    /// show a special screen for that, or the user has multiple rooms, and it's
306    /// the classical room list.
307    ///
308    /// The number of rooms is represented by `maximum_number_of_rooms`.
309    ///
310    /// From [`Self::Loaded`], it's not possible to move back to
311    /// [`Self::NotLoaded`].
312    Loaded {
313        /// The maximum number of rooms a [`RoomList`] contains.
314        ///
315        /// It does not mean that there are exactly this many rooms to display.
316        /// The room entries are represented by [`RoomListItem`]. The room entry
317        /// might have been synced or not synced yet, but we know for sure
318        /// (from the server), that there will be this amount of rooms in the
319        /// list at the end.
320        ///
321        /// Note that it's an `Option`, because it may be possible that the
322        /// server did miss to send us this value. It's up to you, dear reader,
323        /// to know which default to adopt in case of `None`.
324        maximum_number_of_rooms: Option<u32>,
325    },
326}
327
328/// Controller for the [`RoomList`] dynamic entries.
329///
330/// To get one value of this type, use
331/// [`RoomList::entries_with_dynamic_adapters`]
332pub struct RoomListDynamicEntriesController {
333    filter: Arc<AsyncCell<BoxedFilterFn>>,
334    page_size: usize,
335    limit: SharedObservable<usize>,
336    maximum_number_of_rooms: Subscriber<Option<u32>>,
337}
338
339impl RoomListDynamicEntriesController {
340    fn new(
341        filter: Arc<AsyncCell<BoxedFilterFn>>,
342        page_size: usize,
343        limit_stream: SharedObservable<usize>,
344        maximum_number_of_rooms: Subscriber<Option<u32>>,
345    ) -> Self {
346        Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
347    }
348
349    /// Set the filter.
350    ///
351    /// If the associated stream has been dropped, returns `false` to indicate
352    /// the operation didn't have an effect.
353    pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
354        if Arc::strong_count(&self.filter) == 1 {
355            // there is no other reference to the boxed filter fn, setting it
356            // would be pointless (no new references can be created from self,
357            // either)
358            false
359        } else {
360            self.filter.set(filter);
361            true
362        }
363    }
364
365    /// Add one page, i.e. view `page_size` more entries in the room list if
366    /// any.
367    pub fn add_one_page(&self) {
368        let Some(max) = self.maximum_number_of_rooms.get() else {
369            return;
370        };
371
372        let max: usize = max.try_into().unwrap();
373        let limit = self.limit.get();
374
375        if limit < max {
376            // With this logic, it is possible that `limit` becomes greater than `max` if
377            // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
378            // `limit` greater than `max`, but it's not OK to increase the limit
379            // indefinitely.
380            self.limit.set_if_not_eq(limit + self.page_size);
381        }
382    }
383
384    /// Reset the one page, i.e. forget all pages and move back to the first
385    /// page.
386    pub fn reset_to_one_page(&self) {
387        self.limit.set_if_not_eq(self.page_size);
388    }
389}
390
391/// A facade type that derefs to [`Room`] and that caches data from
392/// [`RoomInfo`].
393///
394/// Why caching data? [`RoomInfo`] is behind a lock. Every time a filter or a
395/// sorter calls a method on [`Room`], it's likely to hit the lock in front of
396/// [`RoomInfo`]. It creates a big contention. By caching the data, it avoids
397/// hitting the lock, improving the performance greatly.
398///
399/// Data are refreshed in `merge_stream_and_receiver` (private function).
400///
401/// [`RoomInfo`]: matrix_sdk::RoomInfo
402#[derive(Clone, Debug)]
403pub struct RoomListItem {
404    /// The inner room.
405    inner: Room,
406
407    /// Cache of `Room::new_latest_event_timestamp`.
408    pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
409
410    /// Cache of `Room::new_latest_event_is_local`.
411    pub(super) cached_latest_event_is_local: bool,
412
413    /// Cache of `Room::recency_stamp`.
414    pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,
415
416    /// Cache of `Room::cached_display_name`, already as a string.
417    pub(super) cached_display_name: Option<String>,
418
419    /// Cache of `Room::is_space`.
420    pub(super) cached_is_space: bool,
421
422    // Cache of `Room::state`.
423    pub(super) cached_state: RoomState,
424}
425
426impl RoomListItem {
427    /// Deconstruct to the inner room value.
428    pub fn into_inner(self) -> Room {
429        self.inner
430    }
431
432    /// Refresh the cached data.
433    pub(super) fn refresh_cached_data(&mut self) {
434        self.cached_latest_event_timestamp = self.inner.new_latest_event_timestamp();
435        self.cached_latest_event_is_local = self.inner.new_latest_event_is_local();
436        self.cached_recency_stamp = self.inner.recency_stamp();
437        self.cached_display_name = self.inner.cached_display_name().map(|name| name.to_string());
438        self.cached_is_space = self.inner.is_space();
439        self.cached_state = self.inner.state();
440    }
441}
442
443impl From<Room> for RoomListItem {
444    fn from(inner: Room) -> Self {
445        let cached_latest_event_timestamp = inner.new_latest_event_timestamp();
446        let cached_latest_event_is_local = inner.new_latest_event_is_local();
447        let cached_recency_stamp = inner.recency_stamp();
448        let cached_display_name = inner.cached_display_name().map(|name| name.to_string());
449        let cached_is_space = inner.is_space();
450        let cached_state = inner.state();
451
452        Self {
453            inner,
454            cached_latest_event_timestamp,
455            cached_latest_event_is_local,
456            cached_recency_stamp,
457            cached_display_name,
458            cached_is_space,
459            cached_state,
460        }
461    }
462}
463
464impl Deref for RoomListItem {
465    type Target = Room;
466
467    fn deref(&self) -> &Self::Target {
468        &self.inner
469    }
470}