matrix_sdk_ui/room_list_service/
room_list.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{future::ready, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{pin_mut, stream, Stream, StreamExt as _};
24use matrix_sdk::{
25    executor::{spawn, JoinHandle},
26    Client, SlidingSync, SlidingSyncList,
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use tokio::{
30    select,
31    sync::broadcast::{self, error::RecvError},
32};
33use tracing::{error, trace};
34
35use super::{
36    filters::BoxedFilterFn,
37    sorters::{new_sorter_lexicographic, new_sorter_name, new_sorter_recency},
38    Error, Room, State,
39};
40
41/// A `RoomList` represents a list of rooms, from a
42/// [`RoomListService`](super::RoomListService).
43#[derive(Debug)]
44pub struct RoomList {
45    client: Client,
46    sliding_sync: Arc<SlidingSync>,
47    sliding_sync_list: SlidingSyncList,
48    loading_state: SharedObservable<RoomListLoadingState>,
49    loading_state_task: JoinHandle<()>,
50}
51
52impl Drop for RoomList {
53    fn drop(&mut self) {
54        self.loading_state_task.abort();
55    }
56}
57
58impl RoomList {
59    pub(super) async fn new(
60        client: &Client,
61        sliding_sync: &Arc<SlidingSync>,
62        sliding_sync_list_name: &str,
63        room_list_service_state: Subscriber<State>,
64    ) -> Result<Self, Error> {
65        let sliding_sync_list = sliding_sync
66            .on_list(sliding_sync_list_name, |list| ready(list.clone()))
67            .await
68            .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
69
70        let loading_state =
71            SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
72                Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
73                    maximum_number_of_rooms: Some(maximum_number_of_rooms),
74                },
75                None => RoomListLoadingState::NotLoaded,
76            });
77
78        Ok(Self {
79            client: client.clone(),
80            sliding_sync: sliding_sync.clone(),
81            sliding_sync_list: sliding_sync_list.clone(),
82            loading_state: loading_state.clone(),
83            loading_state_task: spawn(async move {
84                pin_mut!(room_list_service_state);
85
86                // As soon as `RoomListService` changes its state, if it isn't
87                // `Terminated` nor `Error`, we know we have fetched something,
88                // so the room list is loaded.
89                while let Some(state) = room_list_service_state.next().await {
90                    use State::*;
91
92                    match state {
93                        Terminated { .. } | Error { .. } | Init => (),
94                        SettingUp | Recovering | Running => break,
95                    }
96                }
97
98                // Let's jump from `NotLoaded` to `Loaded`.
99                let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
100
101                loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
102
103                // Wait for updates on the maximum number of rooms to update again.
104                let mut maximum_number_of_rooms_stream =
105                    sliding_sync_list.maximum_number_of_rooms_stream();
106
107                while let Some(maximum_number_of_rooms) =
108                    maximum_number_of_rooms_stream.next().await
109                {
110                    loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
111                }
112            }),
113        })
114    }
115
116    /// Get a subscriber to the room list loading state.
117    ///
118    /// This method will send out the current loading state as the first update.
119    pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
120        self.loading_state.subscribe_reset()
121    }
122
123    /// Get a stream of rooms.
124    fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
125        let (rooms, stream) = self.client.rooms_stream();
126
127        let map_room = |room| Room::new(room, &self.sliding_sync);
128
129        (
130            rooms.into_iter().map(map_room).collect(),
131            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
132        )
133    }
134
135    /// Get a configurable stream of rooms.
136    ///
137    /// It's possible to provide a filter that will filter out room list
138    /// entries, and that it's also possible to “paginate” over the entries by
139    /// `page_size`. The rooms are also sorted.
140    ///
141    /// The returned stream will only start yielding diffs once a filter is set
142    /// through the returned [`RoomListDynamicEntriesController`]. For every
143    /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
144    /// will yield a [`VectorDiff::Reset`] followed by any updates of the
145    /// room list under that filter (until the next reset).
146    pub fn entries_with_dynamic_adapters(
147        &self,
148        page_size: usize,
149    ) -> (impl Stream<Item = Vec<VectorDiff<Room>>> + '_, RoomListDynamicEntriesController) {
150        let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
151        let list = self.sliding_sync_list.clone();
152
153        let filter_fn_cell = AsyncCell::shared();
154
155        let limit = SharedObservable::<usize>::new(page_size);
156        let limit_stream = limit.subscribe();
157
158        let dynamic_entries_controller = RoomListDynamicEntriesController::new(
159            filter_fn_cell.clone(),
160            page_size,
161            limit,
162            list.maximum_number_of_rooms_stream(),
163        );
164
165        let stream = stream! {
166            loop {
167                let filter_fn = filter_fn_cell.take().await;
168
169                let (raw_values, raw_stream) = self.entries();
170
171                // Combine normal stream events with other updates from rooms
172                let merged_streams = merge_stream_and_receiver(raw_values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
173
174                let (values, stream) = (raw_values, merged_streams)
175                    .filter(filter_fn)
176                    .sort_by(new_sorter_lexicographic(vec![
177                        Box::new(new_sorter_recency()),
178                        Box::new(new_sorter_name())
179                    ]))
180                    .dynamic_head_with_initial_value(page_size, limit_stream.clone());
181
182                // Clearing the stream before chaining with the real stream.
183                yield stream::once(ready(vec![VectorDiff::Reset { values }]))
184                    .chain(stream);
185            }
186        }
187        .switch();
188
189        (stream, dynamic_entries_controller)
190    }
191}
192
193/// This function remembers the current state of the unfiltered room list, so it
194/// knows where all rooms are. When the receiver is triggered, a Set operation
195/// for the room position is inserted to the stream.
196fn merge_stream_and_receiver(
197    mut raw_current_values: Vector<Room>,
198    raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
199    mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
200) -> impl Stream<Item = Vec<VectorDiff<Room>>> {
201    stream! {
202        pin_mut!(raw_stream);
203
204        loop {
205            select! {
206                // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
207                biased;
208
209                diffs = raw_stream.next() => {
210                    if let Some(diffs) = diffs {
211                        for diff in &diffs {
212                            diff.clone().map(|room| {
213                                trace!(room = %room.room_id(), "updated in response");
214                                room
215                            }).apply(&mut raw_current_values);
216                        }
217
218                        yield diffs;
219                    } else {
220                        // Restart immediately, don't keep on waiting for the receiver
221                        break;
222                    }
223                }
224
225                update = room_info_notable_update_receiver.recv() => {
226                    match update {
227                        Ok(update) => {
228                            // Emit a `VectorDiff::Set` for the specific rooms.
229                            if let Some(index) = raw_current_values.iter().position(|room| room.room_id() == update.room_id) {
230                                let room = &raw_current_values[index];
231                                let update = VectorDiff::Set { index, value: room.clone() };
232                                yield vec![update];
233                            }
234                        }
235
236                        Err(RecvError::Closed) => {
237                            error!("Cannot receive room info notable updates because the sender has been closed");
238
239                            break;
240                        }
241
242                        Err(RecvError::Lagged(n)) => {
243                            error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
244                        }
245                    }
246                }
247            }
248        }
249    }
250}
251
252/// The loading state of a [`RoomList`].
253///
254/// When a [`RoomList`] is displayed to the user, it can be in various states.
255/// This enum tries to represent those states with a correct level of
256/// abstraction.
257#[derive(Clone, Debug, PartialEq, Eq)]
258pub enum RoomListLoadingState {
259    /// The [`RoomList`] has not been loaded yet, i.e. a sync might run
260    /// or not run at all, there is nothing to show in this `RoomList` yet.
261    /// It's a good opportunity to show a placeholder to the user.
262    ///
263    /// From [`Self::NotLoaded`], it's only possible to move to
264    /// [`Self::Loaded`].
265    NotLoaded,
266
267    /// The [`RoomList`] has been loaded, i.e. a sync has been run, or more
268    /// syncs are running, there is probably something to show to the user.
269    /// Either the user has 0 room, in this case, it's a good opportunity to
270    /// show a special screen for that, or the user has multiple rooms, and it's
271    /// the classical room list.
272    ///
273    /// The number of rooms is represented by `maximum_number_of_rooms`.
274    ///
275    /// From [`Self::Loaded`], it's not possible to move back to
276    /// [`Self::NotLoaded`].
277    Loaded {
278        /// The maximum number of rooms a [`RoomList`] contains.
279        ///
280        /// It does not mean that there are exactly this many rooms to display.
281        /// Usually, the room entries are represented by [`Room`]. The room
282        /// entry might have been synced or not synced yet, but we know for sure
283        /// (from the server), that there will be this amount of rooms in the
284        /// list at the end.
285        ///
286        /// Note that it's an `Option`, because it may be possible that the
287        /// server did miss to send us this value. It's up to you, dear reader,
288        /// to know which default to adopt in case of `None`.
289        maximum_number_of_rooms: Option<u32>,
290    },
291}
292
293/// Controller for the [`RoomList`] dynamic entries.
294///
295/// To get one value of this type, use
296/// [`RoomList::entries_with_dynamic_adapters`]
297pub struct RoomListDynamicEntriesController {
298    filter: Arc<AsyncCell<BoxedFilterFn>>,
299    page_size: usize,
300    limit: SharedObservable<usize>,
301    maximum_number_of_rooms: Subscriber<Option<u32>>,
302}
303
304impl RoomListDynamicEntriesController {
305    fn new(
306        filter: Arc<AsyncCell<BoxedFilterFn>>,
307        page_size: usize,
308        limit_stream: SharedObservable<usize>,
309        maximum_number_of_rooms: Subscriber<Option<u32>>,
310    ) -> Self {
311        Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
312    }
313
314    /// Set the filter.
315    ///
316    /// If the associated stream has been dropped, returns `false` to indicate
317    /// the operation didn't have an effect.
318    pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
319        if Arc::strong_count(&self.filter) == 1 {
320            // there is no other reference to the boxed filter fn, setting it
321            // would be pointless (no new references can be created from self,
322            // either)
323            false
324        } else {
325            self.filter.set(filter);
326            true
327        }
328    }
329
330    /// Add one page, i.e. view `page_size` more entries in the room list if
331    /// any.
332    pub fn add_one_page(&self) {
333        let Some(max) = self.maximum_number_of_rooms.get() else {
334            return;
335        };
336
337        let max: usize = max.try_into().unwrap();
338        let limit = self.limit.get();
339
340        if limit < max {
341            // With this logic, it is possible that `limit` becomes greater than `max` if
342            // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
343            // `limit` greater than `max`, but it's not OK to increase the limit
344            // indefinitely.
345            self.limit.set_if_not_eq(limit + self.page_size);
346        }
347    }
348
349    /// Reset the one page, i.e. forget all pages and move back to the first
350    /// page.
351    pub fn reset_to_one_page(&self) {
352        self.limit.set_if_not_eq(self.page_size);
353    }
354}