1use std::sync::Arc;
16
17use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber};
18use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
19use futures_util::pin_mut;
20use imbl::Vector;
21use itertools::Itertools;
22use matrix_sdk::{Client, Error, executor::AbortOnDrop, locks::Mutex, paginators::PaginationToken};
23use matrix_sdk_common::executor::spawn;
24use ruma::{OwnedRoomId, api::client::space::get_hierarchy, uint};
25use tokio::sync::Mutex as AsyncMutex;
26use tracing::error;
27
28use crate::spaces::SpaceRoom;
29
/// Pagination state of a [`SpaceRoomList`].
///
/// The list starts out as `Idle { end_reached: false }` and switches to
/// `Loading` for the duration of a pagination request.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum SpaceRoomListPaginationState {
    /// No pagination request is in flight.
    Idle {
        /// `true` once the last successful request carried no continuation
        /// token; further pagination calls are then no-ops.
        end_reached: bool,
    },
    /// A pagination request is currently in flight.
    Loading,
}
36
37pub struct SpaceRoomList {
98 client: Client,
99
100 parent_space_id: OwnedRoomId,
101
102 token: AsyncMutex<PaginationToken>,
103
104 pagination_state: SharedObservable<SpaceRoomListPaginationState>,
105
106 rooms: Arc<Mutex<ObservableVector<SpaceRoom>>>,
107
108 _room_update_handle: AbortOnDrop<()>,
109}
110
111impl SpaceRoomList {
112 pub fn new(client: Client, parent_space_id: OwnedRoomId) -> Self {
114 let rooms = Arc::new(Mutex::new(ObservableVector::<SpaceRoom>::new()));
115
116 let all_room_updates_receiver = client.subscribe_to_all_room_updates();
117
118 let handle = spawn({
119 let client = client.clone();
120 let rooms = rooms.clone();
121
122 async move {
123 pin_mut!(all_room_updates_receiver);
124
125 loop {
126 match all_room_updates_receiver.recv().await {
127 Ok(updates) => {
128 if updates.is_empty() {
129 continue;
130 }
131
132 let mut mutable_rooms = rooms.lock();
133
134 updates.iter_all_room_ids().for_each(|updated_room_id| {
135 if let Some((position, room)) = mutable_rooms
136 .clone()
137 .iter()
138 .find_position(|room| &room.room_id == updated_room_id)
139 && let Some(update_room) = client.get_room(updated_room_id)
140 {
141 mutable_rooms.set(
142 position,
143 SpaceRoom::new_from_known(update_room, room.children_count),
144 );
145 }
146 })
147 }
148 Err(err) => {
149 error!("error when listening to room updates: {err}");
150 }
151 }
152 }
153 }
154 });
155
156 Self {
157 client,
158 parent_space_id,
159 token: AsyncMutex::new(None.into()),
160 pagination_state: SharedObservable::new(SpaceRoomListPaginationState::Idle {
161 end_reached: false,
162 }),
163 rooms,
164 _room_update_handle: AbortOnDrop::new(handle),
165 }
166 }
167
168 pub fn pagination_state(&self) -> SpaceRoomListPaginationState {
170 self.pagination_state.get()
171 }
172
173 pub fn subscribe_to_pagination_state_updates(
175 &self,
176 ) -> Subscriber<SpaceRoomListPaginationState> {
177 self.pagination_state.subscribe()
178 }
179
180 pub fn rooms(&self) -> Vec<SpaceRoom> {
182 self.rooms.lock().iter().cloned().collect_vec()
183 }
184
185 pub fn subscribe_to_room_updates(
187 &self,
188 ) -> (Vector<SpaceRoom>, VectorSubscriberBatchedStream<SpaceRoom>) {
189 self.rooms.lock().subscribe().into_values_and_batched_stream()
190 }
191
192 pub async fn paginate(&self) -> Result<(), Error> {
195 {
196 let mut pagination_state = self.pagination_state.write();
197
198 match *pagination_state {
199 SpaceRoomListPaginationState::Idle { end_reached } if end_reached => {
200 return Ok(());
201 }
202 SpaceRoomListPaginationState::Loading => {
203 return Ok(());
204 }
205 _ => {}
206 }
207
208 ObservableWriteGuard::set(&mut pagination_state, SpaceRoomListPaginationState::Loading);
209 }
210
211 let mut request = get_hierarchy::v1::Request::new(self.parent_space_id.clone());
212 request.max_depth = Some(uint!(1)); let mut pagination_token = self.token.lock().await;
215
216 if let PaginationToken::HasMore(ref token) = *pagination_token {
217 request.from = Some(token.clone());
218 }
219
220 match self.client.send(request).await {
221 Ok(result) => {
222 *pagination_token = match &result.next_batch {
223 Some(val) => PaginationToken::HasMore(val.clone()),
224 None => PaginationToken::HitEnd,
225 };
226
227 let mut rooms = self.rooms.lock();
228 result
229 .rooms
230 .iter()
231 .filter_map(|room| {
232 if room.summary.room_id == self.parent_space_id {
233 None
234 } else {
235 Some(SpaceRoom::new_from_summary(
236 &room.summary,
237 self.client.get_room(&room.summary.room_id),
238 room.children_state.len() as u64,
239 ))
240 }
241 })
242 .for_each(|room| rooms.push_back(room));
243
244 self.pagination_state.set(SpaceRoomListPaginationState::Idle {
245 end_reached: result.next_batch.is_none(),
246 });
247
248 Ok(())
249 }
250 Err(err) => {
251 self.pagination_state
252 .set(SpaceRoomListPaginationState::Idle { end_reached: false });
253 Err(err.into())
254 }
255 }
256 }
257}
258
#[cfg(test)]
mod tests {
    use assert_matches2::assert_matches;
    use eyeball_im::VectorDiff;
    use futures_util::pin_mut;
    use matrix_sdk::{RoomState, test_utils::mocks::MatrixMockServer};
    use matrix_sdk_test::{JoinedRoomBuilder, LeftRoomBuilder, async_test};
    use ruma::{
        room::{JoinRuleSummary, RoomSummary},
        room_id, uint,
    };
    use stream_assert::{assert_next_eq, assert_next_matches, assert_pending, assert_ready};

    use crate::spaces::{SpaceRoom, SpaceService, room_list::SpaceRoomListPaginationState};

    /// A single pagination fills the list, reports the end as reached and
    /// notifies both the state and the room subscribers.
    #[async_test]
    async fn test_room_list_pagination() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let space_service = SpaceService::new(client.clone());

        server.mock_room_state_encryption().plain().mount().await;

        let parent_space_id = room_id!("!parent_space:example.org");
        let room_list = space_service.space_room_list(parent_space_id.to_owned());

        // A fresh list is idle, has not reached the end and holds no rooms.
        assert_matches!(
            room_list.pagination_state(),
            SpaceRoomListPaginationState::Idle { end_reached: false }
        );
        assert!(room_list.rooms().is_empty());

        // Nothing has happened yet, so neither subscriber has anything ready.
        let pagination_state_subscriber = room_list.subscribe_to_pagination_state_updates();
        pin_mut!(pagination_state_subscriber);
        assert_pending!(pagination_state_subscriber);

        let (_, rooms_subscriber) = room_list.subscribe_to_room_updates();
        pin_mut!(rooms_subscriber);
        assert_pending!(rooms_subscriber);

        let child_space_id_1 = room_id!("!1:example.org");
        let child_space_id_2 = room_id!("!2:example.org");

        // The hierarchy mock returns both children, each with one child state
        // entry.
        server
            .mock_get_hierarchy()
            .ok_with_room_ids_and_children_state(
                vec![child_space_id_1, child_space_id_2],
                vec![room_id!("!child:example.org")],
            )
            .mount()
            .await;

        room_list.paginate().await.unwrap();

        // The state subscriber ends up on "idle, end reached".
        assert_next_matches!(
            pagination_state_subscriber,
            SpaceRoomListPaginationState::Idle { end_reached: true }
        );

        // One `PushBack` per child, in response order, each with a children
        // count of 1 and no locally known room.
        let expected_diffs: Vec<_> = [child_space_id_1, child_space_id_2]
            .into_iter()
            .map(|room_id| VectorDiff::PushBack {
                value: SpaceRoom::new_from_summary(
                    &RoomSummary::new(
                        room_id.to_owned(),
                        JoinRuleSummary::Public,
                        false,
                        uint!(1),
                        false,
                    ),
                    None,
                    1,
                ),
            })
            .collect();

        assert_next_eq!(rooms_subscriber, expected_diffs);
    }

    /// Syncing a listed room as joined or left is reflected in the state of
    /// the matching list entry.
    #[async_test]
    async fn test_room_state_updates() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let space_service = SpaceService::new(client.clone());

        let parent_space_id = room_id!("!parent_space:example.org");
        let child_room_id_1 = room_id!("!1:example.org");
        let child_room_id_2 = room_id!("!2:example.org");

        server
            .mock_get_hierarchy()
            .ok_with_room_ids(vec![child_room_id_1, child_room_id_2])
            .mount()
            .await;

        let room_list = space_service.space_room_list(parent_space_id.to_owned());

        room_list.paginate().await.unwrap();

        // Both rooms are listed in response order, and neither has a state
        // because the client does not know them yet.
        let listed = room_list.rooms();
        assert_eq!(listed.first().unwrap().room_id, child_room_id_1);
        assert_eq!(listed.last().unwrap().room_id, child_room_id_2);
        assert_eq!(listed.first().unwrap().state, None);
        assert_eq!(listed.last().unwrap().state, None);

        let (_, rooms_subscriber) = room_list.subscribe_to_room_updates();
        pin_mut!(rooms_subscriber);
        assert_pending!(rooms_subscriber);

        // Joining the first room updates only the first entry.
        server.sync_room(&client, JoinedRoomBuilder::new(child_room_id_1)).await;
        assert_ready!(rooms_subscriber);

        let listed = room_list.rooms();
        assert_eq!(listed.first().unwrap().state, Some(RoomState::Joined));
        assert_eq!(listed.last().unwrap().state, None);

        // Joining the second room updates the second entry as well.
        server.sync_room(&client, JoinedRoomBuilder::new(child_room_id_2)).await;
        assert_ready!(rooms_subscriber);

        let listed = room_list.rooms();
        assert_eq!(listed.first().unwrap().state, Some(RoomState::Joined));
        assert_eq!(listed.last().unwrap().state, Some(RoomState::Joined));

        // Leaving both rooms is reflected on both entries.
        server.sync_room(&client, LeftRoomBuilder::new(child_room_id_1)).await;
        server.sync_room(&client, LeftRoomBuilder::new(child_room_id_2)).await;
        assert_ready!(rooms_subscriber);

        let listed = room_list.rooms();
        assert_eq!(listed.first().unwrap().state, Some(RoomState::Left));
        assert_eq!(listed.last().unwrap().state, Some(RoomState::Left));
    }
}