1use std::sync::Arc;
31
32use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
33use futures_util::pin_mut;
34use imbl::Vector;
35use matrix_sdk::{
36 Client, deserialized_responses::SyncOrStrippedState, executor::AbortOnDrop, locks::Mutex,
37};
38use matrix_sdk_common::executor::spawn;
39use ruma::{
40 OwnedRoomId,
41 events::{
42 SyncStateEvent,
43 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
44 },
45};
46use tokio::sync::Mutex as AsyncMutex;
47use tracing::error;
48
49use crate::spaces::graph::SpaceGraph;
50pub use crate::spaces::{room::SpaceRoom, room_list::SpaceRoomList};
51
52pub mod graph;
53pub mod room;
54pub mod room_list;
55
56pub struct SpaceService {
95 client: Client,
96
97 joined_spaces: Arc<Mutex<ObservableVector<SpaceRoom>>>,
98
99 room_update_handle: AsyncMutex<Option<AbortOnDrop<()>>>,
100}
101
102impl SpaceService {
103 pub fn new(client: Client) -> Self {
105 Self {
106 client,
107 joined_spaces: Arc::new(Mutex::new(ObservableVector::new())),
108 room_update_handle: AsyncMutex::new(None),
109 }
110 }
111
112 pub async fn subscribe_to_joined_spaces(
115 &self,
116 ) -> (Vector<SpaceRoom>, VectorSubscriberBatchedStream<SpaceRoom>) {
117 let mut room_update_handle = self.room_update_handle.lock().await;
118
119 if room_update_handle.is_none() {
120 let client = self.client.clone();
121 let joined_spaces = Arc::clone(&self.joined_spaces);
122 let all_room_updates_receiver = self.client.subscribe_to_all_room_updates();
123
124 *room_update_handle = Some(AbortOnDrop::new(spawn(async move {
125 pin_mut!(all_room_updates_receiver);
126
127 loop {
128 match all_room_updates_receiver.recv().await {
129 Ok(updates) => {
130 if updates.is_empty() {
131 continue;
132 }
133
134 let new_spaces = Vector::from(Self::joined_spaces_for(&client).await);
135 Self::update_joined_spaces_if_needed(new_spaces, &joined_spaces);
136 }
137 Err(err) => {
138 error!("error when listening to room updates: {err}");
139 }
140 }
141 }
142 })));
143
144 let spaces = Self::joined_spaces_for(&self.client).await;
146 Self::update_joined_spaces_if_needed(Vector::from(spaces), &self.joined_spaces);
147 }
148
149 self.joined_spaces.lock().subscribe().into_values_and_batched_stream()
150 }
151
152 pub async fn joined_spaces(&self) -> Vec<SpaceRoom> {
156 let spaces = Self::joined_spaces_for(&self.client).await;
157
158 Self::update_joined_spaces_if_needed(Vector::from(spaces.clone()), &self.joined_spaces);
159
160 spaces
161 }
162
163 pub fn space_room_list(&self, space_id: OwnedRoomId) -> SpaceRoomList {
165 SpaceRoomList::new(self.client.clone(), space_id)
166 }
167
168 fn update_joined_spaces_if_needed(
169 new_spaces: Vector<SpaceRoom>,
170 joined_spaces: &Arc<Mutex<ObservableVector<SpaceRoom>>>,
171 ) {
172 let old_spaces = joined_spaces.lock().clone();
173
174 if new_spaces != old_spaces {
175 joined_spaces.lock().clear();
176 joined_spaces.lock().append(new_spaces);
177 }
178 }
179
180 async fn joined_spaces_for(client: &Client) -> Vec<SpaceRoom> {
181 let joined_spaces = client.joined_space_rooms();
182
183 let mut graph = SpaceGraph::new();
185
186 for space in joined_spaces.iter() {
189 graph.add_node(space.room_id().to_owned());
190
191 if let Ok(parents) = space.get_state_events_static::<SpaceParentEventContent>().await {
192 parents.into_iter()
193 .flat_map(|parent_event| match parent_event.deserialize() {
194 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
195 Some(e.state_key)
196 }
197 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
198 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
199 Err(e) => {
200 error!(room_id = ?space.room_id(), "Could not deserialize m.space.parent: {e}");
201 None
202 }
203 }).for_each(|parent| graph.add_edge(parent, space.room_id().to_owned()));
204 } else {
205 error!(room_id = ?space.room_id(), "Could not get m.space.parent events");
206 }
207
208 if let Ok(children) = space.get_state_events_static::<SpaceChildEventContent>().await {
209 children.into_iter()
210 .filter_map(|child_event| match child_event.deserialize() {
211 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
212 Some(e.state_key)
213 }
214 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
215 Ok(SyncOrStrippedState::Stripped(e)) => Some(e.state_key),
216 Err(e) => {
217 error!(room_id = ?space.room_id(), "Could not deserialize m.space.child: {e}");
218 None
219 }
220 }).for_each(|child| graph.add_edge(space.room_id().to_owned(), child));
221 } else {
222 error!(room_id = ?space.room_id(), "Could not get m.space.child events");
223 }
224 }
225
226 graph.remove_cycles();
229
230 let root_nodes = graph.root_nodes();
231
232 joined_spaces
233 .iter()
234 .filter_map(|room| {
235 let room_id = room.room_id();
236
237 if root_nodes.contains(&room_id) {
238 Some(SpaceRoom::new_from_known(
239 room.clone(),
240 graph.children_of(room_id).len() as u64,
241 ))
242 } else {
243 None
244 }
245 })
246 .collect()
247 }
248}
249
250#[cfg(test)]
251mod tests {
252 use assert_matches2::assert_let;
253 use eyeball_im::VectorDiff;
254 use futures_util::{StreamExt, pin_mut};
255 use matrix_sdk::{room::ParentSpace, test_utils::mocks::MatrixMockServer};
256 use matrix_sdk_test::{
257 JoinedRoomBuilder, LeftRoomBuilder, async_test, event_factory::EventFactory,
258 };
259 use ruma::{RoomVersionId, owned_room_id, room_id};
260 use stream_assert::{assert_next_eq, assert_pending};
261
262 use super::*;
263
264 #[async_test]
265 async fn test_spaces_hierarchy() {
266 let server = MatrixMockServer::new().await;
267 let client = server.client_builder().build().await;
268 let user_id = client.user_id().unwrap();
269 let space_service = SpaceService::new(client.clone());
270 let factory = EventFactory::new();
271
272 server.mock_room_state_encryption().plain().mount().await;
273
274 let parent_space_id = room_id!("!parent_space:example.org");
277 let child_space_id_1 = room_id!("!child_space_1:example.org");
278 let child_space_id_2 = room_id!("!child_space_2:example.org");
279
280 server
281 .sync_room(
282 &client,
283 JoinedRoomBuilder::new(child_space_id_1)
284 .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
285 .add_state_event(
286 factory
287 .space_parent(parent_space_id.to_owned(), child_space_id_1.to_owned())
288 .sender(user_id),
289 ),
290 )
291 .await;
292
293 server
294 .sync_room(
295 &client,
296 JoinedRoomBuilder::new(child_space_id_2)
297 .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
298 .add_state_event(
299 factory
300 .space_parent(parent_space_id.to_owned(), child_space_id_2.to_owned())
301 .sender(user_id),
302 ),
303 )
304 .await;
305 server
306 .sync_room(
307 &client,
308 JoinedRoomBuilder::new(parent_space_id)
309 .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
310 .add_state_event(
311 factory
312 .space_child(parent_space_id.to_owned(), child_space_id_1.to_owned())
313 .sender(user_id),
314 )
315 .add_state_event(
316 factory
317 .space_child(parent_space_id.to_owned(), child_space_id_2.to_owned())
318 .sender(user_id),
319 ),
320 )
321 .await;
322
323 assert_eq!(
325 space_service
326 .joined_spaces()
327 .await
328 .iter()
329 .map(|s| s.room_id.to_owned())
330 .collect::<Vec<_>>(),
331 vec![parent_space_id]
332 );
333
334 assert_eq!(
336 space_service
337 .joined_spaces()
338 .await
339 .iter()
340 .map(|s| s.children_count)
341 .collect::<Vec<_>>(),
342 vec![2]
343 );
344
345 let parent_space = client.get_room(parent_space_id).unwrap();
346 assert!(parent_space.is_space());
347
348 let spaces: Vec<ParentSpace> = client
351 .get_room(child_space_id_1)
352 .unwrap()
353 .parent_spaces()
354 .await
355 .unwrap()
356 .map(Result::unwrap)
357 .collect()
358 .await;
359
360 assert_let!(ParentSpace::Reciprocal(parent) = spaces.first().unwrap());
361 assert_eq!(parent.room_id(), parent_space.room_id());
362
363 let spaces: Vec<ParentSpace> = client
364 .get_room(child_space_id_2)
365 .unwrap()
366 .parent_spaces()
367 .await
368 .unwrap()
369 .map(Result::unwrap)
370 .collect()
371 .await;
372
373 assert_let!(ParentSpace::Reciprocal(parent) = spaces.last().unwrap());
374 assert_eq!(parent.room_id(), parent_space.room_id());
375 }
376
377 #[async_test]
378 async fn test_joined_spaces_updates() {
379 let server = MatrixMockServer::new().await;
380 let client = server.client_builder().build().await;
381 let user_id = client.user_id().unwrap();
382 let factory = EventFactory::new();
383
384 server.mock_room_state_encryption().plain().mount().await;
385
386 let first_space_id = room_id!("!first_space:example.org");
387 let second_space_id = room_id!("!second_space:example.org");
388
389 server
391 .sync_room(
392 &client,
393 JoinedRoomBuilder::new(first_space_id)
394 .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type()),
395 )
396 .await;
397
398 let space_service = SpaceService::new(client.clone());
402
403 let (initial_values, joined_spaces_subscriber) =
404 space_service.subscribe_to_joined_spaces().await;
405 pin_mut!(joined_spaces_subscriber);
406 assert_pending!(joined_spaces_subscriber);
407
408 assert_eq!(
409 initial_values,
410 vec![SpaceRoom::new_from_known(client.get_room(first_space_id).unwrap(), 0)].into()
411 );
412
413 assert_eq!(
414 space_service.joined_spaces().await,
415 vec![SpaceRoom::new_from_known(client.get_room(first_space_id).unwrap(), 0)]
416 );
417
418 assert_pending!(joined_spaces_subscriber);
421
422 server
425 .sync_room(
426 &client,
427 JoinedRoomBuilder::new(second_space_id)
428 .add_state_event(factory.create(user_id, RoomVersionId::V1).with_space_type())
429 .add_state_event(
430 factory
431 .space_child(
432 second_space_id.to_owned(),
433 owned_room_id!("!child:example.org"),
434 )
435 .sender(user_id),
436 ),
437 )
438 .await;
439
440 assert_eq!(
442 space_service.joined_spaces().await,
443 vec![
444 SpaceRoom::new_from_known(client.get_room(first_space_id).unwrap(), 0),
445 SpaceRoom::new_from_known(client.get_room(second_space_id).unwrap(), 1)
446 ]
447 );
448
449 assert_next_eq!(
450 joined_spaces_subscriber,
451 vec![
452 VectorDiff::Clear,
453 VectorDiff::Append {
454 values: vec![
455 SpaceRoom::new_from_known(client.get_room(first_space_id).unwrap(), 0),
456 SpaceRoom::new_from_known(client.get_room(second_space_id).unwrap(), 1)
457 ]
458 .into()
459 },
460 ]
461 );
462
463 server.sync_room(&client, LeftRoomBuilder::new(second_space_id)).await;
464
465 assert_next_eq!(
467 joined_spaces_subscriber,
468 vec![
469 VectorDiff::Clear,
470 VectorDiff::Append {
471 values: vec![SpaceRoom::new_from_known(
472 client.get_room(first_space_id).unwrap(),
473 0
474 )]
475 .into()
476 },
477 ]
478 );
479
480 server
482 .sync_room(
483 &client,
484 JoinedRoomBuilder::new(room_id!("!room:example.org"))
485 .add_state_event(factory.create(user_id, RoomVersionId::V1)),
486 )
487 .await;
488
489 assert_pending!(joined_spaces_subscriber);
491 assert_eq!(
492 space_service.joined_spaces().await,
493 vec![SpaceRoom::new_from_known(client.get_room(first_space_id).unwrap(), 0)]
494 );
495 }
496}