1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5 RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9 OwnedRoomId,
10 api::{
11 FeatureFlag, SupportedVersions,
12 client::sync::sync_events::v5::{self as http, response},
13 },
14 events::GlobalAccountDataEventType,
15};
16use tokio::sync::MutexGuard;
17use tracing::error;
18
19use super::{SlidingSync, SlidingSyncBuilder};
20use crate::{Client, Result, sync::subscribe_to_room_latest_events};
21
/// The sliding sync version a client can be configured with.
///
/// The enum is fieldless, so it also implements [`PartialEq`] and [`Eq`]:
/// versions can be compared with `==` or `assert_eq!` instead of requiring a
/// pattern match.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Version {
    /// No sliding sync version is selected.
    None,

    /// The sliding sync implementation natively provided by the homeserver,
    /// i.e. simplified sliding sync (MSC4186).
    Native,
}
33
34impl Version {
35 #[cfg(test)]
36 pub(crate) fn is_native(&self) -> bool {
37 matches!(self, Self::Native)
38 }
39}
40
/// Errors that can be reported when building a [`Version`] from a
/// [`VersionBuilder`].
#[derive(thiserror::Error, Debug)]
pub enum VersionBuilderError {
    /// The `.well-known` information is not set.
    ///
    /// NOTE(review): nothing in this file constructs this variant — confirm
    /// whether it is still produced elsewhere.
    #[error("`.well-known` is not set")]
    WellKnownNotSet,

    /// No supported-versions data was provided although the builder needs it
    /// (see [`VersionBuilder::DiscoverNative`]).
    #[error("The `/versions` response is not set")]
    MissingVersionsResponse,

    /// The supported-versions data does not advertise native sliding sync
    /// support.
    #[error(
        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
         or it's not set to true."
    )]
    NativeVersionIsUnset,
}
60
/// A recipe describing how a sliding sync [`Version`] must be obtained.
///
/// The enum is fieldless, so it also implements [`PartialEq`] and [`Eq`]:
/// builders can be compared with `==` or `assert_eq!` instead of requiring a
/// pattern match.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum VersionBuilder {
    /// Always builds [`Version::None`].
    None,

    /// Always builds [`Version::Native`], without looking at what the server
    /// supports.
    Native,

    /// Builds [`Version::Native`] only if the server's supported versions
    /// advertise it; building fails otherwise.
    DiscoverNative,
}
75
76impl VersionBuilder {
77 pub(crate) fn needs_get_supported_versions(&self) -> bool {
78 matches!(self, Self::DiscoverNative)
79 }
80
81 pub fn build(
86 self,
87 supported: Option<&SupportedVersions>,
88 ) -> Result<Version, VersionBuilderError> {
89 Ok(match self {
90 Self::None => Version::None,
91
92 Self::Native => Version::Native,
93
94 Self::DiscoverNative => {
95 let Some(supported) = supported else {
96 return Err(VersionBuilderError::MissingVersionsResponse);
97 };
98
99 if supported.features.contains(&FeatureFlag::Msc4186) {
100 Version::Native
101 } else {
102 return Err(VersionBuilderError::NativeVersionIsUnset);
103 }
104 }
105 })
106 }
107}
108
109impl Client {
110 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
118 let supported_versions = self.supported_versions().await.ok();
119
120 [VersionBuilder::DiscoverNative]
121 .into_iter()
122 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
123 .collect()
124 }
125
126 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
131 Ok(SlidingSync::builder(id.into(), self.clone())?)
132 }
133
134 #[cfg(any(test, feature = "testing"))]
140 #[tracing::instrument(skip(self, response))]
141 pub async fn process_sliding_sync_test_helper(
142 &self,
143 response: &http::Response,
144 requested_required_states: &RequestedRequiredStates,
145 ) -> Result<SyncResponse> {
146 let response = self
147 .base_client()
148 .process_sliding_sync(
149 response,
150 requested_required_states,
151 &self.base_client().state_store_lock().lock().await,
152 )
153 .await?;
154
155 tracing::debug!("done processing on base_client");
156 self.call_sync_response_handlers(&response).await?;
157
158 Ok(response)
159 }
160}
161
/// Accumulates the results of processing the different parts of a sliding
/// sync response.
///
/// The `handle_*` methods fill the processor, and
/// `process_and_take_response` consumes it to produce the final
/// [`SyncResponse`]. The type is `#[must_use]`, so a processor that is
/// created and then discarded triggers a compiler warning.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor {
    /// The client the response is processed for.
    client: Client,
    /// To-device events produced by `handle_encryption`; empty until then.
    to_device_events: Vec<ProcessedToDeviceEvent>,
    /// The sync response produced by `handle_room_response`; `None` until
    /// then.
    response: Option<SyncResponse>,
}
173
174impl SlidingSyncResponseProcessor {
175 pub fn new(client: Client) -> Self {
176 Self { client, to_device_events: Vec::new(), response: None }
177 }
178
179 #[cfg(feature = "e2e-encryption")]
180 pub async fn handle_encryption(
181 &mut self,
182 extensions: &response::Extensions,
183 state_store_guard: &MutexGuard<'_, ()>,
184 ) -> Result<()> {
185 assert!(self.response.is_none());
188
189 self.to_device_events = if let Some(to_device_events) = self
190 .client
191 .base_client()
192 .process_sliding_sync_e2ee(
193 extensions.to_device.as_ref(),
194 &extensions.e2ee,
195 state_store_guard,
196 )
197 .await?
198 {
199 self.client.encryption().backups().maybe_trigger_backup();
201
202 to_device_events
203 } else {
204 Vec::new()
205 };
206
207 Ok(())
208 }
209
210 pub async fn handle_room_response(
211 &mut self,
212 response: &http::Response,
213 requested_required_states: &RequestedRequiredStates,
214 state_store_guard: &MutexGuard<'_, ()>,
215 ) -> Result<()> {
216 subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
217
218 let previously_joined_rooms = self
219 .client
220 .joined_rooms()
221 .into_iter()
222 .map(|r| r.room_id().to_owned())
223 .collect::<BTreeSet<_>>();
224
225 let mut sync_response = self
226 .client
227 .base_client()
228 .process_sliding_sync(response, requested_required_states, state_store_guard)
229 .await?;
230
231 handle_receipts_extension(&self.client, response, &mut sync_response, state_store_guard)
232 .await?;
233
234 update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
235
236 self.response = Some(sync_response);
237
238 Ok(())
239 }
240
241 pub async fn handle_thread_subscriptions(
242 &mut self,
243 previous_pos: Option<&str>,
244 thread_subs: response::ThreadSubscriptions,
245 ) -> Result<()> {
246 let catchup_token =
247 thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
248 from: prev_batch,
249 to: previous_pos.map(|s| s.to_owned()),
250 });
251
252 self.client
253 .thread_subscription_catchup()
254 .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
255 .await?;
256
257 Ok(())
258 }
259
260 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
261 let mut response = self.response.take().unwrap_or_default();
262
263 response.to_device.extend(self.to_device_events);
264
265 self.client.call_sync_response_handlers(&response).await?;
266
267 Ok(response)
268 }
269}
270
271async fn update_in_memory_caches(
275 client: &Client,
276 previously_joined_rooms: &BTreeSet<OwnedRoomId>,
277 response: &SyncResponse,
278) {
279 let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
280
281 if response.account_data.iter().any(|event| {
284 event
285 .get_field::<GlobalAccountDataEventType>("type")
286 .ok()
287 .flatten()
288 .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
289 }) {
290 let notification_settings = client.notification_settings().await;
291 let rules = notification_settings.rules().await;
292
293 for room in client.joined_rooms() {
295 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
296 room.update_cached_user_defined_notification_mode(mode);
297 } else {
298 room.clear_user_defined_notification_mode();
299 }
300 }
301 } else {
302 let mut rules = None;
307
308 for room_id in response
309 .rooms
310 .joined
311 .keys()
312 .filter(|room_id| !previously_joined_rooms.contains(*room_id))
313 {
314 let Some(room) = client.get_room(room_id) else {
315 error!(?room_id, "The room must exist since it has been joined");
316 continue;
317 };
318
319 let rules = if let Some(rules) = &mut rules {
321 rules
322 } else {
323 rules.insert(client.notification_settings().await.rules().await.clone())
324 };
325
326 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
328 room.update_cached_user_defined_notification_mode(mode);
329 }
330 }
331 }
332}
333
334async fn handle_receipts_extension(
336 client: &Client,
337 response: &http::Response,
338 sync_response: &mut SyncResponse,
339 state_store_guard: &MutexGuard<'_, ()>,
340) -> Result<()> {
341 let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
342
343 let room_ids = BTreeSet::from_iter(
346 sync_response
347 .rooms
348 .joined
349 .keys()
350 .cloned()
351 .chain(response.extensions.receipts.rooms.keys().cloned()),
352 );
353
354 let futures = room_ids.into_iter().map(|room_id| async {
356 let receipt_event = client
357 .base_client()
358 .process_sliding_sync_receipts_extension_for_room(&room_id, response, state_store_guard)
359 .await?;
360
361 Result::<_, crate::Error>::Ok(Some((room_id, receipt_event)))
362 });
363
364 let updates = try_join_all(futures).await?;
365
366 for (room_id, receipt_event_content) in updates.into_iter().flatten() {
367 if let Some(event) = receipt_event_content {
368 sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
369 }
370 }
371
372 Ok(())
373}
374
// Tests for the version builder, the version discovery on `Client`, and the
// sliding sync response processor.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{collections::BTreeMap, ops::Not};

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
        notification_settings::RoomNotificationMode,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        api::client::discovery::get_supported_versions, assign, event_id, room_id, serde::Raw,
        user_id,
    };
    use serde_json::json;
    use tokio::task::yield_now;

    use super::{Version, VersionBuilder};
    use crate::{
        SlidingSyncList, SlidingSyncMode,
        error::Result,
        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
    };

    /// `VersionBuilder::None` builds `Version::None` without needing the
    /// supported versions.
    #[test]
    fn test_version_builder_none() {
        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
    }

    /// `VersionBuilder::Native` builds `Version::Native` without needing the
    /// supported versions.
    #[test]
    fn test_version_builder_native() {
        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
    }

    /// `VersionBuilder::DiscoverNative` builds `Version::Native` when the
    /// `/versions` response enables `org.matrix.simplified_msc3575`.
    #[test]
    fn test_version_builder_discover_native() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Ok(Version::Native)
        );
    }

    /// `VersionBuilder::DiscoverNative` fails when no supported versions are
    /// provided.
    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        assert_matches!(
            VersionBuilder::DiscoverNative.build(None),
            Err(VersionBuilderError::MissingVersionsResponse)
        );
    }

    /// `VersionBuilder::DiscoverNative` fails when the unstable feature is
    /// present in the `/versions` response but set to `false`.
    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Err(VersionBuilderError::NativeVersionIsUnset)
        );
    }

    /// No sliding sync version is reported as available for a default mocked
    /// client.
    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let client = MockClientBuilder::new(None).build().await;
        let available_versions = client.available_sliding_sync_versions().await;

        // Nothing could be discovered.
        // NOTE(review): presumably the default mocked client does not
        // advertise the sliding sync feature — confirm against
        // `MockClientBuilder`.
        assert!(available_versions.is_empty());
    }

    /// The native version is reported as available when `/versions`
    /// advertises simplified sliding sync.
    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().no_server_versions().build().await;

        // `/versions` is expected to be requested exactly once.
        server.mock_versions().with_simplified_sliding_sync().ok().mock_once().mount().await;

        let available_versions = client.available_sliding_sync_versions().await;

        // Exactly one version is available, and it is the native one.
        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    /// The user-defined notification mode cached on a room follows the push
    /// rules received through the account data extension.
    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0:matrix.org");

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // First response: the room, along with a room push rule whose action
        // is `notify`.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": ["notify"],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room must be known now that the response has been handled.
        let room = client.get_room(room_id).unwrap();

        // The cached mode reflects the `notify` rule.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Second response: same room, but the push rule now has no actions.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": [],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The cached mode has been updated accordingly.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        // Third response: the push rule is back to `notify`, but this time
        // the response contains no room at all.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": ["notify"],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The cached mode is updated even though the room was not part of the
        // response.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        Ok(())
    }

    /// Handling a response containing a room makes the latest events start
    /// listening to that room.
    #[async_test]
    async fn test_auto_listen_to_latest_events() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0");

        // Create the room up front, in the joined state.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);

        // Subscribe to the event cache.
        client.event_cache().subscribe()?;

        // The latest events are not listening to the room yet.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());

        // Build a sliding sync instance.
        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // Handle a response that contains the room.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room exists.
        assert!(client.get_room(room_id).is_some());

        // The latest events are now listening to the room.
        assert!(client.latest_events().await.is_listening_to_room(room_id).await);

        Ok(())
    }

    /// Checks the sequence of room info notable updates emitted while
    /// processing two sliding sync responses for the same room, the second
    /// one carrying timeline events.
    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        use ruma::api::client::sync::sync_events::v5 as http;

        // Given a client with a subscribed event cache.
        let client = MockClientBuilder::new(None).build().await;
        client.event_cache().subscribe().unwrap();

        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();

        // When a response containing an empty room is processed.
        let room_id = room_id!("!r:e.uk");
        let room = http::response::Room::new();
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        {
            // The guard is scoped so the state store lock is released before
            // the response is finalized.
            let state_store_guard = client.base_client().state_store_lock().lock().await;
            processor
                .handle_room_response(
                    &response,
                    &RequestedRequiredStates::default(),
                    &state_store_guard,
                )
                .await
                .expect("Failed to process sync");
        }
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then a first update is received for the room, without the
        // `READ_RECEIPT` reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        // And a second one, with the `DISPLAY_NAME` reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
            }
        );
        // Nothing else is pending.
        assert!(room_info_notable_update_stream.is_empty());

        // When a response containing two timeline events for the same room is
        // processed.
        let room_id = room_id!("!r:e.uk");
        let f = EventFactory::new().room(room_id).sender(user_id!("@u:h.uk"));
        let events = vec![
            f.text_msg("hi").event_id(event_id!("$3")).into_raw_sync(),
            f.text_msg("hi").event_id(event_id!("$4")).into_raw_sync(),
        ];
        let room = assign!(http::response::Room::new(), {
            timeline: events,
        });
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        {
            let state_store_guard = client.base_client().state_store_lock().lock().await;
            processor
                .handle_room_response(
                    &response,
                    &RequestedRequiredStates::default(),
                    &state_store_guard,
                )
                .await
                .expect("Failed to process sync");
        }
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then three updates are received for the room, in this order.
        //
        // First, one checked against the `NONE` reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
            }
        );
        // Then, one with the `READ_RECEIPT` reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );

        // Finally, one with the `LATEST_EVENT` reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
            }
        );

        // Let other tasks make progress before checking that nothing else
        // has been emitted.
        yield_now().await;

        assert!(room_info_notable_update_stream.is_empty());
    }
}