1use std::{cmp::Ordering, collections::HashMap, sync::Arc};
31
32use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
33use futures_util::pin_mut;
34use imbl::Vector;
35use itertools::Itertools;
36use matrix_sdk::{
37 Client, Error as SDKError, deserialized_responses::SyncOrStrippedState, executor::AbortOnDrop,
38};
39use matrix_sdk_common::executor::spawn;
40use ruma::{
41 OwnedRoomId, RoomId,
42 events::{
43 self, SyncStateEvent,
44 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
45 },
46};
47use thiserror::Error;
48use tokio::sync::Mutex as AsyncMutex;
49use tracing::error;
50
51use crate::spaces::{graph::SpaceGraph, leave::LeaveSpaceHandle};
52pub use crate::spaces::{room::SpaceRoom, room_list::SpaceRoomList};
53
54pub mod graph;
55pub mod leave;
56pub mod room;
57pub mod room_list;
58
59#[derive(Debug, Error)]
61pub enum Error {
62 #[error("Room `{0}` not found")]
64 RoomNotFound(OwnedRoomId),
65
66 #[error("Failed to leave space")]
68 LeaveSpace(SDKError),
69
70 #[error("Failed to load members")]
72 LoadRoomMembers(SDKError),
73}
74
/// Mutable state of a [`SpaceService`], kept behind a single async mutex so
/// that the graph and the observable list are always updated together.
struct SpaceState {
    /// Graph of the joined spaces, built from their `m.space.parent` and
    /// `m.space.child` state events.
    graph: SpaceGraph,
    /// Observable list of the top-level joined spaces; subscribers receive
    /// diffs whenever its contents are replaced.
    joined_rooms: ObservableVector<SpaceRoom>,
}
79
/// Entry point to the spaces functionality.
///
/// The service looks at the client's joined space rooms, builds a graph from
/// their `m.space.parent` and `m.space.child` state events and exposes the
/// resulting top-level spaces, either as a one-off snapshot or as an
/// observable list that is kept up to date by a background task.
pub struct SpaceService {
    /// Client used to read joined space rooms and to listen for room updates.
    client: Client,

    /// Shared space graph and observable list of top-level joined spaces.
    space_state: Arc<AsyncMutex<SpaceState>>,

    /// Handle of the background task reacting to room updates. `None` until
    /// the first subscription is made.
    room_update_handle: AsyncMutex<Option<AbortOnDrop<()>>>,
}
126
127impl SpaceService {
128 pub fn new(client: Client) -> Self {
130 Self {
131 client,
132 space_state: Arc::new(AsyncMutex::new(SpaceState {
133 graph: SpaceGraph::new(),
134 joined_rooms: ObservableVector::new(),
135 })),
136 room_update_handle: AsyncMutex::new(None),
137 }
138 }
139
140 pub async fn subscribe_to_joined_spaces(
143 &self,
144 ) -> (Vector<SpaceRoom>, VectorSubscriberBatchedStream<SpaceRoom>) {
145 let mut room_update_handle = self.room_update_handle.lock().await;
146
147 if room_update_handle.is_none() {
148 let client = self.client.clone();
149 let space_state = Arc::clone(&self.space_state);
150 let all_room_updates_receiver = self.client.subscribe_to_all_room_updates();
151
152 *room_update_handle = Some(AbortOnDrop::new(spawn(async move {
153 pin_mut!(all_room_updates_receiver);
154
155 loop {
156 match all_room_updates_receiver.recv().await {
157 Ok(updates) => {
158 if updates.is_empty() {
159 continue;
160 }
161
162 let (spaces, graph) = Self::joined_spaces_for(&client).await;
163 Self::update_joined_spaces_if_needed(
164 Vector::from(spaces),
165 graph,
166 &space_state,
167 )
168 .await;
169 }
170 Err(err) => {
171 error!("error when listening to room updates: {err}");
172 }
173 }
174 }
175 })));
176
177 let (spaces, graph) = Self::joined_spaces_for(&self.client).await;
179 Self::update_joined_spaces_if_needed(Vector::from(spaces), graph, &self.space_state)
180 .await;
181 }
182
183 self.space_state.lock().await.joined_rooms.subscribe().into_values_and_batched_stream()
184 }
185
186 pub async fn joined_spaces(&self) -> Vec<SpaceRoom> {
190 let (spaces, graph) = Self::joined_spaces_for(&self.client).await;
191
192 Self::update_joined_spaces_if_needed(
193 Vector::from(spaces.clone()),
194 graph,
195 &self.space_state,
196 )
197 .await;
198
199 spaces
200 }
201
202 pub async fn space_room_list(&self, space_id: OwnedRoomId) -> SpaceRoomList {
204 SpaceRoomList::new(self.client.clone(), space_id).await
205 }
206
207 pub async fn leave_space(&self, space_id: &RoomId) -> Result<LeaveSpaceHandle, Error> {
215 let space_state = self.space_state.lock().await;
216
217 if !space_state.graph.has_node(space_id) {
218 return Err(Error::RoomNotFound(space_id.to_owned()));
219 }
220
221 let room_ids = space_state.graph.flattened_bottom_up_subtree(space_id);
222
223 let handle = LeaveSpaceHandle::new(self.client.clone(), room_ids).await;
224
225 Ok(handle)
226 }
227
228 async fn update_joined_spaces_if_needed(
229 new_spaces: Vector<SpaceRoom>,
230 new_graph: SpaceGraph,
231 space_state: &Arc<AsyncMutex<SpaceState>>,
232 ) {
233 let mut space_state = space_state.lock().await;
234
235 if new_spaces != space_state.joined_rooms.clone() {
236 space_state.joined_rooms.clear();
237 space_state.joined_rooms.append(new_spaces);
238 }
239
240 space_state.graph = new_graph;
241 }
242
243 async fn joined_spaces_for(client: &Client) -> (Vec<SpaceRoom>, SpaceGraph) {
244 let joined_spaces = client.joined_space_rooms();
245
246 let mut graph = SpaceGraph::new();
248
249 for space in joined_spaces.iter() {
252 graph.add_node(space.room_id().to_owned());
253
254 if let Ok(parents) = space.get_state_events_static::<SpaceParentEventContent>().await {
255 parents.into_iter()
256 .flat_map(|parent_event| match parent_event.deserialize() {
257 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
258 Some(e.state_key)
259 }
260 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
261 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
262 Err(e) => {
263 error!(room_id = ?space.room_id(), "Could not deserialize m.space.parent: {e}");
264 None
265 }
266 }).for_each(|parent| graph.add_edge(parent, space.room_id().to_owned()));
267 } else {
268 error!(room_id = ?space.room_id(), "Could not get m.space.parent events");
269 }
270
271 if let Ok(children) = space.get_state_events_static::<SpaceChildEventContent>().await {
272 children.into_iter()
273 .filter_map(|child_event| match child_event.deserialize() {
274 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
275 Some(e.state_key)
276 }
277 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
278 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
279 Err(e) => {
280 error!(room_id = ?space.room_id(), "Could not deserialize m.space.child: {e}");
281 None
282 }
283 }).for_each(|child| graph.add_edge(space.room_id().to_owned(), child));
284 } else {
285 error!(room_id = ?space.room_id(), "Could not get m.space.child events");
286 }
287 }
288
289 graph.remove_cycles();
292
293 let root_nodes = graph.root_nodes();
294
295 let top_level_spaces = joined_spaces
299 .iter()
300 .filter(|room| root_nodes.contains(&room.room_id()))
301 .collect::<Vec<_>>();
302
303 let mut top_level_space_order = HashMap::new();
304 for space in &top_level_spaces {
305 if let Ok(Some(raw_event)) =
306 space.account_data_static::<events::space_order::SpaceOrderEventContent>().await
307 && let Ok(event) = raw_event.deserialize()
308 {
309 top_level_space_order.insert(space.room_id().to_owned(), event.content.order);
310 }
311 }
312
313 let top_level_spaces = top_level_spaces
314 .iter()
315 .sorted_by(|a, b| {
316 match (
318 top_level_space_order.get(a.room_id()),
319 top_level_space_order.get(b.room_id()),
320 ) {
321 (Some(a_order), Some(b_order)) => {
322 a_order.cmp(b_order).then(a.room_id().cmp(b.room_id()))
323 }
324 (Some(_), None) => Ordering::Less,
325 (None, Some(_)) => Ordering::Greater,
326 (None, None) => a.room_id().cmp(b.room_id()),
327 }
328 })
329 .map(|room| {
330 SpaceRoom::new_from_known(room, graph.children_of(room.room_id()).len() as u64)
331 })
332 .collect();
333
334 (top_level_spaces, graph)
335 }
336}
337
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use eyeball_im::VectorDiff;
    use futures_util::{StreamExt, pin_mut};
    use matrix_sdk::{room::ParentSpace, test_utils::mocks::MatrixMockServer};
    use matrix_sdk_test::{
        JoinedRoomBuilder, LeftRoomBuilder, RoomAccountDataTestEvent, async_test,
        event_factory::EventFactory,
    };
    use ruma::{RoomVersionId, UserId, owned_room_id, room_id};
    use serde_json::json;
    use stream_assert::{assert_next_eq, assert_pending};

    use super::*;

    /// A parent space with two child spaces must yield only the parent as a
    /// top-level joined space, with a children count of 2.
    #[async_test]
    async fn test_spaces_hierarchy() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let user_id = client.user_id().unwrap();
        let space_service = SpaceService::new(client.clone());
        let factory = EventFactory::new();

        server.mock_room_state_encryption().plain().mount().await;

        // Given one parent space with two child spaces.
        let parent_space_id = room_id!("!parent_space:example.org");
        let child_space_id_1 = room_id!("!child_space_1:example.org");
        let child_space_id_2 = room_id!("!child_space_2:example.org");

        // Each child points at the parent through an `m.space.parent` event.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(child_space_id_1)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_parent(parent_space_id.to_owned(), child_space_id_1.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;

        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(child_space_id_2)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_parent(parent_space_id.to_owned(), child_space_id_2.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;
        // The parent points at both children through `m.space.child` events.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(parent_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_child(parent_space_id.to_owned(), child_space_id_1.to_owned())
                            .sender(user_id),
                    )
                    .add_state_event(
                        factory
                            .space_child(parent_space_id.to_owned(), child_space_id_2.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;

        // Only the parent space is returned as a top-level space.
        assert_eq!(
            space_service
                .joined_spaces()
                .await
                .iter()
                .map(|s| s.room_id.to_owned())
                .collect::<Vec<_>>(),
            vec![parent_space_id]
        );

        // And it reports both of its children.
        assert_eq!(
            space_service
                .joined_spaces()
                .await
                .iter()
                .map(|s| s.children_count)
                .collect::<Vec<_>>(),
            vec![2]
        );

        let parent_space = client.get_room(parent_space_id).unwrap();
        assert!(parent_space.is_space());

        // Both children see the parent as a reciprocal parent space.
        let spaces: Vec<ParentSpace> = client
            .get_room(child_space_id_1)
            .unwrap()
            .parent_spaces()
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_let!(ParentSpace::Reciprocal(parent) = spaces.first().unwrap());
        assert_eq!(parent.room_id(), parent_space.room_id());

        let spaces: Vec<ParentSpace> = client
            .get_room(child_space_id_2)
            .unwrap()
            .parent_spaces()
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_let!(ParentSpace::Reciprocal(parent) = spaces.last().unwrap());
        assert_eq!(parent.room_id(), parent_space.room_id());
    }

    /// The subscriber stream must emit diffs only when the list of top-level
    /// joined spaces actually changes.
    #[async_test]
    async fn test_joined_spaces_updates() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let user_id = client.user_id().unwrap();
        let factory = EventFactory::new();

        server.mock_room_state_encryption().plain().mount().await;

        let first_space_id = room_id!("!first_space:example.org");
        let second_space_id = room_id!("!second_space:example.org");

        // Join the first space before the service exists.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(first_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type()),
            )
            .await;

        // Build the service only now, so the already joined space has to be
        // picked up when subscribing.
        let space_service = SpaceService::new(client.clone());

        let (initial_values, joined_spaces_subscriber) =
            space_service.subscribe_to_joined_spaces().await;
        pin_mut!(joined_spaces_subscriber);
        assert_pending!(joined_spaces_subscriber);

        assert_eq!(
            initial_values,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)].into()
        );

        assert_eq!(
            space_service.joined_spaces().await,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)]
        );

        // Querying the joined spaces did not change the list, so no diff is
        // emitted.
        assert_pending!(joined_spaces_subscriber);

        // Join a second space that declares one child.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(second_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_child(
                                second_space_id.to_owned(),
                                owned_room_id!("!child:example.org"),
                            )
                            .sender(user_id),
                    ),
            )
            .await;

        // Both spaces are now reported, the second one with one child.
        assert_eq!(
            space_service.joined_spaces().await,
            vec![
                SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0),
                SpaceRoom::new_from_known(&client.get_room(second_space_id).unwrap(), 1)
            ]
        );

        // The subscriber sees the replacement as a clear followed by an
        // append.
        assert_next_eq!(
            joined_spaces_subscriber,
            vec![
                VectorDiff::Clear,
                VectorDiff::Append {
                    values: vec![
                        SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0),
                        SpaceRoom::new_from_known(&client.get_room(second_space_id).unwrap(), 1)
                    ]
                    .into()
                },
            ]
        );

        // Leave the second space.
        server.sync_room(&client, LeftRoomBuilder::new(second_space_id)).await;

        // Without any explicit query, the subscriber is told that only the
        // first space remains.
        assert_next_eq!(
            joined_spaces_subscriber,
            vec![
                VectorDiff::Clear,
                VectorDiff::Append {
                    values: vec![SpaceRoom::new_from_known(
                        &client.get_room(first_space_id).unwrap(),
                        0
                    )]
                    .into()
                },
            ]
        );

        // Join a room that is not a space (no space type on its create event).
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id!("!room:example.org"))
                    .add_state_event(factory.create(user_id, RoomVersionId::V1)),
            )
            .await;

        // It affects neither the stream nor the list of joined spaces.
        assert_pending!(joined_spaces_subscriber);
        assert_eq!(
            space_service.joined_spaces().await,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)]
        );
    }

    /// Top-level spaces with an `m.space_order` entry come first, sorted by
    /// that order; the remaining ones follow, sorted by room id.
    #[async_test]
    async fn test_top_level_space_order() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        server.mock_room_state_encryption().plain().mount().await;

        // Rooms are synced in shuffled order; only two of them have an order.
        add_space_rooms_with(
            vec![
                (room_id!("!2:a.b"), Some("2")),
                (room_id!("!4:a.b"), None),
                (room_id!("!3:a.b"), None),
                (room_id!("!1:a.b"), Some("1")),
            ],
            &client,
            &server,
            &EventFactory::new(),
            client.user_id().unwrap(),
        )
        .await;

        let space_service = SpaceService::new(client.clone());

        // Ordered spaces first ("1" then "2"), then the unordered ones by
        // room id.
        assert_eq!(
            space_service.joined_spaces().await,
            vec![
                SpaceRoom::new_from_known(&client.get_room(room_id!("!1:a.b")).unwrap(), 0),
                SpaceRoom::new_from_known(&client.get_room(room_id!("!2:a.b")).unwrap(), 0),
                SpaceRoom::new_from_known(&client.get_room(room_id!("!3:a.b")).unwrap(), 0),
                SpaceRoom::new_from_known(&client.get_room(room_id!("!4:a.b")).unwrap(), 0),
            ]
        );
    }

    /// Syncs one joined space room per entry of `rooms`, attaching an
    /// `m.space_order` account data event when an order is provided.
    async fn add_space_rooms_with(
        rooms: Vec<(&RoomId, Option<&str>)>,
        client: &Client,
        server: &MatrixMockServer,
        factory: &EventFactory,
        user_id: &UserId,
    ) {
        for (room_id, order) in rooms {
            let mut builder = JoinedRoomBuilder::new(room_id)
                .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type());

            if let Some(order) = order {
                builder = builder.add_account_data(RoomAccountDataTestEvent::Custom(json!({
                    "type": "m.space_order",
                    "content": {
                        "order": order
                    }
                })));
            }

            server.sync_room(client, builder).await;
        }
    }
}