1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5 RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9 OwnedRoomId,
10 api::{
11 FeatureFlag, SupportedVersions,
12 client::sync::sync_events::v5::{self as http, response},
13 },
14 events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result};
20
21#[derive(Clone, Debug)]
23pub enum Version {
24 None,
27
28 Native,
31}
32
33impl Version {
34 #[cfg(test)]
35 pub(crate) fn is_native(&self) -> bool {
36 matches!(self, Self::Native)
37 }
38}
39
40#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43 #[error("`.well-known` is not set")]
45 WellKnownNotSet,
46
47 #[error("The `/versions` response is not set")]
49 MissingVersionsResponse,
50
51 #[error(
54 "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55 or it's not set to true."
56 )]
57 NativeVersionIsUnset,
58}
59
60#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63 None,
65
66 Native,
68
69 DiscoverNative,
73}
74
75impl VersionBuilder {
76 pub(crate) fn needs_get_supported_versions(&self) -> bool {
77 matches!(self, Self::DiscoverNative)
78 }
79
80 pub fn build(
85 self,
86 supported: Option<&SupportedVersions>,
87 ) -> Result<Version, VersionBuilderError> {
88 Ok(match self {
89 Self::None => Version::None,
90
91 Self::Native => Version::Native,
92
93 Self::DiscoverNative => {
94 let Some(supported) = supported else {
95 return Err(VersionBuilderError::MissingVersionsResponse);
96 };
97
98 if supported.features.contains(&FeatureFlag::Msc4186) {
99 Version::Native
100 } else {
101 return Err(VersionBuilderError::NativeVersionIsUnset);
102 }
103 }
104 })
105 }
106}
107
108impl Client {
109 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117 let supported_versions = self.supported_versions().await.ok();
118
119 [VersionBuilder::DiscoverNative]
120 .into_iter()
121 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122 .collect()
123 }
124
125 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130 Ok(SlidingSync::builder(id.into(), self.clone())?)
131 }
132
133 #[cfg(any(test, feature = "testing"))]
139 #[tracing::instrument(skip(self, response))]
140 pub async fn process_sliding_sync_test_helper(
141 &self,
142 response: &http::Response,
143 requested_required_states: &RequestedRequiredStates,
144 ) -> Result<SyncResponse> {
145 let response =
146 self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148 tracing::debug!("done processing on base_client");
149 self.call_sync_response_handlers(&response).await?;
150
151 Ok(response)
152 }
153}
154
155#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162 client: Client,
163 to_device_events: Vec<ProcessedToDeviceEvent>,
164 response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168 pub fn new(client: Client) -> Self {
169 Self { client, to_device_events: Vec::new(), response: None }
170 }
171
172 #[cfg(feature = "e2e-encryption")]
173 pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174 assert!(self.response.is_none());
177
178 self.to_device_events = if let Some(to_device_events) = self
179 .client
180 .base_client()
181 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182 .await?
183 {
184 self.client.encryption().backups().maybe_trigger_backup();
186
187 to_device_events
188 } else {
189 Vec::new()
190 };
191
192 Ok(())
193 }
194
195 pub async fn handle_room_response(
196 &mut self,
197 response: &http::Response,
198 requested_required_states: &RequestedRequiredStates,
199 ) -> Result<()> {
200 let previously_joined_rooms = self
201 .client
202 .joined_rooms()
203 .into_iter()
204 .map(|r| r.room_id().to_owned())
205 .collect::<BTreeSet<_>>();
206
207 let mut sync_response = self
208 .client
209 .base_client()
210 .process_sliding_sync(response, requested_required_states)
211 .await?;
212
213 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
214
215 update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
216
217 self.response = Some(sync_response);
218
219 Ok(())
220 }
221
222 pub async fn handle_thread_subscriptions(
223 &mut self,
224 previous_pos: Option<&str>,
225 thread_subs: response::ThreadSubscriptions,
226 ) -> Result<()> {
227 let catchup_token =
228 thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
229 from: prev_batch,
230 to: previous_pos.map(|s| s.to_owned()),
231 });
232
233 self.client
234 .thread_subscription_catchup()
235 .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
236 .await?;
237
238 Ok(())
239 }
240
241 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
242 let mut response = self.response.take().unwrap_or_default();
243
244 response.to_device.extend(self.to_device_events);
245
246 self.client.call_sync_response_handlers(&response).await?;
247
248 Ok(response)
249 }
250}
251
252async fn update_in_memory_caches(
256 client: &Client,
257 previously_joined_rooms: &BTreeSet<OwnedRoomId>,
258 response: &SyncResponse,
259) {
260 let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
261
262 if response.account_data.iter().any(|event| {
265 event
266 .get_field::<GlobalAccountDataEventType>("type")
267 .ok()
268 .flatten()
269 .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
270 }) {
271 let notification_settings = client.notification_settings().await;
272 let rules = notification_settings.rules().await;
273
274 for room in client.joined_rooms() {
276 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
277 room.update_cached_user_defined_notification_mode(mode);
278 }
279 }
280 } else {
281 let mut rules = None;
286
287 for room_id in response
288 .rooms
289 .joined
290 .keys()
291 .filter(|room_id| !previously_joined_rooms.contains(*room_id))
292 {
293 let Some(room) = client.get_room(room_id) else {
294 error!(?room_id, "The room must exist since it has been joined");
295 continue;
296 };
297
298 let rules = if let Some(rules) = &mut rules {
300 rules
301 } else {
302 rules.insert(client.notification_settings().await.rules().await.clone())
303 };
304
305 if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
307 room.update_cached_user_defined_notification_mode(mode);
308 }
309 }
310 }
311}
312
313async fn handle_receipts_extension(
315 client: &Client,
316 response: &http::Response,
317 sync_response: &mut SyncResponse,
318) -> Result<()> {
319 let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
320
321 let room_ids = BTreeSet::from_iter(
324 sync_response
325 .rooms
326 .joined
327 .keys()
328 .cloned()
329 .chain(response.extensions.receipts.rooms.keys().cloned()),
330 );
331
332 let futures = room_ids.into_iter().map(|room_id| {
334 let new_sync_events = sync_response
335 .rooms
336 .joined
337 .entry(room_id.to_owned())
338 .or_default()
339 .timeline
340 .events
341 .clone();
342
343 async {
344 let Ok((room_event_cache, _drop_handle)) =
345 client.event_cache().for_room(&room_id).await
346 else {
347 tracing::info!(
348 ?room_id,
349 "Failed to fetch the `RoomEventCache` when computing unread counts"
350 );
351 return Ok::<_, crate::Error>(None);
352 };
353
354 let previous_events = room_event_cache.events().await;
355
356 let receipt_event = client
357 .base_client()
358 .process_sliding_sync_receipts_extension_for_room(
359 &room_id,
360 response,
361 new_sync_events,
362 previous_events,
363 )
364 .await?;
365
366 Ok(Some((room_id, receipt_event)))
367 }
368 });
369
370 let updates = try_join_all(futures).await?;
371
372 for (room_id, receipt_event_content) in updates.into_iter().flatten() {
373 if let Some(event) = receipt_event_content {
374 sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
375 }
376 }
377
378 Ok(())
379}
380
381#[cfg(all(test, not(target_family = "wasm")))]
382mod tests {
383 use std::collections::BTreeMap;
384
385 use assert_matches::assert_matches;
386 use matrix_sdk_base::{
387 RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
388 notification_settings::RoomNotificationMode,
389 };
390 use matrix_sdk_test::async_test;
391 use ruma::{
392 api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
393 room_id, serde::Raw,
394 };
395 use serde_json::json;
396
397 use super::{Version, VersionBuilder};
398 use crate::{
399 SlidingSyncList, SlidingSyncMode,
400 error::Result,
401 sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
402 test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
403 };
404
405 #[test]
406 fn test_version_builder_none() {
407 assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
408 }
409
410 #[test]
411 fn test_version_builder_native() {
412 assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
413 }
414
415 #[test]
416 fn test_version_builder_discover_native() {
417 let mut response = get_supported_versions::Response::new(vec![]);
418 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
419
420 assert_matches!(
421 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
422 Ok(Version::Native)
423 );
424 }
425
426 #[test]
427 fn test_version_builder_discover_native_no_supported_versions() {
428 assert_matches!(
429 VersionBuilder::DiscoverNative.build(None),
430 Err(VersionBuilderError::MissingVersionsResponse)
431 );
432 }
433
434 #[test]
435 fn test_version_builder_discover_native_unstable_features_is_disabled() {
436 let mut response = get_supported_versions::Response::new(vec![]);
437 response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
438
439 assert_matches!(
440 VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
441 Err(VersionBuilderError::NativeVersionIsUnset)
442 );
443 }
444
445 #[async_test]
446 async fn test_available_sliding_sync_versions_none() {
447 let client = MockClientBuilder::new(None).build().await;
448 let available_versions = client.available_sliding_sync_versions().await;
449
450 assert!(available_versions.is_empty());
453 }
454
455 #[async_test]
456 async fn test_available_sliding_sync_versions_native() {
457 let server = MatrixMockServer::new().await;
458 let client = server.client_builder().no_server_versions().build().await;
459
460 server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
461
462 let available_versions = client.available_sliding_sync_versions().await;
463
464 assert_eq!(available_versions.len(), 1);
466 assert_matches!(available_versions[0], Version::Native);
467 }
468
469 #[async_test]
470 async fn test_cache_user_defined_notification_mode() -> Result<()> {
471 let client = MockClientBuilder::new(None).build().await;
472 let room_id = room_id!("!r0:matrix.org");
473
474 let sliding_sync = client
475 .sliding_sync("test")?
476 .with_account_data_extension(
477 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
478 )
479 .add_list(
480 SlidingSyncList::builder("all")
481 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
482 )
483 .build()
484 .await?;
485
486 {
489 let server_response = assign!(http::Response::new("0".to_owned()), {
490 rooms: BTreeMap::from([(
491 room_id.to_owned(),
492 http::response::Room::default(),
493 )]),
494 extensions: assign!(http::response::Extensions::default(), {
495 account_data: assign!(http::response::AccountData::default(), {
496 global: vec![
497 Raw::from_json_string(
498 json!({
499 "type": "m.push_rules",
500 "content": {
501 "global": {
502 "room": [
503 {
504 "actions": ["notify"],
505 "rule_id": room_id,
506 "default": false,
507 "enabled": true,
508 },
509 ],
510 },
511 },
512 })
513 .to_string(),
514 ).unwrap()
515 ]
516 })
517 })
518 });
519
520 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
521 sliding_sync
522 .handle_response(
523 server_response.clone(),
524 &mut pos_guard,
525 RequestedRequiredStates::default(),
526 )
527 .await?;
528 }
529
530 let room = client.get_room(room_id).unwrap();
532
533 assert_eq!(
535 room.cached_user_defined_notification_mode(),
536 Some(RoomNotificationMode::AllMessages),
537 );
538
539 {
543 let server_response = assign!(http::Response::new("0".to_owned()), {
544 rooms: BTreeMap::from([(
545 room_id.to_owned(),
546 http::response::Room::default(),
547 )]),
548 extensions: assign!(http::response::Extensions::default(), {
549 account_data: assign!(http::response::AccountData::default(), {
550 global: vec![
551 Raw::from_json_string(
552 json!({
553 "type": "m.push_rules",
554 "content": {
555 "global": {
556 "room": [
557 {
558 "actions": [],
559 "rule_id": room_id,
560 "default": false,
561 "enabled": true,
562 },
563 ],
564 },
565 },
566 })
567 .to_string(),
568 ).unwrap()
569 ]
570 })
571 })
572 });
573
574 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
575 sliding_sync
576 .handle_response(
577 server_response.clone(),
578 &mut pos_guard,
579 RequestedRequiredStates::default(),
580 )
581 .await?;
582 }
583
584 assert_eq!(
586 room.cached_user_defined_notification_mode(),
587 Some(RoomNotificationMode::MentionsAndKeywordsOnly),
588 );
589
590 {
594 let server_response = assign!(http::Response::new("0".to_owned()), {
595 extensions: assign!(http::response::Extensions::default(), {
596 account_data: assign!(http::response::AccountData::default(), {
597 global: vec![
598 Raw::from_json_string(
599 json!({
600 "type": "m.push_rules",
601 "content": {
602 "global": {
603 "room": [
604 {
605 "actions": ["notify"],
606 "rule_id": room_id,
607 "default": false,
608 "enabled": true,
609 },
610 ],
611 },
612 },
613 })
614 .to_string(),
615 ).unwrap()
616 ]
617 })
618 })
619 });
620
621 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
622 sliding_sync
623 .handle_response(
624 server_response.clone(),
625 &mut pos_guard,
626 RequestedRequiredStates::default(),
627 )
628 .await?;
629 }
630
631 assert_eq!(
633 room.cached_user_defined_notification_mode(),
634 Some(RoomNotificationMode::AllMessages),
635 );
636
637 Ok(())
638 }
639
640 #[async_test]
641 async fn test_read_receipt_can_trigger_a_notable_update_reason() {
642 use ruma::api::client::sync::sync_events::v5 as http;
643
644 let client = MockClientBuilder::new(None).build().await;
646 client.event_cache().subscribe().unwrap();
647
648 let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
649
650 let room_id = room_id!("!r:e.uk");
652 let room = http::response::Room::new();
653 let mut response = http::Response::new("5".to_owned());
654 response.rooms.insert(room_id.to_owned(), room);
655
656 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
657 processor
658 .handle_room_response(&response, &RequestedRequiredStates::default())
659 .await
660 .expect("Failed to process sync");
661 processor.process_and_take_response().await.expect("Failed to finish processing sync");
662
663 assert_matches!(
665 room_info_notable_update_stream.recv().await,
666 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
667 assert_eq!(received_room_id, room_id);
668 assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
669 }
670 );
671 assert_matches!(
672 room_info_notable_update_stream.recv().await,
673 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
674 assert_eq!(received_room_id, room_id);
675 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
676 }
677 );
678 assert!(room_info_notable_update_stream.is_empty());
679
680 let room_id = room_id!("!r:e.uk");
683 let events = vec![
684 make_raw_event("m.room.message", "$3"),
685 make_raw_event("m.room.message", "$4"),
686 make_raw_event("m.read", "$5"),
687 ];
688 let room = assign!(http::response::Room::new(), {
689 timeline: events,
690 });
691 let mut response = http::Response::new("5".to_owned());
692 response.rooms.insert(room_id.to_owned(), room);
693
694 let mut processor = SlidingSyncResponseProcessor::new(client.clone());
695 processor
696 .handle_room_response(&response, &RequestedRequiredStates::default())
697 .await
698 .expect("Failed to process sync");
699 processor.process_and_take_response().await.expect("Failed to finish processing sync");
700
701 assert_matches!(
705 room_info_notable_update_stream.recv().await,
706 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
707 assert_eq!(received_room_id, room_id);
708 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
709 }
710 );
711 assert_matches!(
713 room_info_notable_update_stream.recv().await,
714 Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
715 assert_eq!(received_room_id, room_id);
716 assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
717 }
718 );
719 assert!(room_info_notable_update_stream.is_empty());
720 }
721
722 fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
723 Raw::from_json_string(
724 json!({
725 "type": event_type,
726 "event_id": id,
727 "content": { "msgtype": "m.text", "body": "my msg" },
728 "sender": "@u:h.uk",
729 "origin_server_ts": 12344445,
730 })
731 .to_string(),
732 )
733 .unwrap()
734 }
735}