1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5 RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9 OwnedRoomId,
10 api::{
11 FeatureFlag, SupportedVersions,
12 client::sync::sync_events::v5::{self as http, response},
13 },
14 events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
/// The sliding sync version a client ends up using.
///
/// Values of this type are produced by [`VersionBuilder::build`].
#[derive(Debug, Clone)]
pub enum Version {
    /// No sliding sync version is selected.
    None,

    /// The native sliding sync version.
    ///
    /// [`VersionBuilder::DiscoverNative`] resolves to this variant when the
    /// server's supported features contain `FeatureFlag::Msc4186`.
    Native,
}
32
33impl Version {
34 #[cfg(test)]
35 pub(crate) fn is_native(&self) -> bool {
36 matches!(self, Self::Native)
37 }
38}
39
40#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43 #[error("`.well-known` is not set")]
45 WellKnownNotSet,
46
47 #[error("The `/versions` response is not set")]
49 MissingVersionsResponse,
50
51 #[error(
54 "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55 or it's not set to true."
56 )]
57 NativeVersionIsUnset,
58}
59
/// Describes how a [`Version`] must be obtained.
///
/// Call [`VersionBuilder::build`] to get the final [`Version`].
#[derive(Debug, Clone)]
pub enum VersionBuilder {
    /// Build [`Version::None`].
    None,

    /// Build [`Version::Native`] unconditionally, without looking at what the
    /// server supports.
    Native,

    /// Build [`Version::Native`] only if the server's supported versions
    /// advertise it; building fails otherwise.
    DiscoverNative,
}
74
75impl VersionBuilder {
76 pub(crate) fn needs_get_supported_versions(&self) -> bool {
77 matches!(self, Self::DiscoverNative)
78 }
79
80 pub fn build(
85 self,
86 supported: Option<&SupportedVersions>,
87 ) -> Result<Version, VersionBuilderError> {
88 Ok(match self {
89 Self::None => Version::None,
90
91 Self::Native => Version::Native,
92
93 Self::DiscoverNative => {
94 let Some(supported) = supported else {
95 return Err(VersionBuilderError::MissingVersionsResponse);
96 };
97
98 if supported.features.contains(&FeatureFlag::Msc4186) {
99 Version::Native
100 } else {
101 return Err(VersionBuilderError::NativeVersionIsUnset);
102 }
103 }
104 })
105 }
106}
107
108impl Client {
109 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117 let supported_versions = self.supported_versions().await.ok();
118
119 [VersionBuilder::DiscoverNative]
120 .into_iter()
121 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122 .collect()
123 }
124
125 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130 Ok(SlidingSync::builder(id.into(), self.clone())?)
131 }
132
133 #[cfg(any(test, feature = "testing"))]
139 #[tracing::instrument(skip(self, response))]
140 pub async fn process_sliding_sync_test_helper(
141 &self,
142 response: &http::Response,
143 requested_required_states: &RequestedRequiredStates,
144 ) -> Result<SyncResponse> {
145 let response =
146 self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148 tracing::debug!("done processing on base_client");
149 self.call_sync_response_handlers(&response).await?;
150
151 Ok(response)
152 }
153}
154
/// Accumulates the results of processing one sliding sync response, step by
/// step, until `process_and_take_response` consumes it.
///
/// The type is `#[must_use]`: a processor that is built but never consumed
/// never runs the sync response handlers.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor {
    /// The client used for all the processing steps.
    client: Client,
    /// To-device events produced by `handle_encryption`; they are appended to
    /// the final response in `process_and_take_response`.
    to_device_events: Vec<ProcessedToDeviceEvent>,
    /// The sync response produced by `handle_room_response`; `None` until
    /// that step has run.
    response: Option<SyncResponse>,
}
166
167impl SlidingSyncResponseProcessor {
168 pub fn new(client: Client) -> Self {
169 Self { client, to_device_events: Vec::new(), response: None }
170 }
171
172 #[cfg(feature = "e2e-encryption")]
173 pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174 assert!(self.response.is_none());
177
178 self.to_device_events = if let Some(to_device_events) = self
179 .client
180 .base_client()
181 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182 .await?
183 {
184 self.client.encryption().backups().maybe_trigger_backup();
186
187 to_device_events
188 } else {
189 Vec::new()
190 };
191
192 Ok(())
193 }
194
195 pub async fn handle_room_response(
196 &mut self,
197 response: &http::Response,
198 requested_required_states: &RequestedRequiredStates,
199 ) -> Result<()> {
200 subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202 let previously_joined_rooms = self
203 .client
204 .joined_rooms()
205 .into_iter()
206 .map(|r| r.room_id().to_owned())
207 .collect::<BTreeSet<_>>();
208
209 let mut sync_response = self
210 .client
211 .base_client()
212 .process_sliding_sync(response, requested_required_states)
213 .await?;
214
215 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217 update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219 self.response = Some(sync_response);
220
221 Ok(())
222 }
223
224 pub async fn handle_thread_subscriptions(
225 &mut self,
226 previous_pos: Option<&str>,
227 thread_subs: response::ThreadSubscriptions,
228 ) -> Result<()> {
229 let catchup_token =
230 thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231 from: prev_batch,
232 to: previous_pos.map(|s| s.to_owned()),
233 });
234
235 self.client
236 .thread_subscription_catchup()
237 .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238 .await?;
239
240 Ok(())
241 }
242
243 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244 let mut response = self.response.take().unwrap_or_default();
245
246 response.to_device.extend(self.to_device_events);
247
248 self.client.call_sync_response_handlers(&response).await?;
249
250 Ok(response)
251 }
252}
253
254async fn update_in_memory_caches(
258 client: &Client,
259 previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260 response: &SyncResponse,
261) {
262 let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264 if response.account_data.iter().any(|event| {
267 event
268 .get_field::<GlobalAccountDataEventType>("type")
269 .ok()
270 .flatten()
271 .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272 }) {
273 let notification_settings = client.notification_settings().await;
274 let rules = notification_settings.rules().await;
275
276 for room in client.joined_rooms() {
278 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279 room.update_cached_user_defined_notification_mode(mode);
280 } else {
281 room.clear_user_defined_notification_mode();
282 }
283 }
284 } else {
285 let mut rules = None;
290
291 for room_id in response
292 .rooms
293 .joined
294 .keys()
295 .filter(|room_id| !previously_joined_rooms.contains(*room_id))
296 {
297 let Some(room) = client.get_room(room_id) else {
298 error!(?room_id, "The room must exist since it has been joined");
299 continue;
300 };
301
302 let rules = if let Some(rules) = &mut rules {
304 rules
305 } else {
306 rules.insert(client.notification_settings().await.rules().await.clone())
307 };
308
309 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
311 room.update_cached_user_defined_notification_mode(mode);
312 }
313 }
314 }
315}
316
317async fn handle_receipts_extension(
319 client: &Client,
320 response: &http::Response,
321 sync_response: &mut SyncResponse,
322) -> Result<()> {
323 let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
324
325 let room_ids = BTreeSet::from_iter(
328 sync_response
329 .rooms
330 .joined
331 .keys()
332 .cloned()
333 .chain(response.extensions.receipts.rooms.keys().cloned()),
334 );
335
336 let futures = room_ids.into_iter().map(|room_id| async {
338 let receipt_event = client
339 .base_client()
340 .process_sliding_sync_receipts_extension_for_room(&room_id, response)
341 .await?;
342
343 Result::<_, crate::Error>::Ok(Some((room_id, receipt_event)))
344 });
345
346 let updates = try_join_all(futures).await?;
347
348 for (room_id, receipt_event_content) in updates.into_iter().flatten() {
349 if let Some(event) = receipt_event_content {
350 sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
351 }
352 }
353
354 Ok(())
355}
356
357#[cfg(all(test, not(target_family = "wasm")))]
358mod tests {
359 use std::{collections::BTreeMap, ops::Not};
360
361 use assert_matches::assert_matches;
362 use matrix_sdk_base::{
363 RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
364 notification_settings::RoomNotificationMode,
365 };
366 use matrix_sdk_test::{async_test, event_factory::EventFactory};
367 use ruma::{
368 api::client::discovery::get_supported_versions, assign, event_id, room_id, serde::Raw,
369 user_id,
370 };
371 use serde_json::json;
372 use tokio::task::yield_now;
373
374 use super::{Version, VersionBuilder};
375 use crate::{
376 SlidingSyncList, SlidingSyncMode,
377 error::Result,
378 sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
379 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
380 };
381
/// `VersionBuilder::None` builds `Version::None` without any `/versions`
/// data.
#[test]
fn test_version_builder_none() {
    let version = VersionBuilder::None.build(None);

    assert_matches!(version, Ok(Version::None));
}
386
/// `VersionBuilder::Native` builds `Version::Native` without any `/versions`
/// data.
#[test]
fn test_version_builder_native() {
    let version = VersionBuilder::Native.build(None);

    assert_matches!(version, Ok(Version::Native));
}
391
/// Discovery succeeds when `/versions` enables
/// `org.matrix.simplified_msc3575` in its unstable features.
#[test]
fn test_version_builder_discover_native() {
    let mut response = get_supported_versions::Response::new(vec![]);
    response.unstable_features =
        BTreeMap::from([("org.matrix.simplified_msc3575".to_owned(), true)]);

    let supported_versions = response.as_supported_versions();
    let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

    assert_matches!(version, Ok(Version::Native));
}
402
/// Discovery fails with `MissingVersionsResponse` when no `/versions` data
/// is provided.
#[test]
fn test_version_builder_discover_native_no_supported_versions() {
    let version = VersionBuilder::DiscoverNative.build(None);

    assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
}
410
/// Discovery fails with `NativeVersionIsUnset` when `/versions` lists
/// `org.matrix.simplified_msc3575` but sets it to `false`.
#[test]
fn test_version_builder_discover_native_unstable_features_is_disabled() {
    let mut response = get_supported_versions::Response::new(vec![]);
    response.unstable_features =
        BTreeMap::from([("org.matrix.simplified_msc3575".to_owned(), false)]);

    let supported_versions = response.as_supported_versions();
    let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

    assert_matches!(version, Err(VersionBuilderError::NativeVersionIsUnset));
}
421
422 #[async_test]
423 async fn test_available_sliding_sync_versions_none() {
424 let client = MockClientBuilder::new(None).build().await;
425 let available_versions = client.available_sliding_sync_versions().await;
426
427 assert!(available_versions.is_empty());
430 }
431
432 #[async_test]
433 async fn test_available_sliding_sync_versions_native() {
434 let server = MatrixMockServer::new().await;
435 let client = server.client_builder().no_server_versions().build().await;
436
437 server.mock_versions().with_simplified_sliding_sync().ok().mock_once().mount().await;
438
439 let available_versions = client.available_sliding_sync_versions().await;
440
441 assert_eq!(available_versions.len(), 1);
443 assert_matches!(available_versions[0], Version::Native);
444 }
445
446 #[async_test]
447 async fn test_cache_user_defined_notification_mode() -> Result<()> {
448 let client = MockClientBuilder::new(None).build().await;
449 let room_id = room_id!("!r0:matrix.org");
450
451 let sliding_sync = client
452 .sliding_sync("test")?
453 .with_account_data_extension(
454 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
455 )
456 .add_list(
457 SlidingSyncList::builder("all")
458 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
459 )
460 .build()
461 .await?;
462
463 {
466 let server_response = assign!(http::Response::new("0".to_owned()), {
467 rooms: BTreeMap::from([(
468 room_id.to_owned(),
469 http::response::Room::default(),
470 )]),
471 extensions: assign!(http::response::Extensions::default(), {
472 account_data: assign!(http::response::AccountData::default(), {
473 global: vec![
474 Raw::from_json_string(
475 json!({
476 "type": "m.push_rules",
477 "content": {
478 "global": {
479 "room": [
480 {
481 "actions": ["notify"],
482 "rule_id": room_id,
483 "default": false,
484 "enabled": true,
485 },
486 ],
487 },
488 },
489 })
490 .to_string(),
491 ).unwrap()
492 ]
493 })
494 })
495 });
496
497 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
498 sliding_sync
499 .handle_response(
500 server_response.clone(),
501 &mut pos_guard,
502 RequestedRequiredStates::default(),
503 )
504 .await?;
505 }
506
507 let room = client.get_room(room_id).unwrap();
509
510 assert_eq!(
512 room.cached_user_defined_notification_mode(),
513 Some(RoomNotificationMode::AllMessages),
514 );
515
516 {
520 let server_response = assign!(http::Response::new("0".to_owned()), {
521 rooms: BTreeMap::from([(
522 room_id.to_owned(),
523 http::response::Room::default(),
524 )]),
525 extensions: assign!(http::response::Extensions::default(), {
526 account_data: assign!(http::response::AccountData::default(), {
527 global: vec![
528 Raw::from_json_string(
529 json!({
530 "type": "m.push_rules",
531 "content": {
532 "global": {
533 "room": [
534 {
535 "actions": [],
536 "rule_id": room_id,
537 "default": false,
538 "enabled": true,
539 },
540 ],
541 },
542 },
543 })
544 .to_string(),
545 ).unwrap()
546 ]
547 })
548 })
549 });
550
551 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
552 sliding_sync
553 .handle_response(
554 server_response.clone(),
555 &mut pos_guard,
556 RequestedRequiredStates::default(),
557 )
558 .await?;
559 }
560
561 assert_eq!(
563 room.cached_user_defined_notification_mode(),
564 Some(RoomNotificationMode::MentionsAndKeywordsOnly),
565 );
566
567 {
571 let server_response = assign!(http::Response::new("0".to_owned()), {
572 extensions: assign!(http::response::Extensions::default(), {
573 account_data: assign!(http::response::AccountData::default(), {
574 global: vec![
575 Raw::from_json_string(
576 json!({
577 "type": "m.push_rules",
578 "content": {
579 "global": {
580 "room": [
581 {
582 "actions": ["notify"],
583 "rule_id": room_id,
584 "default": false,
585 "enabled": true,
586 },
587 ],
588 },
589 },
590 })
591 .to_string(),
592 ).unwrap()
593 ]
594 })
595 })
596 });
597
598 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
599 sliding_sync
600 .handle_response(
601 server_response.clone(),
602 &mut pos_guard,
603 RequestedRequiredStates::default(),
604 )
605 .await?;
606 }
607
608 assert_eq!(
610 room.cached_user_defined_notification_mode(),
611 Some(RoomNotificationMode::AllMessages),
612 );
613
614 Ok(())
615 }
616
617 #[async_test]
618 async fn test_auto_listen_to_latest_events() -> Result<()> {
619 let client = MockClientBuilder::new(None).build().await;
620 let room_id = room_id!("!r0");
621
622 client.base_client().get_or_create_room(room_id, RoomState::Joined);
624
625 client.event_cache().subscribe()?;
627
628 assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());
630
631 let sliding_sync = client
633 .sliding_sync("test")?
634 .add_list(
635 SlidingSyncList::builder("all")
636 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
637 )
638 .build()
639 .await?;
640
641 {
643 let server_response = assign!(http::Response::new("0".to_owned()), {
644 rooms: BTreeMap::from([(
645 room_id.to_owned(),
646 http::response::Room::default(),
647 )]),
648 });
649
650 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
651
652 sliding_sync
653 .handle_response(
654 server_response.clone(),
655 &mut pos_guard,
656 RequestedRequiredStates::default(),
657 )
658 .await?;
659 }
660
661 assert!(client.get_room(room_id).is_some());
663
664 assert!(client.latest_events().await.is_listening_to_room(room_id).await);
666
667 Ok(())
668 }
669
670 #[async_test]
671 async fn test_read_receipt_can_trigger_a_notable_update_reason() {
672 use ruma::api::client::sync::sync_events::v5 as http;
673
674 let client = MockClientBuilder::new(None).build().await;
676 client.event_cache().subscribe().unwrap();
677
678 let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
679
680 let room_id = room_id!("!r:e.uk");
682 let room = http::response::Room::new();
683 let mut response = http::Response::new("5".to_owned());
684 response.rooms.insert(room_id.to_owned(), room);
685
686 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
687 processor
688 .handle_room_response(&response, &RequestedRequiredStates::default())
689 .await
690 .expect("Failed to process sync");
691 processor.process_and_take_response().await.expect("Failed to finish processing sync");
692
693 assert_matches!(
695 room_info_notable_update_stream.recv().await,
696 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
697 assert_eq!(received_room_id, room_id);
698 assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
699 }
700 );
701 assert_matches!(
702 room_info_notable_update_stream.recv().await,
703 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
704 assert_eq!(received_room_id, room_id);
705 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
706 }
707 );
708 assert!(room_info_notable_update_stream.is_empty());
709
710 let room_id = room_id!("!r:e.uk");
713 let f = EventFactory::new().room(room_id).sender(user_id!("@u:h.uk"));
714 let events = vec![
715 f.text_msg("hi").event_id(event_id!("$3")).into_raw_sync(),
716 f.text_msg("hi").event_id(event_id!("$4")).into_raw_sync(),
717 ];
718 let room = assign!(http::response::Room::new(), {
719 timeline: events,
720 });
721 let mut response = http::Response::new("5".to_owned());
722 response.rooms.insert(room_id.to_owned(), room);
723
724 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
725 processor
726 .handle_room_response(&response, &RequestedRequiredStates::default())
727 .await
728 .expect("Failed to process sync");
729 processor.process_and_take_response().await.expect("Failed to finish processing sync");
730
731 assert_matches!(
735 room_info_notable_update_stream.recv().await,
736 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
737 assert_eq!(received_room_id, room_id);
738 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
739 }
740 );
741 assert_matches!(
743 room_info_notable_update_stream.recv().await,
744 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
745 assert_eq!(received_room_id, room_id);
746 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
747 }
748 );
749
750 assert_matches!(
753 room_info_notable_update_stream.recv().await,
754 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
755 assert_eq!(received_room_id, room_id);
756 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
757 }
758 );
759
760 assert_matches!(
763 room_info_notable_update_stream.recv().await,
764 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
765 assert_eq!(received_room_id, room_id);
766 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT), "{received_reasons:?}");
767 }
768 );
769
770 yield_now().await;
771
772 assert!(room_info_notable_update_stream.is_empty());
774 }
775}