matrix_sdk_ui/room_list_service/
room_list.rs1use std::{future::ready, ops::Deref, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{Stream, StreamExt as _, pin_mut, stream};
24use matrix_sdk::{
25 Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
26 executor::{JoinHandle, spawn},
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use ruma::MilliSecondsSinceUnixEpoch;
30use tokio::{
31 select,
32 sync::broadcast::{self, error::RecvError},
33};
34use tracing::{error, trace};
35
36use super::{
37 Error, State,
38 filters::BoxedFilterFn,
39 sorters::{
40 BoxedSorterFn, new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name,
41 new_sorter_recency,
42 },
43};
44
45#[derive(Debug)]
48pub struct RoomList {
49 client: Client,
50 sliding_sync_list: SlidingSyncList,
51 loading_state: SharedObservable<RoomListLoadingState>,
52 loading_state_task: JoinHandle<()>,
53}
54
55impl Drop for RoomList {
56 fn drop(&mut self) {
57 self.loading_state_task.abort();
58 }
59}
60
61impl RoomList {
62 pub(super) async fn new(
63 client: &Client,
64 sliding_sync: &Arc<SlidingSync>,
65 sliding_sync_list_name: &str,
66 room_list_service_state: Subscriber<State>,
67 ) -> Result<Self, Error> {
68 let sliding_sync_list = sliding_sync
69 .on_list(sliding_sync_list_name, |list| ready(list.clone()))
70 .await
71 .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
72
73 let loading_state =
74 SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
75 Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
76 maximum_number_of_rooms: Some(maximum_number_of_rooms),
77 },
78 None => RoomListLoadingState::NotLoaded,
79 });
80
81 Ok(Self {
82 client: client.clone(),
83 sliding_sync_list: sliding_sync_list.clone(),
84 loading_state: loading_state.clone(),
85 loading_state_task: spawn(async move {
86 pin_mut!(room_list_service_state);
87
88 while let Some(state) = room_list_service_state.next().await {
92 use State::*;
93
94 match state {
95 Terminated { .. } | Error { .. } | Init => (),
96 SettingUp | Recovering | Running => break,
97 }
98 }
99
100 let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
102
103 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
104
105 let mut maximum_number_of_rooms_stream =
107 sliding_sync_list.maximum_number_of_rooms_stream();
108
109 while let Some(maximum_number_of_rooms) =
110 maximum_number_of_rooms_stream.next().await
111 {
112 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
113 }
114 }),
115 })
116 }
117
118 pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
124 self.loading_state.subscribe_reset()
125 }
126
127 fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
129 self.client.rooms_stream()
130 }
131
132 pub fn entries_with_dynamic_adapters(
144 &self,
145 page_size: usize,
146 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
147 {
148 self.entries_with_dynamic_adapters_impl(page_size, false)
149 }
150
151 #[doc(hidden)]
152 pub fn entries_with_dynamic_adapters_with(
153 &self,
154 page_size: usize,
155 enable_latest_event_sorter: bool,
156 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
157 {
158 self.entries_with_dynamic_adapters_impl(page_size, enable_latest_event_sorter)
159 }
160
161 fn entries_with_dynamic_adapters_impl(
162 &self,
163 page_size: usize,
164 enable_latest_event_sorter: bool,
165 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
166 {
167 let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
168 let list = self.sliding_sync_list.clone();
169
170 let filter_fn_cell = AsyncCell::shared();
171
172 let limit = SharedObservable::<usize>::new(page_size);
173 let limit_stream = limit.subscribe();
174
175 let dynamic_entries_controller = RoomListDynamicEntriesController::new(
176 filter_fn_cell.clone(),
177 page_size,
178 limit,
179 list.maximum_number_of_rooms_stream(),
180 );
181
182 let stream = stream! {
183 loop {
184 let filter_fn = filter_fn_cell.take().await;
185
186 let (raw_values, raw_stream) = self.entries();
187 let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
188
189 let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
191
192 let mut sorters: Vec<BoxedSorterFn> = Vec::with_capacity(3);
193
194 if enable_latest_event_sorter {
195 sorters.push(Box::new(new_sorter_latest_event()));
198 }
199
200 sorters.push(Box::new(new_sorter_recency()));
204 sorters.push(Box::new(new_sorter_name()));
206
207 let (values, stream) = (values, stream)
208 .filter(filter_fn)
209 .sort_by(new_sorter_lexicographic(sorters))
210 .dynamic_head_with_initial_value(page_size, limit_stream.clone());
211
212 yield stream::once(ready(vec![VectorDiff::Reset { values }]))
214 .chain(stream);
215 }
216 }
217 .switch();
218
219 (stream, dynamic_entries_controller)
220 }
221}
222
223fn merge_stream_and_receiver(
227 mut current_values: Vector<RoomListItem>,
228 raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
229 mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
230) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
231 stream! {
232 pin_mut!(raw_stream);
233
234 loop {
235 select! {
236 biased;
238
239 diffs = raw_stream.next() => {
240 if let Some(diffs) = diffs {
241 let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
242
243 for diff in &diffs {
244 diff.clone().map(|room| {
245 trace!(room = %room.room_id(), "updated in response");
246 room
247 }).apply(&mut current_values);
248 }
249
250 yield diffs;
251 } else {
252 break;
254 }
255 }
256
257 update = room_info_notable_update_receiver.recv() => {
258 match update {
259 Ok(update) => {
260 if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
262 let mut room = current_values[index].clone();
263 room.refresh_cached_data();
264
265 yield vec![VectorDiff::Set { index, value: room }];
266 }
267 }
268
269 Err(RecvError::Closed) => {
270 error!("Cannot receive room info notable updates because the sender has been closed");
271
272 break;
273 }
274
275 Err(RecvError::Lagged(n)) => {
276 error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
277 }
278 }
279 }
280 }
281 }
282 }
283}
284
/// The loading state of a [`RoomList`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
    /// Initial state of a [`RoomList`] whose sliding sync list does not yet
    /// report a maximum number of rooms.
    NotLoaded,

    /// The [`RoomList`] is considered loaded.
    Loaded {
        /// Maximum number of rooms as reported by the sliding sync list.
        ///
        /// This may still be `None`: the state is switched to `Loaded` with
        /// whatever value the list holds at that moment, and is updated
        /// whenever the list publishes a new value.
        maximum_number_of_rooms: Option<u32>,
    },
}
327
328pub struct RoomListDynamicEntriesController {
333 filter: Arc<AsyncCell<BoxedFilterFn>>,
334 page_size: usize,
335 limit: SharedObservable<usize>,
336 maximum_number_of_rooms: Subscriber<Option<u32>>,
337}
338
339impl RoomListDynamicEntriesController {
340 fn new(
341 filter: Arc<AsyncCell<BoxedFilterFn>>,
342 page_size: usize,
343 limit_stream: SharedObservable<usize>,
344 maximum_number_of_rooms: Subscriber<Option<u32>>,
345 ) -> Self {
346 Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
347 }
348
349 pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
354 if Arc::strong_count(&self.filter) == 1 {
355 false
359 } else {
360 self.filter.set(filter);
361 true
362 }
363 }
364
365 pub fn add_one_page(&self) {
368 let Some(max) = self.maximum_number_of_rooms.get() else {
369 return;
370 };
371
372 let max: usize = max.try_into().unwrap();
373 let limit = self.limit.get();
374
375 if limit < max {
376 self.limit.set_if_not_eq(limit + self.page_size);
381 }
382 }
383
384 pub fn reset_to_one_page(&self) {
387 self.limit.set_if_not_eq(self.page_size);
388 }
389}
390
391#[derive(Clone, Debug)]
403pub struct RoomListItem {
404 inner: Room,
406
407 pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,
409
410 pub(super) cached_latest_event_is_local: bool,
412
413 pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,
415
416 pub(super) cached_display_name: Option<String>,
418
419 pub(super) cached_is_space: bool,
421
422 pub(super) cached_state: RoomState,
424}
425
426impl RoomListItem {
427 pub fn into_inner(self) -> Room {
429 self.inner
430 }
431
432 pub(super) fn refresh_cached_data(&mut self) {
434 self.cached_latest_event_timestamp = self.inner.new_latest_event_timestamp();
435 self.cached_latest_event_is_local = self.inner.new_latest_event_is_local();
436 self.cached_recency_stamp = self.inner.recency_stamp();
437 self.cached_display_name = self.inner.cached_display_name().map(|name| name.to_string());
438 self.cached_is_space = self.inner.is_space();
439 self.cached_state = self.inner.state();
440 }
441}
442
443impl From<Room> for RoomListItem {
444 fn from(inner: Room) -> Self {
445 let cached_latest_event_timestamp = inner.new_latest_event_timestamp();
446 let cached_latest_event_is_local = inner.new_latest_event_is_local();
447 let cached_recency_stamp = inner.recency_stamp();
448 let cached_display_name = inner.cached_display_name().map(|name| name.to_string());
449 let cached_is_space = inner.is_space();
450 let cached_state = inner.state();
451
452 Self {
453 inner,
454 cached_latest_event_timestamp,
455 cached_latest_event_is_local,
456 cached_recency_stamp,
457 cached_display_name,
458 cached_is_space,
459 cached_state,
460 }
461 }
462}
463
464impl Deref for RoomListItem {
465 type Target = Room;
466
467 fn deref(&self) -> &Self::Target {
468 &self.inner
469 }
470}