1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5 RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9 OwnedRoomId,
10 api::{
11 FeatureFlag, SupportedVersions,
12 client::sync::sync_events::v5::{self as http, response},
13 },
14 events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
21#[derive(Clone, Debug)]
23pub enum Version {
24 None,
27
28 Native,
31}
32
33impl Version {
34 #[cfg(test)]
35 pub(crate) fn is_native(&self) -> bool {
36 matches!(self, Self::Native)
37 }
38}
39
40#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43 #[error("`.well-known` is not set")]
45 WellKnownNotSet,
46
47 #[error("The `/versions` response is not set")]
49 MissingVersionsResponse,
50
51 #[error(
54 "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55 or it's not set to true."
56 )]
57 NativeVersionIsUnset,
58}
59
60#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63 None,
65
66 Native,
68
69 DiscoverNative,
73}
74
75impl VersionBuilder {
76 pub(crate) fn needs_get_supported_versions(&self) -> bool {
77 matches!(self, Self::DiscoverNative)
78 }
79
80 pub fn build(
85 self,
86 supported: Option<&SupportedVersions>,
87 ) -> Result<Version, VersionBuilderError> {
88 Ok(match self {
89 Self::None => Version::None,
90
91 Self::Native => Version::Native,
92
93 Self::DiscoverNative => {
94 let Some(supported) = supported else {
95 return Err(VersionBuilderError::MissingVersionsResponse);
96 };
97
98 if supported.features.contains(&FeatureFlag::Msc4186) {
99 Version::Native
100 } else {
101 return Err(VersionBuilderError::NativeVersionIsUnset);
102 }
103 }
104 })
105 }
106}
107
108impl Client {
109 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117 let supported_versions = self.supported_versions().await.ok();
118
119 [VersionBuilder::DiscoverNative]
120 .into_iter()
121 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122 .collect()
123 }
124
125 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130 Ok(SlidingSync::builder(id.into(), self.clone())?)
131 }
132
133 #[cfg(any(test, feature = "testing"))]
139 #[tracing::instrument(skip(self, response))]
140 pub async fn process_sliding_sync_test_helper(
141 &self,
142 response: &http::Response,
143 requested_required_states: &RequestedRequiredStates,
144 ) -> Result<SyncResponse> {
145 let response =
146 self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148 tracing::debug!("done processing on base_client");
149 self.call_sync_response_handlers(&response).await?;
150
151 Ok(response)
152 }
153}
154
155#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162 client: Client,
163 to_device_events: Vec<ProcessedToDeviceEvent>,
164 response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168 pub fn new(client: Client) -> Self {
169 Self { client, to_device_events: Vec::new(), response: None }
170 }
171
172 #[cfg(feature = "e2e-encryption")]
173 pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174 assert!(self.response.is_none());
177
178 self.to_device_events = if let Some(to_device_events) = self
179 .client
180 .base_client()
181 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182 .await?
183 {
184 self.client.encryption().backups().maybe_trigger_backup();
186
187 to_device_events
188 } else {
189 Vec::new()
190 };
191
192 Ok(())
193 }
194
195 pub async fn handle_room_response(
196 &mut self,
197 response: &http::Response,
198 requested_required_states: &RequestedRequiredStates,
199 ) -> Result<()> {
200 subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202 let previously_joined_rooms = self
203 .client
204 .joined_rooms()
205 .into_iter()
206 .map(|r| r.room_id().to_owned())
207 .collect::<BTreeSet<_>>();
208
209 let mut sync_response = self
210 .client
211 .base_client()
212 .process_sliding_sync(response, requested_required_states)
213 .await?;
214
215 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217 update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219 self.response = Some(sync_response);
220
221 Ok(())
222 }
223
224 pub async fn handle_thread_subscriptions(
225 &mut self,
226 previous_pos: Option<&str>,
227 thread_subs: response::ThreadSubscriptions,
228 ) -> Result<()> {
229 let catchup_token =
230 thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231 from: prev_batch,
232 to: previous_pos.map(|s| s.to_owned()),
233 });
234
235 self.client
236 .thread_subscription_catchup()
237 .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238 .await?;
239
240 Ok(())
241 }
242
243 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244 let mut response = self.response.take().unwrap_or_default();
245
246 response.to_device.extend(self.to_device_events);
247
248 self.client.call_sync_response_handlers(&response).await?;
249
250 Ok(response)
251 }
252}
253
254async fn update_in_memory_caches(
258 client: &Client,
259 previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260 response: &SyncResponse,
261) {
262 let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264 if response.account_data.iter().any(|event| {
267 event
268 .get_field::<GlobalAccountDataEventType>("type")
269 .ok()
270 .flatten()
271 .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272 }) {
273 let notification_settings = client.notification_settings().await;
274 let rules = notification_settings.rules().await;
275
276 for room in client.joined_rooms() {
278 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279 room.update_cached_user_defined_notification_mode(mode);
280 }
281 }
282 } else {
283 let mut rules = None;
288
289 for room_id in response
290 .rooms
291 .joined
292 .keys()
293 .filter(|room_id| !previously_joined_rooms.contains(*room_id))
294 {
295 let Some(room) = client.get_room(room_id) else {
296 error!(?room_id, "The room must exist since it has been joined");
297 continue;
298 };
299
300 let rules = if let Some(rules) = &mut rules {
302 rules
303 } else {
304 rules.insert(client.notification_settings().await.rules().await.clone())
305 };
306
307 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
309 room.update_cached_user_defined_notification_mode(mode);
310 }
311 }
312 }
313}
314
315async fn handle_receipts_extension(
317 client: &Client,
318 response: &http::Response,
319 sync_response: &mut SyncResponse,
320) -> Result<()> {
321 let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
322
323 let room_ids = BTreeSet::from_iter(
326 sync_response
327 .rooms
328 .joined
329 .keys()
330 .cloned()
331 .chain(response.extensions.receipts.rooms.keys().cloned()),
332 );
333
334 let futures = room_ids.into_iter().map(|room_id| {
336 let new_sync_events = sync_response
337 .rooms
338 .joined
339 .entry(room_id.to_owned())
340 .or_default()
341 .timeline
342 .events
343 .clone();
344
345 async {
346 let Ok((room_event_cache, _drop_handle)) =
347 client.event_cache().for_room(&room_id).await
348 else {
349 tracing::info!(
350 ?room_id,
351 "Failed to fetch the `RoomEventCache` when computing unread counts"
352 );
353 return Ok::<_, crate::Error>(None);
354 };
355
356 let previous_events = room_event_cache.events().await;
357
358 let receipt_event = client
359 .base_client()
360 .process_sliding_sync_receipts_extension_for_room(
361 &room_id,
362 response,
363 new_sync_events,
364 previous_events,
365 )
366 .await?;
367
368 Ok(Some((room_id, receipt_event)))
369 }
370 });
371
372 let updates = try_join_all(futures).await?;
373
374 for (room_id, receipt_event_content) in updates.into_iter().flatten() {
375 if let Some(event) = receipt_event_content {
376 sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
377 }
378 }
379
380 Ok(())
381}
382
383#[cfg(all(test, not(target_family = "wasm")))]
384mod tests {
385 use std::{collections::BTreeMap, ops::Not};
386
387 use assert_matches::assert_matches;
388 use matrix_sdk_base::{
389 RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
390 notification_settings::RoomNotificationMode,
391 };
392 use matrix_sdk_test::async_test;
393 use ruma::{
394 api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
395 room_id, serde::Raw,
396 };
397 use serde_json::json;
398
399 use super::{Version, VersionBuilder};
400 use crate::{
401 SlidingSyncList, SlidingSyncMode,
402 error::Result,
403 sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
404 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
405 };
406
407 #[test]
408 fn test_version_builder_none() {
409 assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
410 }
411
412 #[test]
413 fn test_version_builder_native() {
414 assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
415 }
416
417 #[test]
418 fn test_version_builder_discover_native() {
419 let mut response = get_supported_versions::Response::new(vec![]);
420 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
421
422 assert_matches!(
423 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
424 Ok(Version::Native)
425 );
426 }
427
428 #[test]
429 fn test_version_builder_discover_native_no_supported_versions() {
430 assert_matches!(
431 VersionBuilder::DiscoverNative.build(None),
432 Err(VersionBuilderError::MissingVersionsResponse)
433 );
434 }
435
436 #[test]
437 fn test_version_builder_discover_native_unstable_features_is_disabled() {
438 let mut response = get_supported_versions::Response::new(vec![]);
439 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
440
441 assert_matches!(
442 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
443 Err(VersionBuilderError::NativeVersionIsUnset)
444 );
445 }
446
447 #[async_test]
448 async fn test_available_sliding_sync_versions_none() {
449 let client = MockClientBuilder::new(None).build().await;
450 let available_versions = client.available_sliding_sync_versions().await;
451
452 assert!(available_versions.is_empty());
455 }
456
457 #[async_test]
458 async fn test_available_sliding_sync_versions_native() {
459 let server = MatrixMockServer::new().await;
460 let client = server.client_builder().no_server_versions().build().await;
461
462 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
463
464 let available_versions = client.available_sliding_sync_versions().await;
465
466 assert_eq!(available_versions.len(), 1);
468 assert_matches!(available_versions[0], Version::Native);
469 }
470
471 #[async_test]
472 async fn test_cache_user_defined_notification_mode() -> Result<()> {
473 let client = MockClientBuilder::new(None).build().await;
474 let room_id = room_id!("!r0:matrix.org");
475
476 let sliding_sync = client
477 .sliding_sync("test")?
478 .with_account_data_extension(
479 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
480 )
481 .add_list(
482 SlidingSyncList::builder("all")
483 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
484 )
485 .build()
486 .await?;
487
488 {
491 let server_response = assign!(http::Response::new("0".to_owned()), {
492 rooms: BTreeMap::from([(
493 room_id.to_owned(),
494 http::response::Room::default(),
495 )]),
496 extensions: assign!(http::response::Extensions::default(), {
497 account_data: assign!(http::response::AccountData::default(), {
498 global: vec![
499 Raw::from_json_string(
500 json!({
501 "type": "m.push_rules",
502 "content": {
503 "global": {
504 "room": [
505 {
506 "actions": ["notify"],
507 "rule_id": room_id,
508 "default": false,
509 "enabled": true,
510 },
511 ],
512 },
513 },
514 })
515 .to_string(),
516 ).unwrap()
517 ]
518 })
519 })
520 });
521
522 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
523 sliding_sync
524 .handle_response(
525 server_response.clone(),
526 &mut pos_guard,
527 RequestedRequiredStates::default(),
528 )
529 .await?;
530 }
531
532 let room = client.get_room(room_id).unwrap();
534
535 assert_eq!(
537 room.cached_user_defined_notification_mode(),
538 Some(RoomNotificationMode::AllMessages),
539 );
540
541 {
545 let server_response = assign!(http::Response::new("0".to_owned()), {
546 rooms: BTreeMap::from([(
547 room_id.to_owned(),
548 http::response::Room::default(),
549 )]),
550 extensions: assign!(http::response::Extensions::default(), {
551 account_data: assign!(http::response::AccountData::default(), {
552 global: vec![
553 Raw::from_json_string(
554 json!({
555 "type": "m.push_rules",
556 "content": {
557 "global": {
558 "room": [
559 {
560 "actions": [],
561 "rule_id": room_id,
562 "default": false,
563 "enabled": true,
564 },
565 ],
566 },
567 },
568 })
569 .to_string(),
570 ).unwrap()
571 ]
572 })
573 })
574 });
575
576 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
577 sliding_sync
578 .handle_response(
579 server_response.clone(),
580 &mut pos_guard,
581 RequestedRequiredStates::default(),
582 )
583 .await?;
584 }
585
586 assert_eq!(
588 room.cached_user_defined_notification_mode(),
589 Some(RoomNotificationMode::MentionsAndKeywordsOnly),
590 );
591
592 {
596 let server_response = assign!(http::Response::new("0".to_owned()), {
597 extensions: assign!(http::response::Extensions::default(), {
598 account_data: assign!(http::response::AccountData::default(), {
599 global: vec![
600 Raw::from_json_string(
601 json!({
602 "type": "m.push_rules",
603 "content": {
604 "global": {
605 "room": [
606 {
607 "actions": ["notify"],
608 "rule_id": room_id,
609 "default": false,
610 "enabled": true,
611 },
612 ],
613 },
614 },
615 })
616 .to_string(),
617 ).unwrap()
618 ]
619 })
620 })
621 });
622
623 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
624 sliding_sync
625 .handle_response(
626 server_response.clone(),
627 &mut pos_guard,
628 RequestedRequiredStates::default(),
629 )
630 .await?;
631 }
632
633 assert_eq!(
635 room.cached_user_defined_notification_mode(),
636 Some(RoomNotificationMode::AllMessages),
637 );
638
639 Ok(())
640 }
641
642 #[async_test]
643 async fn test_auto_listen_to_latest_events() -> Result<()> {
644 let client = MockClientBuilder::new(None).build().await;
645 let room_id = room_id!("!r0");
646
647 client.base_client().get_or_create_room(room_id, RoomState::Joined);
649
650 client.event_cache().subscribe()?;
652
653 assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());
655
656 let sliding_sync = client
658 .sliding_sync("test")?
659 .add_list(
660 SlidingSyncList::builder("all")
661 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
662 )
663 .build()
664 .await?;
665
666 {
668 let server_response = assign!(http::Response::new("0".to_owned()), {
669 rooms: BTreeMap::from([(
670 room_id.to_owned(),
671 http::response::Room::default(),
672 )]),
673 });
674
675 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
676
677 sliding_sync
678 .handle_response(
679 server_response.clone(),
680 &mut pos_guard,
681 RequestedRequiredStates::default(),
682 )
683 .await?;
684 }
685
686 assert!(client.get_room(room_id).is_some());
688
689 assert!(client.latest_events().await.is_listening_to_room(room_id).await);
691
692 Ok(())
693 }
694
695 #[async_test]
696 async fn test_read_receipt_can_trigger_a_notable_update_reason() {
697 use ruma::api::client::sync::sync_events::v5 as http;
698
699 let client = MockClientBuilder::new(None).build().await;
701 client.event_cache().subscribe().unwrap();
702
703 let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
704
705 let room_id = room_id!("!r:e.uk");
707 let room = http::response::Room::new();
708 let mut response = http::Response::new("5".to_owned());
709 response.rooms.insert(room_id.to_owned(), room);
710
711 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
712 processor
713 .handle_room_response(&response, &RequestedRequiredStates::default())
714 .await
715 .expect("Failed to process sync");
716 processor.process_and_take_response().await.expect("Failed to finish processing sync");
717
718 assert_matches!(
720 room_info_notable_update_stream.recv().await,
721 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
722 assert_eq!(received_room_id, room_id);
723 assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
724 }
725 );
726 assert_matches!(
727 room_info_notable_update_stream.recv().await,
728 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
729 assert_eq!(received_room_id, room_id);
730 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
731 }
732 );
733 assert!(room_info_notable_update_stream.is_empty());
734
735 let room_id = room_id!("!r:e.uk");
738 let events = vec![
739 make_raw_event("m.room.message", "$3"),
740 make_raw_event("m.room.message", "$4"),
741 make_raw_event("m.read", "$5"),
742 ];
743 let room = assign!(http::response::Room::new(), {
744 timeline: events,
745 });
746 let mut response = http::Response::new("5".to_owned());
747 response.rooms.insert(room_id.to_owned(), room);
748
749 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
750 processor
751 .handle_room_response(&response, &RequestedRequiredStates::default())
752 .await
753 .expect("Failed to process sync");
754 processor.process_and_take_response().await.expect("Failed to finish processing sync");
755
756 assert_matches!(
760 room_info_notable_update_stream.recv().await,
761 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
762 assert_eq!(received_room_id, room_id);
763 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
764 }
765 );
766 assert_matches!(
768 room_info_notable_update_stream.recv().await,
769 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
770 assert_eq!(received_room_id, room_id);
771 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
772 }
773 );
774 assert!(room_info_notable_update_stream.is_empty());
775 }
776
777 fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
778 Raw::from_json_string(
779 json!({
780 "type": event_type,
781 "event_id": id,
782 "content": { "msgtype": "m.text", "body": "my msg" },
783 "sender": "@u:h.uk",
784 "origin_server_ts": 12344445,
785 })
786 .to_string(),
787 )
788 .unwrap()
789 }
790}