1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5 RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9 OwnedRoomId,
10 api::{
11 FeatureFlag, SupportedVersions,
12 client::sync::sync_events::v5::{self as http, response},
13 },
14 events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result, sync::subscribe_to_room_latest_events};
20
21#[derive(Clone, Debug)]
23pub enum Version {
24 None,
27
28 Native,
31}
32
33impl Version {
34 #[cfg(test)]
35 pub(crate) fn is_native(&self) -> bool {
36 matches!(self, Self::Native)
37 }
38}
39
40#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43 #[error("`.well-known` is not set")]
45 WellKnownNotSet,
46
47 #[error("The `/versions` response is not set")]
49 MissingVersionsResponse,
50
51 #[error(
54 "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55 or it's not set to true."
56 )]
57 NativeVersionIsUnset,
58}
59
60#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63 None,
65
66 Native,
68
69 DiscoverNative,
73}
74
75impl VersionBuilder {
76 pub(crate) fn needs_get_supported_versions(&self) -> bool {
77 matches!(self, Self::DiscoverNative)
78 }
79
80 pub fn build(
85 self,
86 supported: Option<&SupportedVersions>,
87 ) -> Result<Version, VersionBuilderError> {
88 Ok(match self {
89 Self::None => Version::None,
90
91 Self::Native => Version::Native,
92
93 Self::DiscoverNative => {
94 let Some(supported) = supported else {
95 return Err(VersionBuilderError::MissingVersionsResponse);
96 };
97
98 if supported.features.contains(&FeatureFlag::Msc4186) {
99 Version::Native
100 } else {
101 return Err(VersionBuilderError::NativeVersionIsUnset);
102 }
103 }
104 })
105 }
106}
107
108impl Client {
109 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117 let supported_versions = self.supported_versions().await.ok();
118
119 [VersionBuilder::DiscoverNative]
120 .into_iter()
121 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122 .collect()
123 }
124
125 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130 Ok(SlidingSync::builder(id.into(), self.clone())?)
131 }
132
133 #[cfg(any(test, feature = "testing"))]
139 #[tracing::instrument(skip(self, response))]
140 pub async fn process_sliding_sync_test_helper(
141 &self,
142 response: &http::Response,
143 requested_required_states: &RequestedRequiredStates,
144 ) -> Result<SyncResponse> {
145 let response =
146 self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148 tracing::debug!("done processing on base_client");
149 self.call_sync_response_handlers(&response).await?;
150
151 Ok(response)
152 }
153}
154
155#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162 client: Client,
163 to_device_events: Vec<ProcessedToDeviceEvent>,
164 response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168 pub fn new(client: Client) -> Self {
169 Self { client, to_device_events: Vec::new(), response: None }
170 }
171
172 #[cfg(feature = "e2e-encryption")]
173 pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174 assert!(self.response.is_none());
177
178 self.to_device_events = if let Some(to_device_events) = self
179 .client
180 .base_client()
181 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182 .await?
183 {
184 self.client.encryption().backups().maybe_trigger_backup();
186
187 to_device_events
188 } else {
189 Vec::new()
190 };
191
192 Ok(())
193 }
194
195 pub async fn handle_room_response(
196 &mut self,
197 response: &http::Response,
198 requested_required_states: &RequestedRequiredStates,
199 ) -> Result<()> {
200 subscribe_to_room_latest_events(&self.client, response.rooms.keys()).await;
201
202 let previously_joined_rooms = self
203 .client
204 .joined_rooms()
205 .into_iter()
206 .map(|r| r.room_id().to_owned())
207 .collect::<BTreeSet<_>>();
208
209 let mut sync_response = self
210 .client
211 .base_client()
212 .process_sliding_sync(response, requested_required_states)
213 .await?;
214
215 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
216
217 update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
218
219 self.response = Some(sync_response);
220
221 Ok(())
222 }
223
224 pub async fn handle_thread_subscriptions(
225 &mut self,
226 previous_pos: Option<&str>,
227 thread_subs: response::ThreadSubscriptions,
228 ) -> Result<()> {
229 let catchup_token =
230 thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
231 from: prev_batch,
232 to: previous_pos.map(|s| s.to_owned()),
233 });
234
235 self.client
236 .thread_subscription_catchup()
237 .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
238 .await?;
239
240 Ok(())
241 }
242
243 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
244 let mut response = self.response.take().unwrap_or_default();
245
246 response.to_device.extend(self.to_device_events);
247
248 self.client.call_sync_response_handlers(&response).await?;
249
250 Ok(response)
251 }
252}
253
254async fn update_in_memory_caches(
258 client: &Client,
259 previously_joined_rooms: &BTreeSet<OwnedRoomId>,
260 response: &SyncResponse,
261) {
262 let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
263
264 if response.account_data.iter().any(|event| {
267 event
268 .get_field::<GlobalAccountDataEventType>("type")
269 .ok()
270 .flatten()
271 .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
272 }) {
273 let notification_settings = client.notification_settings().await;
274 let rules = notification_settings.rules().await;
275
276 for room in client.joined_rooms() {
278 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
279 room.update_cached_user_defined_notification_mode(mode);
280 } else {
281 room.clear_user_defined_notification_mode();
282 }
283 }
284 } else {
285 let mut rules = None;
290
291 for room_id in response
292 .rooms
293 .joined
294 .keys()
295 .filter(|room_id| !previously_joined_rooms.contains(*room_id))
296 {
297 let Some(room) = client.get_room(room_id) else {
298 error!(?room_id, "The room must exist since it has been joined");
299 continue;
300 };
301
302 let rules = if let Some(rules) = &mut rules {
304 rules
305 } else {
306 rules.insert(client.notification_settings().await.rules().await.clone())
307 };
308
309 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
311 room.update_cached_user_defined_notification_mode(mode);
312 }
313 }
314 }
315}
316
317async fn handle_receipts_extension(
319 client: &Client,
320 response: &http::Response,
321 sync_response: &mut SyncResponse,
322) -> Result<()> {
323 let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
324
325 let room_ids = BTreeSet::from_iter(
328 sync_response
329 .rooms
330 .joined
331 .keys()
332 .cloned()
333 .chain(response.extensions.receipts.rooms.keys().cloned()),
334 );
335
336 let futures = room_ids.into_iter().map(|room_id| {
338 let new_sync_events = sync_response
339 .rooms
340 .joined
341 .entry(room_id.to_owned())
342 .or_default()
343 .timeline
344 .events
345 .clone();
346
347 async {
348 let Ok((room_event_cache, _drop_handle)) =
349 client.event_cache().for_room(&room_id).await
350 else {
351 tracing::info!(
352 ?room_id,
353 "Failed to fetch the `RoomEventCache` when computing unread counts"
354 );
355 return Ok::<_, crate::Error>(None);
356 };
357
358 let previous_events = room_event_cache.events().await;
359
360 let receipt_event = client
361 .base_client()
362 .process_sliding_sync_receipts_extension_for_room(
363 &room_id,
364 response,
365 new_sync_events,
366 previous_events,
367 )
368 .await?;
369
370 Ok(Some((room_id, receipt_event)))
371 }
372 });
373
374 let updates = try_join_all(futures).await?;
375
376 for (room_id, receipt_event_content) in updates.into_iter().flatten() {
377 if let Some(event) = receipt_event_content {
378 sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
379 }
380 }
381
382 Ok(())
383}
384
385#[cfg(all(test, not(target_family = "wasm")))]
386mod tests {
387 use std::{collections::BTreeMap, ops::Not};
388
389 use assert_matches::assert_matches;
390 use matrix_sdk_base::{
391 RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomState,
392 notification_settings::RoomNotificationMode,
393 };
394 use matrix_sdk_test::async_test;
395 use ruma::{
396 api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
397 room_id, serde::Raw,
398 };
399 use serde_json::json;
400
401 use super::{Version, VersionBuilder};
402 use crate::{
403 SlidingSyncList, SlidingSyncMode,
404 error::Result,
405 sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
406 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
407 };
408
409 #[test]
410 fn test_version_builder_none() {
411 assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
412 }
413
414 #[test]
415 fn test_version_builder_native() {
416 assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
417 }
418
419 #[test]
420 fn test_version_builder_discover_native() {
421 let mut response = get_supported_versions::Response::new(vec![]);
422 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
423
424 assert_matches!(
425 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
426 Ok(Version::Native)
427 );
428 }
429
430 #[test]
431 fn test_version_builder_discover_native_no_supported_versions() {
432 assert_matches!(
433 VersionBuilder::DiscoverNative.build(None),
434 Err(VersionBuilderError::MissingVersionsResponse)
435 );
436 }
437
438 #[test]
439 fn test_version_builder_discover_native_unstable_features_is_disabled() {
440 let mut response = get_supported_versions::Response::new(vec![]);
441 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
442
443 assert_matches!(
444 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
445 Err(VersionBuilderError::NativeVersionIsUnset)
446 );
447 }
448
449 #[async_test]
450 async fn test_available_sliding_sync_versions_none() {
451 let client = MockClientBuilder::new(None).build().await;
452 let available_versions = client.available_sliding_sync_versions().await;
453
454 assert!(available_versions.is_empty());
457 }
458
459 #[async_test]
460 async fn test_available_sliding_sync_versions_native() {
461 let server = MatrixMockServer::new().await;
462 let client = server.client_builder().no_server_versions().build().await;
463
464 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
465
466 let available_versions = client.available_sliding_sync_versions().await;
467
468 assert_eq!(available_versions.len(), 1);
470 assert_matches!(available_versions[0], Version::Native);
471 }
472
473 #[async_test]
474 async fn test_cache_user_defined_notification_mode() -> Result<()> {
475 let client = MockClientBuilder::new(None).build().await;
476 let room_id = room_id!("!r0:matrix.org");
477
478 let sliding_sync = client
479 .sliding_sync("test")?
480 .with_account_data_extension(
481 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
482 )
483 .add_list(
484 SlidingSyncList::builder("all")
485 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
486 )
487 .build()
488 .await?;
489
490 {
493 let server_response = assign!(http::Response::new("0".to_owned()), {
494 rooms: BTreeMap::from([(
495 room_id.to_owned(),
496 http::response::Room::default(),
497 )]),
498 extensions: assign!(http::response::Extensions::default(), {
499 account_data: assign!(http::response::AccountData::default(), {
500 global: vec![
501 Raw::from_json_string(
502 json!({
503 "type": "m.push_rules",
504 "content": {
505 "global": {
506 "room": [
507 {
508 "actions": ["notify"],
509 "rule_id": room_id,
510 "default": false,
511 "enabled": true,
512 },
513 ],
514 },
515 },
516 })
517 .to_string(),
518 ).unwrap()
519 ]
520 })
521 })
522 });
523
524 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
525 sliding_sync
526 .handle_response(
527 server_response.clone(),
528 &mut pos_guard,
529 RequestedRequiredStates::default(),
530 )
531 .await?;
532 }
533
534 let room = client.get_room(room_id).unwrap();
536
537 assert_eq!(
539 room.cached_user_defined_notification_mode(),
540 Some(RoomNotificationMode::AllMessages),
541 );
542
543 {
547 let server_response = assign!(http::Response::new("0".to_owned()), {
548 rooms: BTreeMap::from([(
549 room_id.to_owned(),
550 http::response::Room::default(),
551 )]),
552 extensions: assign!(http::response::Extensions::default(), {
553 account_data: assign!(http::response::AccountData::default(), {
554 global: vec![
555 Raw::from_json_string(
556 json!({
557 "type": "m.push_rules",
558 "content": {
559 "global": {
560 "room": [
561 {
562 "actions": [],
563 "rule_id": room_id,
564 "default": false,
565 "enabled": true,
566 },
567 ],
568 },
569 },
570 })
571 .to_string(),
572 ).unwrap()
573 ]
574 })
575 })
576 });
577
578 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
579 sliding_sync
580 .handle_response(
581 server_response.clone(),
582 &mut pos_guard,
583 RequestedRequiredStates::default(),
584 )
585 .await?;
586 }
587
588 assert_eq!(
590 room.cached_user_defined_notification_mode(),
591 Some(RoomNotificationMode::MentionsAndKeywordsOnly),
592 );
593
594 {
598 let server_response = assign!(http::Response::new("0".to_owned()), {
599 extensions: assign!(http::response::Extensions::default(), {
600 account_data: assign!(http::response::AccountData::default(), {
601 global: vec![
602 Raw::from_json_string(
603 json!({
604 "type": "m.push_rules",
605 "content": {
606 "global": {
607 "room": [
608 {
609 "actions": ["notify"],
610 "rule_id": room_id,
611 "default": false,
612 "enabled": true,
613 },
614 ],
615 },
616 },
617 })
618 .to_string(),
619 ).unwrap()
620 ]
621 })
622 })
623 });
624
625 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
626 sliding_sync
627 .handle_response(
628 server_response.clone(),
629 &mut pos_guard,
630 RequestedRequiredStates::default(),
631 )
632 .await?;
633 }
634
635 assert_eq!(
637 room.cached_user_defined_notification_mode(),
638 Some(RoomNotificationMode::AllMessages),
639 );
640
641 Ok(())
642 }
643
644 #[async_test]
645 async fn test_auto_listen_to_latest_events() -> Result<()> {
646 let client = MockClientBuilder::new(None).build().await;
647 let room_id = room_id!("!r0");
648
649 client.base_client().get_or_create_room(room_id, RoomState::Joined);
651
652 client.event_cache().subscribe()?;
654
655 assert!(client.latest_events().await.is_listening_to_room(room_id).await.not());
657
658 let sliding_sync = client
660 .sliding_sync("test")?
661 .add_list(
662 SlidingSyncList::builder("all")
663 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
664 )
665 .build()
666 .await?;
667
668 {
670 let server_response = assign!(http::Response::new("0".to_owned()), {
671 rooms: BTreeMap::from([(
672 room_id.to_owned(),
673 http::response::Room::default(),
674 )]),
675 });
676
677 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
678
679 sliding_sync
680 .handle_response(
681 server_response.clone(),
682 &mut pos_guard,
683 RequestedRequiredStates::default(),
684 )
685 .await?;
686 }
687
688 assert!(client.get_room(room_id).is_some());
690
691 assert!(client.latest_events().await.is_listening_to_room(room_id).await);
693
694 Ok(())
695 }
696
697 #[async_test]
698 async fn test_read_receipt_can_trigger_a_notable_update_reason() {
699 use ruma::api::client::sync::sync_events::v5 as http;
700
701 let client = MockClientBuilder::new(None).build().await;
703 client.event_cache().subscribe().unwrap();
704
705 let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
706
707 let room_id = room_id!("!r:e.uk");
709 let room = http::response::Room::new();
710 let mut response = http::Response::new("5".to_owned());
711 response.rooms.insert(room_id.to_owned(), room);
712
713 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
714 processor
715 .handle_room_response(&response, &RequestedRequiredStates::default())
716 .await
717 .expect("Failed to process sync");
718 processor.process_and_take_response().await.expect("Failed to finish processing sync");
719
720 assert_matches!(
722 room_info_notable_update_stream.recv().await,
723 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
724 assert_eq!(received_room_id, room_id);
725 assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
726 }
727 );
728 assert_matches!(
729 room_info_notable_update_stream.recv().await,
730 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
731 assert_eq!(received_room_id, room_id);
732 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
733 }
734 );
735 assert!(room_info_notable_update_stream.is_empty());
736
737 let room_id = room_id!("!r:e.uk");
740 let events = vec![
741 make_raw_event("m.room.message", "$3"),
742 make_raw_event("m.room.message", "$4"),
743 make_raw_event("m.read", "$5"),
744 ];
745 let room = assign!(http::response::Room::new(), {
746 timeline: events,
747 });
748 let mut response = http::Response::new("5".to_owned());
749 response.rooms.insert(room_id.to_owned(), room);
750
751 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
752 processor
753 .handle_room_response(&response, &RequestedRequiredStates::default())
754 .await
755 .expect("Failed to process sync");
756 processor.process_and_take_response().await.expect("Failed to finish processing sync");
757
758 assert_matches!(
762 room_info_notable_update_stream.recv().await,
763 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
764 assert_eq!(received_room_id, room_id);
765 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
766 }
767 );
768 assert_matches!(
770 room_info_notable_update_stream.recv().await,
771 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
772 assert_eq!(received_room_id, room_id);
773 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
774 }
775 );
776 assert!(room_info_notable_update_stream.is_empty());
777 }
778
779 fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
780 Raw::from_json_string(
781 json!({
782 "type": event_type,
783 "event_id": id,
784 "content": { "msgtype": "m.text", "body": "my msg" },
785 "sender": "@u:h.uk",
786 "origin_server_ts": 12344445,
787 })
788 .to_string(),
789 )
790 .unwrap()
791 }
792}