matrix_sdk_ui/room_list_service/
room_list.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{future::ready, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{pin_mut, stream, Stream, StreamExt as _};
24use matrix_sdk::{
25    executor::{spawn, JoinHandle},
26    Client, SlidingSync, SlidingSyncList,
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use tokio::{
30    select,
31    sync::broadcast::{self, error::RecvError},
32};
33use tracing::{error, trace};
34
35use super::{
36    filters::BoxedFilterFn,
37    sorters::{new_sorter_lexicographic, new_sorter_name, new_sorter_recency},
38    Error, Room, State,
39};
40
41/// A `RoomList` represents a list of rooms, from a
42/// [`RoomListService`](super::RoomListService).
43#[derive(Debug)]
44pub struct RoomList {
45    client: Client,
46    sliding_sync_list: SlidingSyncList,
47    loading_state: SharedObservable<RoomListLoadingState>,
48    loading_state_task: JoinHandle<()>,
49}
50
51impl Drop for RoomList {
52    fn drop(&mut self) {
53        self.loading_state_task.abort();
54    }
55}
56
57impl RoomList {
58    pub(super) async fn new(
59        client: &Client,
60        sliding_sync: &Arc<SlidingSync>,
61        sliding_sync_list_name: &str,
62        room_list_service_state: Subscriber<State>,
63    ) -> Result<Self, Error> {
64        let sliding_sync_list = sliding_sync
65            .on_list(sliding_sync_list_name, |list| ready(list.clone()))
66            .await
67            .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
68
69        let loading_state =
70            SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
71                Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
72                    maximum_number_of_rooms: Some(maximum_number_of_rooms),
73                },
74                None => RoomListLoadingState::NotLoaded,
75            });
76
77        Ok(Self {
78            client: client.clone(),
79            sliding_sync_list: sliding_sync_list.clone(),
80            loading_state: loading_state.clone(),
81            loading_state_task: spawn(async move {
82                pin_mut!(room_list_service_state);
83
84                // As soon as `RoomListService` changes its state, if it isn't
85                // `Terminated` nor `Error`, we know we have fetched something,
86                // so the room list is loaded.
87                while let Some(state) = room_list_service_state.next().await {
88                    use State::*;
89
90                    match state {
91                        Terminated { .. } | Error { .. } | Init => (),
92                        SettingUp | Recovering | Running => break,
93                    }
94                }
95
96                // Let's jump from `NotLoaded` to `Loaded`.
97                let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
98
99                loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
100
101                // Wait for updates on the maximum number of rooms to update again.
102                let mut maximum_number_of_rooms_stream =
103                    sliding_sync_list.maximum_number_of_rooms_stream();
104
105                while let Some(maximum_number_of_rooms) =
106                    maximum_number_of_rooms_stream.next().await
107                {
108                    loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
109                }
110            }),
111        })
112    }
113
114    /// Get a subscriber to the room list loading state.
115    ///
116    /// This method will send out the current loading state as the first update.
117    pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
118        self.loading_state.subscribe_reset()
119    }
120
121    /// Get a stream of rooms.
122    fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
123        self.client.rooms_stream()
124    }
125
126    /// Get a configurable stream of rooms.
127    ///
128    /// It's possible to provide a filter that will filter out room list
129    /// entries, and that it's also possible to “paginate” over the entries by
130    /// `page_size`. The rooms are also sorted.
131    ///
132    /// The returned stream will only start yielding diffs once a filter is set
133    /// through the returned [`RoomListDynamicEntriesController`]. For every
134    /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
135    /// will yield a [`VectorDiff::Reset`] followed by any updates of the
136    /// room list under that filter (until the next reset).
137    pub fn entries_with_dynamic_adapters(
138        &self,
139        page_size: usize,
140    ) -> (impl Stream<Item = Vec<VectorDiff<Room>>> + '_, RoomListDynamicEntriesController) {
141        let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
142        let list = self.sliding_sync_list.clone();
143
144        let filter_fn_cell = AsyncCell::shared();
145
146        let limit = SharedObservable::<usize>::new(page_size);
147        let limit_stream = limit.subscribe();
148
149        let dynamic_entries_controller = RoomListDynamicEntriesController::new(
150            filter_fn_cell.clone(),
151            page_size,
152            limit,
153            list.maximum_number_of_rooms_stream(),
154        );
155
156        let stream = stream! {
157            loop {
158                let filter_fn = filter_fn_cell.take().await;
159
160                let (raw_values, raw_stream) = self.entries();
161
162                // Combine normal stream events with other updates from rooms
163                let merged_streams = merge_stream_and_receiver(raw_values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
164
165                let (values, stream) = (raw_values, merged_streams)
166                    .filter(filter_fn)
167                    .sort_by(new_sorter_lexicographic(vec![
168                        Box::new(new_sorter_recency()),
169                        Box::new(new_sorter_name())
170                    ]))
171                    .dynamic_head_with_initial_value(page_size, limit_stream.clone());
172
173                // Clearing the stream before chaining with the real stream.
174                yield stream::once(ready(vec![VectorDiff::Reset { values }]))
175                    .chain(stream);
176            }
177        }
178        .switch();
179
180        (stream, dynamic_entries_controller)
181    }
182}
183
184/// This function remembers the current state of the unfiltered room list, so it
185/// knows where all rooms are. When the receiver is triggered, a Set operation
186/// for the room position is inserted to the stream.
187fn merge_stream_and_receiver(
188    mut raw_current_values: Vector<Room>,
189    raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
190    mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
191) -> impl Stream<Item = Vec<VectorDiff<Room>>> {
192    stream! {
193        pin_mut!(raw_stream);
194
195        loop {
196            select! {
197                // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
198                biased;
199
200                diffs = raw_stream.next() => {
201                    if let Some(diffs) = diffs {
202                        for diff in &diffs {
203                            diff.clone().map(|room| {
204                                trace!(room = %room.room_id(), "updated in response");
205                                room
206                            }).apply(&mut raw_current_values);
207                        }
208
209                        yield diffs;
210                    } else {
211                        // Restart immediately, don't keep on waiting for the receiver
212                        break;
213                    }
214                }
215
216                update = room_info_notable_update_receiver.recv() => {
217                    match update {
218                        Ok(update) => {
219                            // Emit a `VectorDiff::Set` for the specific rooms.
220                            if let Some(index) = raw_current_values.iter().position(|room| room.room_id() == update.room_id) {
221                                let room = &raw_current_values[index];
222                                let update = VectorDiff::Set { index, value: room.clone() };
223                                yield vec![update];
224                            }
225                        }
226
227                        Err(RecvError::Closed) => {
228                            error!("Cannot receive room info notable updates because the sender has been closed");
229
230                            break;
231                        }
232
233                        Err(RecvError::Lagged(n)) => {
234                            error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
235                        }
236                    }
237                }
238            }
239        }
240    }
241}
242
243/// The loading state of a [`RoomList`].
244///
245/// When a [`RoomList`] is displayed to the user, it can be in various states.
246/// This enum tries to represent those states with a correct level of
247/// abstraction.
248#[derive(Clone, Debug, PartialEq, Eq)]
249pub enum RoomListLoadingState {
250    /// The [`RoomList`] has not been loaded yet, i.e. a sync might run
251    /// or not run at all, there is nothing to show in this `RoomList` yet.
252    /// It's a good opportunity to show a placeholder to the user.
253    ///
254    /// From [`Self::NotLoaded`], it's only possible to move to
255    /// [`Self::Loaded`].
256    NotLoaded,
257
258    /// The [`RoomList`] has been loaded, i.e. a sync has been run, or more
259    /// syncs are running, there is probably something to show to the user.
260    /// Either the user has 0 room, in this case, it's a good opportunity to
261    /// show a special screen for that, or the user has multiple rooms, and it's
262    /// the classical room list.
263    ///
264    /// The number of rooms is represented by `maximum_number_of_rooms`.
265    ///
266    /// From [`Self::Loaded`], it's not possible to move back to
267    /// [`Self::NotLoaded`].
268    Loaded {
269        /// The maximum number of rooms a [`RoomList`] contains.
270        ///
271        /// It does not mean that there are exactly this many rooms to display.
272        /// Usually, the room entries are represented by [`Room`]. The room
273        /// entry might have been synced or not synced yet, but we know for sure
274        /// (from the server), that there will be this amount of rooms in the
275        /// list at the end.
276        ///
277        /// Note that it's an `Option`, because it may be possible that the
278        /// server did miss to send us this value. It's up to you, dear reader,
279        /// to know which default to adopt in case of `None`.
280        maximum_number_of_rooms: Option<u32>,
281    },
282}
283
284/// Controller for the [`RoomList`] dynamic entries.
285///
286/// To get one value of this type, use
287/// [`RoomList::entries_with_dynamic_adapters`]
288pub struct RoomListDynamicEntriesController {
289    filter: Arc<AsyncCell<BoxedFilterFn>>,
290    page_size: usize,
291    limit: SharedObservable<usize>,
292    maximum_number_of_rooms: Subscriber<Option<u32>>,
293}
294
295impl RoomListDynamicEntriesController {
296    fn new(
297        filter: Arc<AsyncCell<BoxedFilterFn>>,
298        page_size: usize,
299        limit_stream: SharedObservable<usize>,
300        maximum_number_of_rooms: Subscriber<Option<u32>>,
301    ) -> Self {
302        Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
303    }
304
305    /// Set the filter.
306    ///
307    /// If the associated stream has been dropped, returns `false` to indicate
308    /// the operation didn't have an effect.
309    pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
310        if Arc::strong_count(&self.filter) == 1 {
311            // there is no other reference to the boxed filter fn, setting it
312            // would be pointless (no new references can be created from self,
313            // either)
314            false
315        } else {
316            self.filter.set(filter);
317            true
318        }
319    }
320
321    /// Add one page, i.e. view `page_size` more entries in the room list if
322    /// any.
323    pub fn add_one_page(&self) {
324        let Some(max) = self.maximum_number_of_rooms.get() else {
325            return;
326        };
327
328        let max: usize = max.try_into().unwrap();
329        let limit = self.limit.get();
330
331        if limit < max {
332            // With this logic, it is possible that `limit` becomes greater than `max` if
333            // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
334            // `limit` greater than `max`, but it's not OK to increase the limit
335            // indefinitely.
336            self.limit.set_if_not_eq(limit + self.page_size);
337        }
338    }
339
340    /// Reset the one page, i.e. forget all pages and move back to the first
341    /// page.
342    pub fn reset_to_one_page(&self) {
343        self.limit.set_if_not_eq(self.page_size);
344    }
345}