1use std::collections::BTreeSet;
2
3use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates};
4use ruma::{
5 api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http},
6 events::AnyToDeviceEvent,
7 serde::Raw,
8};
9use tracing::error;
10
11use super::{SlidingSync, SlidingSyncBuilder};
12use crate::{Client, Result};
13
/// A sliding sync version.
#[derive(Clone, Debug)]
pub enum Version {
    /// No sliding sync version.
    None,

    /// The native sliding sync version.
    ///
    /// [`VersionBuilder::DiscoverNative`] resolves to this variant when the
    /// `/versions` response sets the `org.matrix.simplified_msc3575` unstable
    /// feature to `true`.
    Native,
}
25
26impl Version {
27 #[cfg(test)]
28 pub(crate) fn is_native(&self) -> bool {
29 matches!(self, Self::Native)
30 }
31}
32
/// An error that can occur when building a [`Version`] from a
/// [`VersionBuilder`].
#[derive(thiserror::Error, Debug)]
pub enum VersionBuilderError {
    /// The `.well-known` response is not set.
    ///
    /// NOTE(review): no code visible in this file returns this variant.
    #[error("`.well-known` is not set")]
    WellKnownNotSet,

    /// No `/versions` response was provided although the builder needs one.
    #[error("The `/versions` response is not set")]
    MissingVersionsResponse,

    /// The `/versions` response does not advertise
    /// `org.matrix.simplified_msc3575` in its `unstable_features`, or the
    /// feature is set to `false`.
    #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
    NativeVersionIsUnset,
}
49
/// A builder for [`Version`]; see [`VersionBuilder::build`].
#[derive(Clone, Debug)]
pub enum VersionBuilder {
    /// Builds [`Version::None`].
    None,

    /// Builds [`Version::Native`] unconditionally.
    Native,

    /// Builds [`Version::Native`] only if the `/versions` response given to
    /// [`VersionBuilder::build`] sets the `org.matrix.simplified_msc3575`
    /// unstable feature to `true`; fails otherwise.
    DiscoverNative,
}
64
65impl VersionBuilder {
66 pub(crate) fn needs_get_supported_versions(&self) -> bool {
67 matches!(self, Self::DiscoverNative)
68 }
69
70 pub fn build(
75 self,
76 versions: Option<&get_supported_versions::Response>,
77 ) -> Result<Version, VersionBuilderError> {
78 Ok(match self {
79 Self::None => Version::None,
80
81 Self::Native => Version::Native,
82
83 Self::DiscoverNative => {
84 let Some(versions) = versions else {
85 return Err(VersionBuilderError::MissingVersionsResponse);
86 };
87
88 match versions.unstable_features.get("org.matrix.simplified_msc3575") {
89 Some(value) if *value => Version::Native,
90 _ => return Err(VersionBuilderError::NativeVersionIsUnset),
91 }
92 }
93 })
94 }
95}
96
97impl Client {
98 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
106 let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
107 let mut response = get_supported_versions::Response::new(vec![]);
108 response.unstable_features = unstable_features;
109
110 response
111 });
112
113 [VersionBuilder::DiscoverNative]
114 .into_iter()
115 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
116 .collect()
117 }
118
119 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
124 Ok(SlidingSync::builder(id.into(), self.clone())?)
125 }
126
127 #[cfg(any(test, feature = "testing"))]
133 #[tracing::instrument(skip(self, response))]
134 pub async fn process_sliding_sync_test_helper(
135 &self,
136 response: &http::Response,
137 requested_required_states: &RequestedRequiredStates,
138 ) -> Result<SyncResponse> {
139 let response =
140 self.base_client().process_sliding_sync(response, requested_required_states).await?;
141
142 tracing::debug!("done processing on base_client");
143 self.call_sync_response_handlers(&response).await?;
144
145 Ok(response)
146 }
147}
148
/// Accumulates the pieces of a processed sliding sync response until they
/// are merged and returned by `process_and_take_response`.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor {
    /// The client used to process the response and to call the sync response
    /// handlers.
    client: Client,
    /// To-device events stored by `handle_encryption`; they are appended to
    /// the response's `to_device` in `process_and_take_response`.
    to_device_events: Vec<Raw<AnyToDeviceEvent>>,
    /// The sync response stored by `handle_room_response`; `None` until that
    /// method has run.
    response: Option<SyncResponse>,
}
160
161impl SlidingSyncResponseProcessor {
162 pub fn new(client: Client) -> Self {
163 Self { client, to_device_events: Vec::new(), response: None }
164 }
165
166 #[cfg(feature = "e2e-encryption")]
167 pub async fn handle_encryption(
168 &mut self,
169 extensions: &http::response::Extensions,
170 ) -> Result<()> {
171 assert!(self.response.is_none());
174
175 self.to_device_events = if let Some(to_device_events) = self
176 .client
177 .base_client()
178 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
179 .await?
180 {
181 self.client.encryption().backups().maybe_trigger_backup();
183
184 to_device_events
185 } else {
186 Vec::new()
187 };
188
189 Ok(())
190 }
191
192 pub async fn handle_room_response(
193 &mut self,
194 response: &http::Response,
195 requested_required_states: &RequestedRequiredStates,
196 ) -> Result<()> {
197 let mut sync_response = self
198 .client
199 .base_client()
200 .process_sliding_sync(response, requested_required_states)
201 .await?;
202 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
203
204 self.response = Some(sync_response);
205 self.post_process().await
206 }
207
208 async fn post_process(&mut self) -> Result<()> {
209 let response = self.response.as_ref().unwrap();
212
213 update_in_memory_caches(&self.client, response).await?;
214
215 Ok(())
216 }
217
218 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
219 let mut response = self.response.take().unwrap_or_default();
220
221 response.to_device.extend(self.to_device_events);
222
223 self.client.call_sync_response_handlers(&response).await?;
224
225 Ok(response)
226 }
227}
228
229async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
233 for room_id in response.rooms.joined.keys() {
234 let Some(room) = client.get_room(room_id) else {
235 error!(?room_id, "Cannot post process a room in sliding sync because it is missing");
236 continue;
237 };
238
239 room.user_defined_notification_mode().await;
240 }
241
242 Ok(())
243}
244
245async fn handle_receipts_extension(
247 client: &Client,
248 response: &http::Response,
249 sync_response: &mut SyncResponse,
250) -> Result<()> {
251 let room_ids = BTreeSet::from_iter(
254 sync_response
255 .rooms
256 .joined
257 .keys()
258 .cloned()
259 .chain(response.extensions.receipts.rooms.keys().cloned()),
260 );
261
262 for room_id in room_ids {
263 let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await
264 else {
265 tracing::info!(
266 ?room_id,
267 "Failed to fetch the `RoomEventCache` when computing unread counts"
268 );
269
270 continue;
271 };
272
273 let previous_events = room_event_cache.events().await;
274
275 client
276 .base_client()
277 .process_sliding_sync_receipts_extension_for_room(
278 &room_id,
279 response,
280 sync_response,
281 previous_events,
282 )
283 .await?;
284 }
285 Ok(())
286}
287
// Tests for version building/discovery and for `SlidingSyncResponseProcessor`.
// Not compiled for the `wasm` target family.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        notification_settings::RoomNotificationMode, RequestedRequiredStates,
        RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
    };
    use matrix_sdk_test::async_test;
    use ruma::{assign, events::AnySyncTimelineEvent, room_id, serde::Raw};
    use serde_json::json;
    use wiremock::{
        matchers::{method, path},
        Mock, ResponseTemplate,
    };

    use super::{get_supported_versions, Version, VersionBuilder};
    use crate::{
        error::Result,
        sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError},
        test_utils::{logged_in_client, logged_in_client_with_server},
        SlidingSyncList, SlidingSyncMode,
    };

    // `VersionBuilder::None` builds `Version::None` without a `/versions` response.
    #[test]
    fn test_version_builder_none() {
        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
    }

    // `VersionBuilder::Native` builds `Version::Native` without a `/versions` response.
    #[test]
    fn test_version_builder_native() {
        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
    }

    // `DiscoverNative` succeeds when the unstable feature is set to `true`.
    #[test]
    fn test_version_builder_discover_native() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();

        assert_matches!(VersionBuilder::DiscoverNative.build(Some(&response)), Ok(Version::Native));
    }

    // `DiscoverNative` fails when no `/versions` response is provided.
    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        assert_matches!(
            VersionBuilder::DiscoverNative.build(None),
            Err(VersionBuilderError::MissingVersionsResponse)
        );
    }

    // `DiscoverNative` fails when the unstable feature is set to `false`.
    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response)),
            Err(VersionBuilderError::NativeVersionIsUnset)
        );
    }

    // No `/versions` mock is mounted on the server in this test, and no
    // version is expected to be discovered.
    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let (client, _server) = logged_in_client_with_server().await;
        let available_versions = client.available_sliding_sync_versions().await;

        assert!(available_versions.is_empty());
    }

    // The mocked `/versions` response advertises the unstable feature, so
    // exactly one version, `Version::Native`, is expected.
    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let (client, server) = logged_in_client_with_server().await;

        Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "versions": [],
                "unstable_features": {
                    "org.matrix.simplified_msc3575": true,
                },
            })))
            .mount(&server)
            .await;

        let available_versions = client.available_sliding_sync_versions().await;

        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    // Handling a response updates the cached user-defined notification mode
    // of the room, following the push rules received as global account data.
    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let (client, _server) = logged_in_client_with_server().await;
        let room_id = room_id!("!r0:matrix.org");

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // First response: a room push rule whose actions are `["notify"]`.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": ["notify"],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room must exist now.
        let room = client.get_room(room_id).unwrap();

        // The cached mode reflects the `notify` rule.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Second response: the same room push rule, now with empty actions.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": [],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The cached mode has been updated to reflect the new rule.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }

    // Processing responses with `SlidingSyncResponseProcessor` emits
    // `RoomInfoNotableUpdate`s; the read-receipt reason is only expected after
    // the second response, whose timeline contains events.
    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        // Shadows the `http` alias imported at module level for this test.
        use ruma::api::client::sync::sync_events::v5 as http;

        let client = logged_in_client(None).await;
        // The event cache is subscribed before any response is processed.
        client.event_cache().subscribe().unwrap();

        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();

        // First response: a room without any timeline event.
        let room_id = room_id!("!r:e.uk");
        let room = http::response::Room::new();
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Two updates are expected: the first one without the read-receipt
        // reason, the second one with the display-name reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
            }
        );
        assert!(room_info_notable_update_stream.is_empty());

        // Second response: the same room, now with two `m.room.message`
        // events and one `m.read` event in its timeline.
        let room_id = room_id!("!r:e.uk");
        let events = vec![
            make_raw_event("m.room.message", "$3"),
            make_raw_event("m.room.message", "$4"),
            make_raw_event("m.read", "$5"),
        ];
        let room = assign!(http::response::Room::new(), {
            timeline: events,
        });
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // NOTE(review): if `RoomInfoNotableUpdateReasons::NONE` is the empty
        // flag set, `contains(NONE)` is always true and this first assertion
        // only checks the room ID; confirm against the type's definition.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
            }
        );
        // The next update must carry the read-receipt reason.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        assert!(room_info_notable_update_stream.is_empty());
    }

    // Builds a raw timeline event of type `event_type` with event ID `id`; the
    // content, sender and timestamp are fixed.
    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
        Raw::from_json_string(
            json!({
                "type": event_type,
                "event_id": id,
                "content": { "msgtype": "m.text", "body": "my msg" },
                "sender": "@u:h.uk",
                "origin_server_ts": 12344445,
            })
            .to_string(),
        )
        .unwrap()
    }
}