1use std::collections::BTreeSet;
2
3use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates};
4use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
5use ruma::api::{client::sync::sync_events::v5 as http, FeatureFlag, SupportedVersions};
6use tracing::error;
7
8use super::{SlidingSync, SlidingSyncBuilder};
9use crate::{Client, Result};
10
/// The sliding sync version a [`Client`] ends up using.
///
/// Values of this type are produced by [`VersionBuilder::build`].
#[derive(Debug, Clone)]
pub enum Version {
    /// No sliding sync version.
    None,

    /// The native sliding sync version.
    ///
    /// [`VersionBuilder::DiscoverNative`] resolves to this variant when the
    /// server's supported features contain `FeatureFlag::Msc4186`.
    Native,
}
22
23impl Version {
24 #[cfg(test)]
25 pub(crate) fn is_native(&self) -> bool {
26 matches!(self, Self::Native)
27 }
28}
29
30#[derive(thiserror::Error, Debug)]
32pub enum VersionBuilderError {
33 #[error("`.well-known` is not set")]
35 WellKnownNotSet,
36
37 #[error("The `/versions` response is not set")]
39 MissingVersionsResponse,
40
41 #[error(
44 "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
45 or it's not set to true."
46 )]
47 NativeVersionIsUnset,
48}
49
/// Describes how a sliding sync [`Version`] must be obtained.
///
/// Call [`VersionBuilder::build`] to turn it into an actual [`Version`].
#[derive(Debug, Clone)]
pub enum VersionBuilder {
    /// Always builds [`Version::None`].
    None,

    /// Always builds [`Version::Native`], without consulting the server.
    Native,

    /// Builds [`Version::Native`] only if the server's supported versions
    /// advertise it; building fails otherwise.
    DiscoverNative,
}
64
65impl VersionBuilder {
66 pub(crate) fn needs_get_supported_versions(&self) -> bool {
67 matches!(self, Self::DiscoverNative)
68 }
69
70 pub fn build(
75 self,
76 supported: Option<&SupportedVersions>,
77 ) -> Result<Version, VersionBuilderError> {
78 Ok(match self {
79 Self::None => Version::None,
80
81 Self::Native => Version::Native,
82
83 Self::DiscoverNative => {
84 let Some(supported) = supported else {
85 return Err(VersionBuilderError::MissingVersionsResponse);
86 };
87
88 if supported.features.contains(&FeatureFlag::Msc4186) {
89 Version::Native
90 } else {
91 return Err(VersionBuilderError::NativeVersionIsUnset);
92 }
93 }
94 })
95 }
96}
97
98impl Client {
99 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
107 let supported_versions = self.supported_versions().await.ok();
108
109 [VersionBuilder::DiscoverNative]
110 .into_iter()
111 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
112 .collect()
113 }
114
115 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
120 Ok(SlidingSync::builder(id.into(), self.clone())?)
121 }
122
123 #[cfg(any(test, feature = "testing"))]
129 #[tracing::instrument(skip(self, response))]
130 pub async fn process_sliding_sync_test_helper(
131 &self,
132 response: &http::Response,
133 requested_required_states: &RequestedRequiredStates,
134 ) -> Result<SyncResponse> {
135 let response =
136 self.base_client().process_sliding_sync(response, requested_required_states).await?;
137
138 tracing::debug!("done processing on base_client");
139 self.call_sync_response_handlers(&response).await?;
140
141 Ok(response)
142 }
143}
144
145#[must_use]
151pub(crate) struct SlidingSyncResponseProcessor {
152 client: Client,
153 to_device_events: Vec<ProcessedToDeviceEvent>,
154 response: Option<SyncResponse>,
155}
156
157impl SlidingSyncResponseProcessor {
158 pub fn new(client: Client) -> Self {
159 Self { client, to_device_events: Vec::new(), response: None }
160 }
161
162 #[cfg(feature = "e2e-encryption")]
163 pub async fn handle_encryption(
164 &mut self,
165 extensions: &http::response::Extensions,
166 ) -> Result<()> {
167 assert!(self.response.is_none());
170
171 self.to_device_events = if let Some(to_device_events) = self
172 .client
173 .base_client()
174 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
175 .await?
176 {
177 self.client.encryption().backups().maybe_trigger_backup();
179
180 to_device_events
181 } else {
182 Vec::new()
183 };
184
185 Ok(())
186 }
187
188 pub async fn handle_room_response(
189 &mut self,
190 response: &http::Response,
191 requested_required_states: &RequestedRequiredStates,
192 ) -> Result<()> {
193 let mut sync_response = self
194 .client
195 .base_client()
196 .process_sliding_sync(response, requested_required_states)
197 .await?;
198 handle_receipts_extension(&self.client, response, &mut sync_response).await?;
199
200 update_in_memory_caches(&self.client, &sync_response).await?;
201
202 self.response = Some(sync_response);
203
204 Ok(())
205 }
206
207 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
208 let mut response = self.response.take().unwrap_or_default();
209
210 response.to_device.extend(self.to_device_events);
211
212 self.client.call_sync_response_handlers(&response).await?;
213
214 Ok(response)
215 }
216}
217
218async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
222 for room_id in response.rooms.joined.keys() {
223 let Some(room) = client.get_room(room_id) else {
224 error!(?room_id, "Cannot post process a room in sliding sync because it is missing");
225 continue;
226 };
227
228 room.user_defined_notification_mode().await;
229 }
230
231 Ok(())
232}
233
234async fn handle_receipts_extension(
236 client: &Client,
237 response: &http::Response,
238 sync_response: &mut SyncResponse,
239) -> Result<()> {
240 let room_ids = BTreeSet::from_iter(
243 sync_response
244 .rooms
245 .joined
246 .keys()
247 .cloned()
248 .chain(response.extensions.receipts.rooms.keys().cloned()),
249 );
250
251 for room_id in room_ids {
252 let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await
253 else {
254 tracing::info!(
255 ?room_id,
256 "Failed to fetch the `RoomEventCache` when computing unread counts"
257 );
258
259 continue;
260 };
261
262 let previous_events = room_event_cache.events().await;
263
264 client
265 .base_client()
266 .process_sliding_sync_receipts_extension_for_room(
267 &room_id,
268 response,
269 sync_response,
270 previous_events,
271 )
272 .await?;
273 }
274 Ok(())
275}
276
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        notification_settings::RoomNotificationMode, RequestedRequiredStates,
        RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
    };
    use matrix_sdk_test::async_test;
    use ruma::{
        api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
        room_id, serde::Raw,
    };
    use serde_json::json;

    use super::{Version, VersionBuilder};
    use crate::{
        error::Result,
        sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError},
        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
        SlidingSyncList, SlidingSyncMode,
    };

    /// Builds a `/versions` response whose only unstable feature is
    /// `org.matrix.simplified_msc3575`, set to `enabled`.
    fn versions_response(enabled: bool) -> get_supported_versions::Response {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), enabled)].into();

        response
    }

    /// Builds a raw timeline event of type `event_type` with the event ID
    /// `id`, a fixed text content, sender and timestamp.
    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
        let event = json!({
            "type": event_type,
            "event_id": id,
            "content": { "msgtype": "m.text", "body": "my msg" },
            "sender": "@u:h.uk",
            "origin_server_ts": 12344445,
        });

        Raw::from_json_string(event.to_string()).unwrap()
    }

    #[test]
    fn test_version_builder_none() {
        let version = VersionBuilder::None.build(None);

        assert_matches!(version, Ok(Version::None));
    }

    #[test]
    fn test_version_builder_native() {
        let version = VersionBuilder::Native.build(None);

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native() {
        let supported_versions = versions_response(true).as_supported_versions();
        let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        let version = VersionBuilder::DiscoverNative.build(None);

        assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let supported_versions = versions_response(false).as_supported_versions();
        let version = VersionBuilder::DiscoverNative.build(Some(&supported_versions));

        assert_matches!(version, Err(VersionBuilderError::NativeVersionIsUnset));
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let client = MockClientBuilder::new(None).build().await;

        // No version can be built for this client.
        assert!(client.available_sliding_sync_versions().await.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().no_server_versions().build().await;

        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;

        let available_versions = client.available_sliding_sync_versions().await;

        // Exactly one version is available, and it is the native one.
        assert_matches!(available_versions.as_slice(), [Version::Native]);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0:matrix.org");

        let account_data_extension =
            assign!(http::request::AccountData::default(), { enabled: Some(true) });
        let all_rooms_list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(account_data_extension)
            .add_list(all_rooms_list)
            .build()
            .await?;

        // Builds a response containing the room, plus a global `m.push_rules`
        // account data event holding a single room rule with `actions`.
        let push_rules_response = |actions: serde_json::Value| {
            let push_rules = json!({
                "type": "m.push_rules",
                "content": {
                    "global": {
                        "room": [
                            {
                                "actions": actions,
                                "rule_id": room_id,
                                "default": false,
                                "enabled": true,
                            },
                        ],
                    },
                },
            });

            assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(push_rules.to_string()).unwrap()
                        ]
                    })
                })
            })
        };

        // First response: the room rule notifies.
        {
            let server_response = push_rules_response(json!(["notify"]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        let room = client.get_room(room_id).unwrap();

        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Second response: the room rule has no action anymore.
        {
            let server_response = push_rules_response(json!([]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }

    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        use ruma::api::client::sync::sync_events::v5 as http;

        let client = MockClientBuilder::new(None).build().await;
        client.event_cache().subscribe().unwrap();

        let mut notable_updates = client.room_info_notable_update_receiver();

        let room_id = room_id!("!r:e.uk");

        // First response: the room, without any event.
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), http::response::Room::new());

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        let update = notable_updates.recv().await;
        assert_matches!(
            update,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(!reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{reasons:?}");
            }
        );

        let update = notable_updates.recv().await;
        assert_matches!(
            update,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{reasons:?}");
            }
        );

        assert!(notable_updates.is_empty());

        // Second response: the same room, now with a timeline.
        let timeline = vec![
            make_raw_event("m.room.message", "$3"),
            make_raw_event("m.room.message", "$4"),
            make_raw_event("m.read", "$5"),
        ];
        let room = assign!(http::response::Room::new(), {
            timeline: timeline,
        });
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        let update = notable_updates.recv().await;
        assert_matches!(
            update,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{reasons:?}");
            }
        );

        let update = notable_updates.recv().await;
        assert_matches!(
            update,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{reasons:?}");
            }
        );

        assert!(notable_updates.is_empty());
    }
}