matrix_sdk_ui/room_list_service/room_list.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{future::ready, ops::Deref, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{Stream, StreamExt as _, pin_mut, stream};
24use matrix_sdk::{
25 Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
26 task_monitor::BackgroundTaskHandle,
27};
28use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons};
29use ruma::MilliSecondsSinceUnixEpoch;
30use tokio::{
31 select,
32 sync::broadcast::{self, error::RecvError},
33};
34use tracing::{error, trace};
35
36use super::{
37 Error, State,
38 filters::BoxedFilterFn,
39 sorters::{
40 new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name, new_sorter_recency,
41 },
42};
43
44/// A `RoomList` represents a list of rooms, from a
45/// [`RoomListService`](super::RoomListService).
46#[derive(Debug)]
47pub struct RoomList {
48 client: Client,
49 sliding_sync_list: SlidingSyncList,
50 loading_state: SharedObservable<RoomListLoadingState>,
51 _loading_state_task: BackgroundTaskHandle,
52}
53
54impl RoomList {
55 pub(super) async fn new(
56 client: &Client,
57 sliding_sync: &Arc<SlidingSync>,
58 sliding_sync_list_name: &str,
59 room_list_service_state: Subscriber<State>,
60 ) -> Result<Self, Error> {
61 let sliding_sync_list = sliding_sync
62 .on_list(sliding_sync_list_name, |list| ready(list.clone()))
63 .await
64 .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
65
66 let loading_state =
67 SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
68 Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
69 maximum_number_of_rooms: Some(maximum_number_of_rooms),
70 },
71 None => RoomListLoadingState::NotLoaded,
72 });
73
74 Ok(Self {
75 client: client.clone(),
76 sliding_sync_list: sliding_sync_list.clone(),
77 loading_state: loading_state.clone(),
78 _loading_state_task: client
79 .task_monitor()
80 .spawn_background_task("room_list::loading_state_task", async move {
81 pin_mut!(room_list_service_state);
82
83 // As soon as `RoomListService` changes its state, if it isn't
84 // `Terminated` nor `Error`, we know we have fetched something,
85 // so the room list is loaded.
86 while let Some(state) = room_list_service_state.next().await {
87 use State::*;
88
89 match state {
90 Terminated { .. } | Error { .. } | Init => (),
91 SettingUp | Recovering | Running => break,
92 }
93 }
94
95 // Let's jump from `NotLoaded` to `Loaded`.
96 let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
97
98 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
99
100 // Wait for updates on the maximum number of rooms to update again.
101 let mut maximum_number_of_rooms_stream =
102 sliding_sync_list.maximum_number_of_rooms_stream();
103
104 while let Some(maximum_number_of_rooms) =
105 maximum_number_of_rooms_stream.next().await
106 {
107 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
108 }
109 })
110 .abort_on_drop(),
111 })
112 }
113
114 /// Get a subscriber to the room list loading state.
115 ///
116 /// This method will send out the current loading state as the first update.
117 ///
118 /// See [`RoomListLoadingState`].
119 pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
120 self.loading_state.subscribe_reset()
121 }
122
123 /// Get a stream of rooms.
124 fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
125 self.client.rooms_stream()
126 }
127
128 /// Get a configurable stream of rooms.
129 ///
130 /// It's possible to provide a filter that will filter out room list
131 /// entries, and that it's also possible to “paginate” over the entries by
132 /// `page_size`. The rooms are also sorted.
133 ///
134 /// The returned stream will only start yielding diffs once a filter is set
135 /// through the returned [`RoomListDynamicEntriesController`]. For every
136 /// call to [`RoomListDynamicEntriesController::set_filter`], the stream
137 /// will yield a [`VectorDiff::Reset`] followed by any updates of the
138 /// room list under that filter (until the next reset).
139 pub fn entries_with_dynamic_adapters(
140 &self,
141 page_size: usize,
142 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
143 {
144 let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
145 let list = self.sliding_sync_list.clone();
146
147 let filter_fn_cell = AsyncCell::shared();
148
149 let limit = SharedObservable::<usize>::new(page_size);
150 let limit_stream = limit.subscribe();
151
152 let dynamic_entries_controller = RoomListDynamicEntriesController::new(
153 filter_fn_cell.clone(),
154 page_size,
155 limit,
156 list.maximum_number_of_rooms_stream(),
157 );
158
159 let stream = stream! {
160 loop {
161 let filter_fn = filter_fn_cell.take().await;
162
163 let (raw_values, raw_stream) = self.entries();
164 let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
165
166 // Combine normal stream events with other updates from rooms
167 let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
168
169 let (values, stream) = (values, stream)
170 .filter(filter_fn)
171 .sort_by(new_sorter_lexicographic(vec![
172 // Sort by latest event's kind, i.e. put the rooms with a
173 // **local** latest event first.
174 Box::new(new_sorter_latest_event()),
175 // Sort rooms by their recency (either by looking
176 // at their latest event's timestamp, or their
177 // `recency_stamp`).
178 Box::new(new_sorter_recency()),
179 // Finally, sort by name.
180 Box::new(new_sorter_name()),
181 ]))
182 .dynamic_head_with_initial_value(page_size, limit_stream.clone());
183
184 // Clearing the stream before chaining with the real stream.
185 yield stream::once(ready(vec![VectorDiff::Reset { values }]))
186 .chain(stream);
187 }
188 }
189 .switch();
190
191 (stream, dynamic_entries_controller)
192 }
193}
194
195/// This function remembers the current state of the unfiltered room list, so it
196/// knows where all rooms are. When the receiver is triggered, a Set operation
197/// for the room position is inserted to the stream.
198fn merge_stream_and_receiver(
199 mut current_values: Vector<RoomListItem>,
200 raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
201 mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
202) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
203 stream! {
204 pin_mut!(raw_stream);
205
206 loop {
207 select! {
208 // We want to give priority on updates from `raw_stream` as it will necessarily trigger a “refresh” of the rooms.
209 biased;
210
211 diffs = raw_stream.next() => {
212 if let Some(diffs) = diffs {
213 let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
214
215 for diff in &diffs {
216 diff.clone().map(|room| {
217 trace!(room = %room.room_id(), "updated in response");
218 room
219 }).apply(&mut current_values);
220 }
221
222 yield diffs;
223 } else {
224 // Restart immediately, don't keep on waiting for the receiver
225 break;
226 }
227 }
228
229 update = room_info_notable_update_receiver.recv() => {
230 match update {
231 Ok(update) => {
232 // Filter which _reason_ can trigger an update of
233 // the room list.
234 //
235 // If the update is strictly about the
236 // `RECENCY_STAMP`, let's ignore it, because the
237 // Latest Event type is used to sort the room list
238 // by recency already. We don't want to trigger an
239 // update because of `RECENCY_STAMP`.
240 //
241 // If the update contains more reasons than
242 // `RECENCY_STAMP`, then it's fine. That's why we
243 // are using `==` instead of `contains`.
244 if update.reasons == RoomInfoNotableUpdateReasons::RECENCY_STAMP {
245 continue;
246 }
247
248 // Emit a `VectorDiff::Set` for the specific rooms.
249 if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
250 let mut room = current_values[index].clone();
251 room.refresh_cached_data();
252
253 yield vec![VectorDiff::Set { index, value: room }];
254 }
255 }
256
257 Err(RecvError::Closed) => {
258 error!("Cannot receive room info notable updates because the sender has been closed");
259
260 break;
261 }
262
263 Err(RecvError::Lagged(n)) => {
264 error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
265 }
266 }
267 }
268 }
269 }
270 }
271}
272
273/// The loading state of a [`RoomList`].
274///
275/// When a [`RoomList`] is displayed to the user, it can be in various states.
276/// This enum tries to represent those states with a correct level of
277/// abstraction.
278///
279/// See [`RoomList::loading_state`].
280#[derive(Clone, Debug, PartialEq, Eq)]
281pub enum RoomListLoadingState {
282 /// The [`RoomList`] has not been loaded yet, i.e. a sync might run
283 /// or not run at all, there is nothing to show in this `RoomList` yet.
284 /// It's a good opportunity to show a placeholder to the user.
285 ///
286 /// From [`Self::NotLoaded`], it's only possible to move to
287 /// [`Self::Loaded`].
288 NotLoaded,
289
290 /// The [`RoomList`] has been loaded, i.e. a sync has been run, or more
291 /// syncs are running, there is probably something to show to the user.
292 /// Either the user has 0 room, in this case, it's a good opportunity to
293 /// show a special screen for that, or the user has multiple rooms, and it's
294 /// the classical room list.
295 ///
296 /// The number of rooms is represented by `maximum_number_of_rooms`.
297 ///
298 /// From [`Self::Loaded`], it's not possible to move back to
299 /// [`Self::NotLoaded`].
300 Loaded {
301 /// The maximum number of rooms a [`RoomList`] contains.
302 ///
303 /// It does not mean that there are exactly this many rooms to display.
304 /// The room entries are represented by [`RoomListItem`]. The room entry
305 /// might have been synced or not synced yet, but we know for sure
306 /// (from the server), that there will be this amount of rooms in the
307 /// list at the end.
308 ///
309 /// Note that it's an `Option`, because it may be possible that the
310 /// server did miss to send us this value. It's up to you, dear reader,
311 /// to know which default to adopt in case of `None`.
312 maximum_number_of_rooms: Option<u32>,
313 },
314}
315
316/// Controller for the [`RoomList`] dynamic entries.
317///
318/// To get one value of this type, use
319/// [`RoomList::entries_with_dynamic_adapters`]
320pub struct RoomListDynamicEntriesController {
321 filter: Arc<AsyncCell<BoxedFilterFn>>,
322 page_size: usize,
323 limit: SharedObservable<usize>,
324 maximum_number_of_rooms: Subscriber<Option<u32>>,
325}
326
327impl RoomListDynamicEntriesController {
328 fn new(
329 filter: Arc<AsyncCell<BoxedFilterFn>>,
330 page_size: usize,
331 limit_stream: SharedObservable<usize>,
332 maximum_number_of_rooms: Subscriber<Option<u32>>,
333 ) -> Self {
334 Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
335 }
336
337 /// Set the filter.
338 ///
339 /// If the associated stream has been dropped, returns `false` to indicate
340 /// the operation didn't have an effect.
341 pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
342 if Arc::strong_count(&self.filter) == 1 {
343 // there is no other reference to the boxed filter fn, setting it
344 // would be pointless (no new references can be created from self,
345 // either)
346 false
347 } else {
348 self.filter.set(filter);
349 true
350 }
351 }
352
353 /// Add one page, i.e. view `page_size` more entries in the room list if
354 /// any.
355 pub fn add_one_page(&self) {
356 let Some(max) = self.maximum_number_of_rooms.get() else {
357 return;
358 };
359
360 let max: usize = max.try_into().unwrap();
361 let limit = self.limit.get();
362
363 if limit < max {
364 // With this logic, it is possible that `limit` becomes greater than `max` if
365 // `max - limit < page_size`, and that's perfectly fine. It's OK to have a
366 // `limit` greater than `max`, but it's not OK to increase the limit
367 // indefinitely.
368 self.limit.set_if_not_eq(limit + self.page_size);
369 }
370 }
371
372 /// Reset the one page, i.e. forget all pages and move back to the first
373 /// page.
374 pub fn reset_to_one_page(&self) {
375 self.limit.set_if_not_eq(self.page_size);
376 }
377}
378
379/// A facade type that derefs to [`Room`] and that caches data from
380/// [`RoomInfo`].
381///
382/// Why caching data? [`RoomInfo`] is behind a lock. Every time a filter or a
383/// sorter calls a method on [`Room`], it's likely to hit the lock in front of
384/// [`RoomInfo`]. It creates a big contention. By caching the data, it avoids
385/// hitting the lock, improving the performance greatly.
386///
387/// Data are refreshed in `merge_stream_and_receiver` (private function).
388///
389/// [`RoomInfo`]: matrix_sdk::RoomInfo
390#[derive(Clone, Debug)]
391pub struct RoomListItem {
392 /// The inner room.
393 inner: Room,
394
395 /// Cache of `Room::latest_event_timestamp`.
396 pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
397
398 /// Cache of `Room::latest_event_is_unsent`.
399 pub(super) cached_latest_event_is_unsent: bool,
400
401 /// Cache of `Room::recency_stamp`.
402 pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,
403
404 /// Cache of `Room::cached_display_name`, already as a string.
405 pub(super) cached_display_name: Option<String>,
406
407 /// Cache of `Room::is_space`.
408 pub(super) cached_is_space: bool,
409
410 // Cache of `Room::state`.
411 pub(super) cached_state: RoomState,
412}
413
414impl RoomListItem {
415 /// Deconstruct to the inner room value.
416 pub fn into_inner(self) -> Room {
417 self.inner
418 }
419
420 /// Refresh the cached data.
421 pub(super) fn refresh_cached_data(&mut self) {
422 self.cached_latest_event_timestamp = self.inner.latest_event_timestamp();
423 self.cached_latest_event_is_unsent = self.inner.latest_event_is_unsent();
424 self.cached_recency_stamp = self.inner.recency_stamp();
425 self.cached_display_name = self.inner.cached_display_name().map(|name| name.to_string());
426 self.cached_is_space = self.inner.is_space();
427 self.cached_state = self.inner.state();
428 }
429}
430
431impl From<Room> for RoomListItem {
432 fn from(inner: Room) -> Self {
433 let cached_latest_event_timestamp = inner.latest_event_timestamp();
434 let cached_latest_event_is_unsent = inner.latest_event_is_unsent();
435 let cached_recency_stamp = inner.recency_stamp();
436 let cached_display_name = inner.cached_display_name().map(|name| name.to_string());
437 let cached_is_space = inner.is_space();
438 let cached_state = inner.state();
439
440 Self {
441 inner,
442 cached_latest_event_timestamp,
443 cached_latest_event_is_unsent,
444 cached_recency_stamp,
445 cached_display_name,
446 cached_is_space,
447 cached_state,
448 }
449 }
450}
451
452impl Deref for RoomListItem {
453 type Target = Room;
454
455 fn deref(&self) -> &Self::Target {
456 &self.inner
457 }
458}