1use std::collections::BTreeMap;
2
3use imbl::Vector;
4use matrix_sdk_base::{sync::SyncResponse, PreviousEventsProvider, RequestedRequiredStates};
5use ruma::{
6 api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http},
7 events::AnyToDeviceEvent,
8 serde::Raw,
9 OwnedRoomId,
10};
11use tracing::error;
12
13use super::{SlidingSync, SlidingSyncBuilder};
14use crate::{Client, Result, SlidingSyncRoom};
15
/// The sliding sync version a client can use.
#[derive(Clone, Debug)]
pub enum Version {
    /// No sliding sync version.
    None,

    /// The native sliding sync version, i.e. the one a server advertises
    /// through the `org.matrix.simplified_msc3575` unstable feature.
    Native,
}

impl Version {
    /// Tell whether this version is [`Version::Native`].
    ///
    /// Only compiled for tests.
    #[cfg(test)]
    pub(crate) fn is_native(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether
        // it counts as native.
        match self {
            Self::Native => true,
            Self::None => false,
        }
    }
}
34
/// Errors that can be returned when building a [`Version`] from a
/// [`VersionBuilder`].
#[derive(thiserror::Error, Debug)]
pub enum VersionBuilderError {
    /// The `.well-known` data is not set.
    // NOTE(review): this variant is not constructed anywhere in the code
    // visible in this file; presumably it is kept for other callers or for
    // backward compatibility — confirm before removing.
    #[error("`.well-known` is not set")]
    WellKnownNotSet,

    /// The builder needs the `/versions` response, but none was provided.
    #[error("The `/versions` response is not set")]
    MissingVersionsResponse,

    /// The `/versions` response does not list
    /// `org.matrix.simplified_msc3575` in its `unstable_features`, or the
    /// entry is not `true`.
    #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
    NativeVersionIsUnset,
}
51
/// A builder for [`Version`].
#[derive(Clone, Debug)]
pub enum VersionBuilder {
    /// Build [`Version::None`].
    None,

    /// Build [`Version::Native`] unconditionally; the `/versions` response is
    /// not consulted.
    Native,

    /// Build [`Version::Native`] only if the `/versions` response advertises
    /// the `org.matrix.simplified_msc3575` unstable feature as `true`.
    /// Building fails otherwise, or when no `/versions` response is provided.
    DiscoverNative,
}
66
67impl VersionBuilder {
68 pub(crate) fn needs_get_supported_versions(&self) -> bool {
69 matches!(self, Self::DiscoverNative)
70 }
71
72 pub fn build(
77 self,
78 versions: Option<&get_supported_versions::Response>,
79 ) -> Result<Version, VersionBuilderError> {
80 Ok(match self {
81 Self::None => Version::None,
82
83 Self::Native => Version::Native,
84
85 Self::DiscoverNative => {
86 let Some(versions) = versions else {
87 return Err(VersionBuilderError::MissingVersionsResponse);
88 };
89
90 match versions.unstable_features.get("org.matrix.simplified_msc3575") {
91 Some(value) if *value => Version::Native,
92 _ => return Err(VersionBuilderError::NativeVersionIsUnset),
93 }
94 }
95 })
96 }
97}
98
99impl Client {
100 pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
108 let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
109 let mut response = get_supported_versions::Response::new(vec![]);
110 response.unstable_features = unstable_features;
111
112 response
113 });
114
115 [VersionBuilder::DiscoverNative]
116 .into_iter()
117 .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
118 .collect()
119 }
120
121 pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
126 Ok(SlidingSync::builder(id.into(), self.clone())?)
127 }
128
129 #[cfg(any(test, feature = "testing"))]
135 #[tracing::instrument(skip(self, response))]
136 pub async fn process_sliding_sync_test_helper(
137 &self,
138 response: &http::Response,
139 requested_required_states: &RequestedRequiredStates,
140 ) -> Result<SyncResponse> {
141 let response = self
142 .base_client()
143 .process_sliding_sync(response, &(), requested_required_states)
144 .await?;
145
146 tracing::debug!("done processing on base_client");
147 self.call_sync_response_handlers(&response).await?;
148
149 Ok(response)
150 }
151}
152
/// A [`PreviousEventsProvider`] backed by the sliding sync rooms: it borrows
/// the map of [`SlidingSyncRoom`]s, keyed by room ID.
struct SlidingSyncPreviousEventsProvider<'a>(&'a BTreeMap<OwnedRoomId, SlidingSyncRoom>);
154
155impl PreviousEventsProvider for SlidingSyncPreviousEventsProvider<'_> {
156 fn for_room(
157 &self,
158 room_id: &ruma::RoomId,
159 ) -> Vector<matrix_sdk_common::deserialized_responses::TimelineEvent> {
160 self.0.get(room_id).map(|room| room.timeline_queue()).unwrap_or_default()
161 }
162}
163
/// A helper that processes a sliding sync response step by step and
/// accumulates the outcome until it is taken with
/// `process_and_take_response`.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor<'a> {
    /// The client used to process the response and to run the sync response
    /// handlers.
    client: Client,
    /// To-device events produced by the encryption step; they are appended to
    /// the final response. Empty until that step runs.
    to_device_events: Vec<Raw<AnyToDeviceEvent>>,
    /// The processed response; `None` until the room response has been
    /// handled.
    response: Option<SyncResponse>,
    /// The sliding sync rooms, used as the source of previous events while
    /// processing the room response.
    rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
}
176
177impl<'a> SlidingSyncResponseProcessor<'a> {
178 pub fn new(client: Client, rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
179 Self { client, to_device_events: Vec::new(), response: None, rooms }
180 }
181
182 #[cfg(feature = "e2e-encryption")]
183 pub async fn handle_encryption(
184 &mut self,
185 extensions: &http::response::Extensions,
186 ) -> Result<()> {
187 assert!(self.response.is_none());
190
191 self.to_device_events = if let Some(to_device_events) = self
192 .client
193 .base_client()
194 .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
195 .await?
196 {
197 self.client.encryption().backups().maybe_trigger_backup();
199
200 to_device_events
201 } else {
202 Vec::new()
203 };
204
205 Ok(())
206 }
207
208 pub async fn handle_room_response(
209 &mut self,
210 response: &http::Response,
211 requested_required_states: &RequestedRequiredStates,
212 ) -> Result<()> {
213 self.response = Some(
214 self.client
215 .base_client()
216 .process_sliding_sync(
217 response,
218 &SlidingSyncPreviousEventsProvider(self.rooms),
219 requested_required_states,
220 )
221 .await?,
222 );
223 self.post_process().await
224 }
225
226 async fn post_process(&mut self) -> Result<()> {
227 let response = self.response.as_ref().unwrap();
230
231 update_in_memory_caches(&self.client, response).await?;
232
233 Ok(())
234 }
235
236 pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
237 let mut response = self.response.take().unwrap_or_default();
238
239 response.to_device.extend(self.to_device_events);
240
241 self.client.call_sync_response_handlers(&response).await?;
242
243 Ok(response)
244 }
245}
246
247async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
251 for room_id in response.rooms.join.keys() {
252 let Some(room) = client.get_room(room_id) else {
253 error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing");
254 continue;
255 };
256
257 room.user_defined_notification_mode().await;
258 }
259
260 Ok(())
261}
262
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{notification_settings::RoomNotificationMode, RequestedRequiredStates};
    use matrix_sdk_test::async_test;
    use ruma::{assign, room_id, serde::Raw};
    use serde_json::json;
    use wiremock::{
        matchers::{method, path},
        Mock, ResponseTemplate,
    };

    use super::{get_supported_versions, Version, VersionBuilder};
    use crate::{
        error::Result,
        sliding_sync::{http, VersionBuilderError},
        test_utils::logged_in_client_with_server,
        SlidingSyncList, SlidingSyncMode,
    };

    /// Build a `/versions` response whose only unstable feature is
    /// `org.matrix.simplified_msc3575`, set to `enabled`.
    fn versions_response_with_native(enabled: bool) -> get_supported_versions::Response {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), enabled)].into();

        response
    }

    /// Build a sliding sync response that contains `room_id` as a default
    /// room, plus a global `m.push_rules` account data event holding a single
    /// room rule for `room_id` with the given `actions`.
    fn response_with_room_push_rule(
        room_id: &ruma::RoomId,
        actions: serde_json::Value,
    ) -> http::Response {
        let push_rules = json!({
            "type": "m.push_rules",
            "content": {
                "global": {
                    "room": [
                        {
                            "actions": actions,
                            "rule_id": room_id,
                            "default": false,
                            "enabled": true,
                        },
                    ],
                },
            },
        });

        assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room_id.to_owned(),
                http::response::Room::default(),
            )]),
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    global: vec![
                        Raw::from_json_string(push_rules.to_string()).unwrap()
                    ]
                })
            })
        })
    }

    #[test]
    fn test_version_builder_none() {
        let version = VersionBuilder::None.build(None);

        assert_matches!(version, Ok(Version::None));
    }

    #[test]
    fn test_version_builder_native() {
        let version = VersionBuilder::Native.build(None);

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native() {
        let response = versions_response_with_native(true);
        let version = VersionBuilder::DiscoverNative.build(Some(&response));

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        let version = VersionBuilder::DiscoverNative.build(None);

        assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let response = versions_response_with_native(false);
        let version = VersionBuilder::DiscoverNative.build(Some(&response));

        assert_matches!(version, Err(VersionBuilderError::NativeVersionIsUnset));
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        // No `/versions` endpoint is mocked on the server here, so presumably
        // the unstable features cannot be fetched and nothing is discovered.
        let (client, _server) = logged_in_client_with_server().await;

        let available_versions = client.available_sliding_sync_versions().await;

        assert!(available_versions.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let (client, server) = logged_in_client_with_server().await;

        // The server advertises the native sliding sync unstable feature.
        let versions_body = json!({
            "versions": [],
            "unstable_features": {
                "org.matrix.simplified_msc3575": true,
            },
        });

        Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(versions_body))
            .mount(&server)
            .await;

        let available_versions = client.available_sliding_sync_versions().await;

        // Exactly one version is available, and it is the native one.
        assert_matches!(available_versions.as_slice(), [Version::Native]);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let (client, _server) = logged_in_client_with_server().await;
        let room_id = room_id!("!r0:matrix.org");

        let account_data_extension =
            assign!(http::request::AccountData::default(), { enabled: Some(true) });
        let all_rooms_list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(account_data_extension)
            .add_list(all_rooms_list)
            .build()
            .await?;

        // First sync: the room rule notifies.
        //
        // The scope matters: the position guard must be released before the
        // second sync takes the lock again.
        {
            let server_response = response_with_room_push_rule(room_id, json!(["notify"]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        let room = client.get_room(room_id).unwrap();

        // The cached mode reflects the `notify` action.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Second sync: the room rule has no action anymore.
        {
            let server_response = response_with_room_push_rule(room_id, json!([]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The cached mode has been updated accordingly.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }
}