1use std::sync::Arc;
31
32use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
33use futures_util::pin_mut;
34use imbl::Vector;
35use matrix_sdk::{
36 Client, Error as SDKError, deserialized_responses::SyncOrStrippedState, executor::AbortOnDrop,
37};
38use matrix_sdk_common::executor::spawn;
39use ruma::{
40 OwnedRoomId, RoomId,
41 events::{
42 SyncStateEvent,
43 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
44 },
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::error;
49
50use crate::spaces::{graph::SpaceGraph, leave::LeaveSpaceHandle};
51pub use crate::spaces::{room::SpaceRoom, room_list::SpaceRoomList};
52
53pub mod graph;
54pub mod leave;
55pub mod room;
56pub mod room_list;
57
58#[derive(Debug, Error)]
60pub enum Error {
61 #[error("Room `{0}` not found")]
63 RoomNotFound(OwnedRoomId),
64
65 #[error("Failed to leave space")]
67 LeaveSpace(SDKError),
68
69 #[error("Failed to load members")]
71 LoadRoomMembers(SDKError),
72}
73
/// Mutable state of a [`SpaceService`], kept behind a single async mutex so
/// that the graph and the observable list are always updated together.
struct SpaceState {
    /// The space graph most recently computed by
    /// `SpaceService::joined_spaces_for`.
    graph: SpaceGraph,
    /// Observable list of the joined spaces that are root nodes of `graph`;
    /// subscribers obtained through `subscribe_to_joined_spaces` observe the
    /// changes made to it.
    joined_rooms: ObservableVector<SpaceRoom>,
}
78
/// Entry point for working with the spaces the current user has joined:
/// listing them, observing changes to that list, listing the rooms of a space
/// and leaving a space.
pub struct SpaceService {
    /// Client used to read the joined space rooms and their state events.
    client: Client,

    /// Graph and observable list of joined spaces, shared with the background
    /// task spawned by `subscribe_to_joined_spaces`.
    space_state: Arc<AsyncMutex<SpaceState>>,

    /// Handle of the background task that reacts to room updates; `None`
    /// until `subscribe_to_joined_spaces` is called for the first time.
    // NOTE(review): presumably the task is aborted when this handle is
    // dropped, as the `AbortOnDrop` name suggests — confirm.
    room_update_handle: AsyncMutex<Option<AbortOnDrop<()>>>,
}
125
126impl SpaceService {
127 pub fn new(client: Client) -> Self {
129 Self {
130 client,
131 space_state: Arc::new(AsyncMutex::new(SpaceState {
132 graph: SpaceGraph::new(),
133 joined_rooms: ObservableVector::new(),
134 })),
135 room_update_handle: AsyncMutex::new(None),
136 }
137 }
138
139 pub async fn subscribe_to_joined_spaces(
142 &self,
143 ) -> (Vector<SpaceRoom>, VectorSubscriberBatchedStream<SpaceRoom>) {
144 let mut room_update_handle = self.room_update_handle.lock().await;
145
146 if room_update_handle.is_none() {
147 let client = self.client.clone();
148 let space_state = Arc::clone(&self.space_state);
149 let all_room_updates_receiver = self.client.subscribe_to_all_room_updates();
150
151 *room_update_handle = Some(AbortOnDrop::new(spawn(async move {
152 pin_mut!(all_room_updates_receiver);
153
154 loop {
155 match all_room_updates_receiver.recv().await {
156 Ok(updates) => {
157 if updates.is_empty() {
158 continue;
159 }
160
161 let (spaces, graph) = Self::joined_spaces_for(&client).await;
162 Self::update_joined_spaces_if_needed(
163 Vector::from(spaces),
164 graph,
165 &space_state,
166 )
167 .await;
168 }
169 Err(err) => {
170 error!("error when listening to room updates: {err}");
171 }
172 }
173 }
174 })));
175
176 let (spaces, graph) = Self::joined_spaces_for(&self.client).await;
178 Self::update_joined_spaces_if_needed(Vector::from(spaces), graph, &self.space_state)
179 .await;
180 }
181
182 self.space_state.lock().await.joined_rooms.subscribe().into_values_and_batched_stream()
183 }
184
185 pub async fn joined_spaces(&self) -> Vec<SpaceRoom> {
189 let (spaces, graph) = Self::joined_spaces_for(&self.client).await;
190
191 Self::update_joined_spaces_if_needed(
192 Vector::from(spaces.clone()),
193 graph,
194 &self.space_state,
195 )
196 .await;
197
198 spaces
199 }
200
201 pub async fn space_room_list(&self, space_id: OwnedRoomId) -> SpaceRoomList {
203 SpaceRoomList::new(self.client.clone(), space_id).await
204 }
205
206 pub async fn leave_space(&self, space_id: &RoomId) -> Result<LeaveSpaceHandle, Error> {
214 let space_state = self.space_state.lock().await;
215
216 if !space_state.graph.has_node(space_id) {
217 return Err(Error::RoomNotFound(space_id.to_owned()));
218 }
219
220 let room_ids = space_state.graph.flattened_bottom_up_subtree(space_id);
221
222 let handle = LeaveSpaceHandle::new(self.client.clone(), room_ids).await;
223
224 Ok(handle)
225 }
226
227 async fn update_joined_spaces_if_needed(
228 new_spaces: Vector<SpaceRoom>,
229 new_graph: SpaceGraph,
230 space_state: &Arc<AsyncMutex<SpaceState>>,
231 ) {
232 let mut space_state = space_state.lock().await;
233
234 if new_spaces != space_state.joined_rooms.clone() {
235 space_state.joined_rooms.clear();
236 space_state.joined_rooms.append(new_spaces);
237 }
238
239 space_state.graph = new_graph;
240 }
241
242 async fn joined_spaces_for(client: &Client) -> (Vec<SpaceRoom>, SpaceGraph) {
243 let joined_spaces = client.joined_space_rooms();
244
245 let mut graph = SpaceGraph::new();
247
248 for space in joined_spaces.iter() {
251 graph.add_node(space.room_id().to_owned());
252
253 if let Ok(parents) = space.get_state_events_static::<SpaceParentEventContent>().await {
254 parents.into_iter()
255 .flat_map(|parent_event| match parent_event.deserialize() {
256 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
257 Some(e.state_key)
258 }
259 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
260 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
261 Err(e) => {
262 error!(room_id = ?space.room_id(), "Could not deserialize m.space.parent: {e}");
263 None
264 }
265 }).for_each(|parent| graph.add_edge(parent, space.room_id().to_owned()));
266 } else {
267 error!(room_id = ?space.room_id(), "Could not get m.space.parent events");
268 }
269
270 if let Ok(children) = space.get_state_events_static::<SpaceChildEventContent>().await {
271 children.into_iter()
272 .filter_map(|child_event| match child_event.deserialize() {
273 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
274 Some(e.state_key)
275 }
276 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
277 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
278 Err(e) => {
279 error!(room_id = ?space.room_id(), "Could not deserialize m.space.child: {e}");
280 None
281 }
282 }).for_each(|child| graph.add_edge(space.room_id().to_owned(), child));
283 } else {
284 error!(room_id = ?space.room_id(), "Could not get m.space.child events");
285 }
286 }
287
288 graph.remove_cycles();
291
292 let root_nodes = graph.root_nodes();
293
294 let joined_space_rooms = joined_spaces
295 .iter()
296 .filter_map(|room| {
297 let room_id = room.room_id();
298
299 if root_nodes.contains(&room_id) {
300 Some(SpaceRoom::new_from_known(room, graph.children_of(room_id).len() as u64))
301 } else {
302 None
303 }
304 })
305 .collect();
306
307 (joined_space_rooms, graph)
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use assert_matches2::assert_let;
314 use eyeball_im::VectorDiff;
315 use futures_util::{StreamExt, pin_mut};
316 use matrix_sdk::{room::ParentSpace, test_utils::mocks::MatrixMockServer};
317 use matrix_sdk_test::{
318 JoinedRoomBuilder, LeftRoomBuilder, async_test, event_factory::EventFactory,
319 };
320 use ruma::{RoomVersionId, owned_room_id, room_id};
321 use stream_assert::{assert_next_eq, assert_pending};
322
323 use super::*;
324
    /// Checks that, for a parent space with two child spaces linked in both
    /// directions, only the parent is reported as a joined space (with a
    /// children count of 2), and that each child sees the parent as a
    /// reciprocal parent space.
    #[async_test]
    async fn test_spaces_hierarchy() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let user_id = client.user_id().unwrap();
        let space_service = SpaceService::new(client.clone());
        let factory = EventFactory::new();

        server.mock_room_state_encryption().plain().mount().await;

        // Given one parent space with two children spaces.
        let parent_space_id = room_id!("!parent_space:example.org");
        let child_space_id_1 = room_id!("!child_space_1:example.org");
        let child_space_id_2 = room_id!("!child_space_2:example.org");

        // The first child is a space that declares the parent through a
        // space-parent state event.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(child_space_id_1)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_parent(parent_space_id.to_owned(), child_space_id_1.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;

        // Same for the second child.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(child_space_id_2)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_parent(parent_space_id.to_owned(), child_space_id_2.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;
        // The parent is a space that declares both children through
        // space-child state events.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(parent_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_child(parent_space_id.to_owned(), child_space_id_1.to_owned())
                            .sender(user_id),
                    )
                    .add_state_event(
                        factory
                            .space_child(parent_space_id.to_owned(), child_space_id_2.to_owned())
                            .sender(user_id),
                    ),
            )
            .await;

        // Only the parent space is returned.
        assert_eq!(
            space_service
                .joined_spaces()
                .await
                .iter()
                .map(|s| s.room_id.to_owned())
                .collect::<Vec<_>>(),
            vec![parent_space_id]
        );

        // And it has 2 children.
        assert_eq!(
            space_service
                .joined_spaces()
                .await
                .iter()
                .map(|s| s.children_count)
                .collect::<Vec<_>>(),
            vec![2]
        );

        let parent_space = client.get_room(parent_space_id).unwrap();
        assert!(parent_space.is_space());

        // The first child reports the parent as a reciprocal parent space.
        let spaces: Vec<ParentSpace> = client
            .get_room(child_space_id_1)
            .unwrap()
            .parent_spaces()
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_let!(ParentSpace::Reciprocal(parent) = spaces.first().unwrap());
        assert_eq!(parent.room_id(), parent_space.room_id());

        // And so does the second child.
        let spaces: Vec<ParentSpace> = client
            .get_room(child_space_id_2)
            .unwrap()
            .parent_spaces()
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_let!(ParentSpace::Reciprocal(parent) = spaces.last().unwrap());
        assert_eq!(parent.room_id(), parent_space.room_id());
    }
437
    /// Checks that the stream returned by `subscribe_to_joined_spaces` emits
    /// a clear-and-append update when a space is joined or left, and stays
    /// pending when the joined spaces did not change.
    #[async_test]
    async fn test_joined_spaces_updates() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let user_id = client.user_id().unwrap();
        let factory = EventFactory::new();

        server.mock_room_state_encryption().plain().mount().await;

        let first_space_id = room_id!("!first_space:example.org");
        let second_space_id = room_id!("!second_space:example.org");

        // Join the first space before the service is created.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(first_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type()),
            )
            .await;

        // Build the `SpaceService` and subscribe to its joined spaces.
        let space_service = SpaceService::new(client.clone());

        let (initial_values, joined_spaces_subscriber) =
            space_service.subscribe_to_joined_spaces().await;
        pin_mut!(joined_spaces_subscriber);
        assert_pending!(joined_spaces_subscriber);

        // The initial values contain the first space, with no children.
        assert_eq!(
            initial_values,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)].into()
        );

        assert_eq!(
            space_service.joined_spaces().await,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)]
        );

        // Recomputing an identical list must not notify subscribers.
        assert_pending!(joined_spaces_subscriber);

        // Join a second space that declares one child.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(second_space_id)
                    .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
                    .add_state_event(
                        factory
                            .space_child(
                                second_space_id.to_owned(),
                                owned_room_id!("!child:example.org"),
                            )
                            .sender(user_id),
                    ),
            )
            .await;

        // Both spaces are now reported, the second one with 1 child.
        assert_eq!(
            space_service.joined_spaces().await,
            vec![
                SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0),
                SpaceRoom::new_from_known(&client.get_room(second_space_id).unwrap(), 1)
            ]
        );

        // And the subscriber is told about the new list.
        assert_next_eq!(
            joined_spaces_subscriber,
            vec![
                VectorDiff::Clear,
                VectorDiff::Append {
                    values: vec![
                        SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0),
                        SpaceRoom::new_from_known(&client.get_room(second_space_id).unwrap(), 1)
                    ]
                    .into()
                },
            ]
        );

        // Leave the second space.
        server.sync_room(&client, LeftRoomBuilder::new(second_space_id)).await;

        // The subscriber is updated and only the first space remains.
        assert_next_eq!(
            joined_spaces_subscriber,
            vec![
                VectorDiff::Clear,
                VectorDiff::Append {
                    values: vec![SpaceRoom::new_from_known(
                        &client.get_room(first_space_id).unwrap(),
                        0
                    )]
                    .into()
                },
            ]
        );

        // Join a room created without the space type.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id!("!room:example.org"))
                    .add_state_event(factory.create(user_id, RoomVersionId::V1)),
            )
            .await;

        // Nothing is emitted and the joined spaces are unchanged.
        assert_pending!(joined_spaces_subscriber);
        assert_eq!(
            space_service.joined_spaces().await,
            vec![SpaceRoom::new_from_known(&client.get_room(first_space_id).unwrap(), 0)]
        );
    }
557}