matrix_sdk_ui/room_list_service/
room_list.rs1use std::{future::ready, ops::Deref, sync::Arc};
16
17use async_cell::sync::AsyncCell;
18use async_rx::StreamExt as _;
19use async_stream::stream;
20use eyeball::{SharedObservable, Subscriber};
21use eyeball_im::{Vector, VectorDiff};
22use eyeball_im_util::vector::VectorObserverExt;
23use futures_util::{Stream, StreamExt as _, pin_mut, stream};
24use matrix_sdk::{
25 Client, Room, RoomRecencyStamp, RoomState, SlidingSync, SlidingSyncList,
26 executor::{JoinHandle, spawn},
27};
28use matrix_sdk_base::RoomInfoNotableUpdate;
29use ruma::MilliSecondsSinceUnixEpoch;
30use tokio::{
31 select,
32 sync::broadcast::{self, error::RecvError},
33};
34use tracing::{error, trace};
35
36use super::{
37 Error, State,
38 filters::BoxedFilterFn,
39 sorters::{
40 BoxedSorterFn, new_sorter_latest_event, new_sorter_lexicographic, new_sorter_name,
41 new_sorter_recency,
42 },
43};
44
/// A view over one sliding sync list: it exposes the list's loading state and
/// streams of its room entries.
///
/// A background task (see `loading_state_task`) keeps `loading_state` up to
/// date; that task is aborted when the `RoomList` is dropped.
#[derive(Debug)]
pub struct RoomList {
    /// Client used to obtain the rooms stream and the room info notable
    /// update receiver.
    client: Client,
    /// The sliding sync list backing this room list.
    sliding_sync_list: SlidingSyncList,
    /// Observable loading state, written by `loading_state_task`.
    loading_state: SharedObservable<RoomListLoadingState>,
    /// Handle of the background task that updates `loading_state`.
    loading_state_task: JoinHandle<()>,
}
54
55impl Drop for RoomList {
56 fn drop(&mut self) {
57 self.loading_state_task.abort();
58 }
59}
60
61impl RoomList {
62 pub(super) async fn new(
63 client: &Client,
64 sliding_sync: &Arc<SlidingSync>,
65 sliding_sync_list_name: &str,
66 room_list_service_state: Subscriber<State>,
67 ) -> Result<Self, Error> {
68 let sliding_sync_list = sliding_sync
69 .on_list(sliding_sync_list_name, |list| ready(list.clone()))
70 .await
71 .ok_or_else(|| Error::UnknownList(sliding_sync_list_name.to_owned()))?;
72
73 let loading_state =
74 SharedObservable::new(match sliding_sync_list.maximum_number_of_rooms() {
75 Some(maximum_number_of_rooms) => RoomListLoadingState::Loaded {
76 maximum_number_of_rooms: Some(maximum_number_of_rooms),
77 },
78 None => RoomListLoadingState::NotLoaded,
79 });
80
81 Ok(Self {
82 client: client.clone(),
83 sliding_sync_list: sliding_sync_list.clone(),
84 loading_state: loading_state.clone(),
85 loading_state_task: spawn(async move {
86 pin_mut!(room_list_service_state);
87
88 while let Some(state) = room_list_service_state.next().await {
92 use State::*;
93
94 match state {
95 Terminated { .. } | Error { .. } | Init => (),
96 SettingUp | Recovering | Running => break,
97 }
98 }
99
100 let maximum_number_of_rooms = sliding_sync_list.maximum_number_of_rooms();
102
103 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
104
105 let mut maximum_number_of_rooms_stream =
107 sliding_sync_list.maximum_number_of_rooms_stream();
108
109 while let Some(maximum_number_of_rooms) =
110 maximum_number_of_rooms_stream.next().await
111 {
112 loading_state.set(RoomListLoadingState::Loaded { maximum_number_of_rooms });
113 }
114 }),
115 })
116 }
117
118 pub fn loading_state(&self) -> Subscriber<RoomListLoadingState> {
122 self.loading_state.subscribe_reset()
123 }
124
125 fn entries(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
127 self.client.rooms_stream()
128 }
129
130 pub fn entries_with_dynamic_adapters(
142 &self,
143 page_size: usize,
144 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
145 {
146 self.entries_with_dynamic_adapters_impl(page_size, false)
147 }
148
149 #[doc(hidden)]
150 pub fn entries_with_dynamic_adapters_with(
151 &self,
152 page_size: usize,
153 enable_latest_event_sorter: bool,
154 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
155 {
156 self.entries_with_dynamic_adapters_impl(page_size, enable_latest_event_sorter)
157 }
158
159 fn entries_with_dynamic_adapters_impl(
160 &self,
161 page_size: usize,
162 enable_latest_event_sorter: bool,
163 ) -> (impl Stream<Item = Vec<VectorDiff<RoomListItem>>> + '_, RoomListDynamicEntriesController)
164 {
165 let room_info_notable_update_receiver = self.client.room_info_notable_update_receiver();
166 let list = self.sliding_sync_list.clone();
167
168 let filter_fn_cell = AsyncCell::shared();
169
170 let limit = SharedObservable::<usize>::new(page_size);
171 let limit_stream = limit.subscribe();
172
173 let dynamic_entries_controller = RoomListDynamicEntriesController::new(
174 filter_fn_cell.clone(),
175 page_size,
176 limit,
177 list.maximum_number_of_rooms_stream(),
178 );
179
180 let stream = stream! {
181 loop {
182 let filter_fn = filter_fn_cell.take().await;
183
184 let (raw_values, raw_stream) = self.entries();
185 let values = raw_values.into_iter().map(Into::into).collect::<Vector<RoomListItem>>();
186
187 let stream = merge_stream_and_receiver(values.clone(), raw_stream, room_info_notable_update_receiver.resubscribe());
189
190 let mut sorters: Vec<BoxedSorterFn> = Vec::with_capacity(3);
191
192 if enable_latest_event_sorter {
193 sorters.push(Box::new(new_sorter_latest_event()));
196 }
197
198 sorters.push(Box::new(new_sorter_recency()));
202 sorters.push(Box::new(new_sorter_name()));
204
205 let (values, stream) = (values, stream)
206 .filter(filter_fn)
207 .sort_by(new_sorter_lexicographic(sorters))
208 .dynamic_head_with_initial_value(page_size, limit_stream.clone());
209
210 yield stream::once(ready(vec![VectorDiff::Reset { values }]))
212 .chain(stream);
213 }
214 }
215 .switch();
216
217 (stream, dynamic_entries_controller)
218 }
219}
220
221fn merge_stream_and_receiver(
225 mut current_values: Vector<RoomListItem>,
226 raw_stream: impl Stream<Item = Vec<VectorDiff<Room>>>,
227 mut room_info_notable_update_receiver: broadcast::Receiver<RoomInfoNotableUpdate>,
228) -> impl Stream<Item = Vec<VectorDiff<RoomListItem>>> {
229 stream! {
230 pin_mut!(raw_stream);
231
232 loop {
233 select! {
234 biased;
236
237 diffs = raw_stream.next() => {
238 if let Some(diffs) = diffs {
239 let diffs = diffs.into_iter().map(|diff| diff.map(RoomListItem::from)).collect::<Vec<_>>();
240
241 for diff in &diffs {
242 diff.clone().map(|room| {
243 trace!(room = %room.room_id(), "updated in response");
244 room
245 }).apply(&mut current_values);
246 }
247
248 yield diffs;
249 } else {
250 break;
252 }
253 }
254
255 update = room_info_notable_update_receiver.recv() => {
256 match update {
257 Ok(update) => {
258 if let Some(index) = current_values.iter().position(|room| room.room_id() == update.room_id) {
260 let mut room = current_values[index].clone();
261 room.refresh_cached_data();
262
263 yield vec![VectorDiff::Set { index, value: room }];
264 }
265 }
266
267 Err(RecvError::Closed) => {
268 error!("Cannot receive room info notable updates because the sender has been closed");
269
270 break;
271 }
272
273 Err(RecvError::Lagged(n)) => {
274 error!(number_of_missed_updates = n, "Lag when receiving room info notable update");
275 }
276 }
277 }
278 }
279 }
280 }
281}
282
/// The loading state of a `RoomList`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RoomListLoadingState {
    /// Nothing has been loaded yet. This is the initial state of a `RoomList`
    /// whose sliding sync list does not report a maximum number of rooms.
    NotLoaded,

    /// The room list has been loaded.
    Loaded {
        /// The maximum number of rooms as reported by the sliding sync list,
        /// or `None` when the list does not report one.
        maximum_number_of_rooms: Option<u32>,
    },
}
323
/// Controller for the stream returned by
/// `RoomList::entries_with_dynamic_adapters`: it sets the filter and grows or
/// resets the number of entries the stream exposes.
pub struct RoomListDynamicEntriesController {
    /// Cell shared with the entries stream; the stream takes the filter from
    /// it.
    filter: Arc<AsyncCell<BoxedFilterFn>>,
    /// Number of entries in one page.
    page_size: usize,
    /// Current maximum number of entries the stream exposes.
    limit: SharedObservable<usize>,
    /// Subscriber to the sliding sync list's maximum number of rooms.
    maximum_number_of_rooms: Subscriber<Option<u32>>,
}
334
335impl RoomListDynamicEntriesController {
336 fn new(
337 filter: Arc<AsyncCell<BoxedFilterFn>>,
338 page_size: usize,
339 limit_stream: SharedObservable<usize>,
340 maximum_number_of_rooms: Subscriber<Option<u32>>,
341 ) -> Self {
342 Self { filter, page_size, limit: limit_stream, maximum_number_of_rooms }
343 }
344
345 pub fn set_filter(&self, filter: BoxedFilterFn) -> bool {
350 if Arc::strong_count(&self.filter) == 1 {
351 false
355 } else {
356 self.filter.set(filter);
357 true
358 }
359 }
360
361 pub fn add_one_page(&self) {
364 let Some(max) = self.maximum_number_of_rooms.get() else {
365 return;
366 };
367
368 let max: usize = max.try_into().unwrap();
369 let limit = self.limit.get();
370
371 if limit < max {
372 self.limit.set_if_not_eq(limit + self.page_size);
377 }
378 }
379
380 pub fn reset_to_one_page(&self) {
383 self.limit.set_if_not_eq(self.page_size);
384 }
385}
386
/// A `Room` as seen by the room list: the room itself plus a snapshot of a few
/// of its properties.
///
/// The snapshot is taken when the item is built from a `Room`, and updated by
/// `refresh_cached_data`.
#[derive(Clone, Debug)]
pub struct RoomListItem {
    /// The wrapped room.
    inner: Room,

    /// Cached value of `Room::new_latest_event_timestamp`.
    pub(super) cached_latest_event_timestamp: Option<MilliSecondsSinceUnixEpoch>,

    /// Cached value of `Room::new_latest_event_is_local`.
    pub(super) cached_latest_event_is_local: bool,

    /// Cached value of `Room::recency_stamp`.
    pub(super) cached_recency_stamp: Option<RoomRecencyStamp>,

    /// Cached value of `Room::cached_display_name`, as a `String`.
    pub(super) cached_display_name: Option<String>,

    /// Cached value of `Room::is_space`.
    pub(super) cached_is_space: bool,

    /// Cached value of `Room::state`.
    pub(super) cached_state: RoomState,
}
421
422impl RoomListItem {
423 pub fn into_inner(self) -> Room {
425 self.inner
426 }
427
428 pub(super) fn refresh_cached_data(&mut self) {
430 self.cached_latest_event_timestamp = self.inner.new_latest_event_timestamp();
431 self.cached_latest_event_is_local = self.inner.new_latest_event_is_local();
432 self.cached_recency_stamp = self.inner.recency_stamp();
433 self.cached_display_name = self.inner.cached_display_name().map(|name| name.to_string());
434 self.cached_state = self.inner.state();
436 }
437}
438
439impl From<Room> for RoomListItem {
440 fn from(inner: Room) -> Self {
441 let cached_latest_event_timestamp = inner.new_latest_event_timestamp();
442 let cached_latest_event_is_local = inner.new_latest_event_is_local();
443 let cached_recency_stamp = inner.recency_stamp();
444 let cached_display_name = inner.cached_display_name().map(|name| name.to_string());
445 let cached_is_space = inner.is_space();
446 let cached_state = inner.state();
447
448 Self {
449 inner,
450 cached_latest_event_timestamp,
451 cached_latest_event_is_local,
452 cached_recency_stamp,
453 cached_display_name,
454 cached_is_space,
455 cached_state,
456 }
457 }
458}
459
460impl Deref for RoomListItem {
461 type Target = Room;
462
463 fn deref(&self) -> &Self::Target {
464 &self.inner
465 }
466}