matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates};
4use ruma::{
5    api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http},
6    events::AnyToDeviceEvent,
7    serde::Raw,
8};
9use tracing::error;
10
11use super::{SlidingSync, SlidingSyncBuilder};
12use crate::{Client, Result};
13
14/// A sliding sync version.
15#[derive(Clone, Debug)]
16pub enum Version {
17    /// No version. Useful to represent that sliding sync is disabled for
18    /// example, and that the version is unknown.
19    None,
20
21    /// Use the version of the sliding sync implementation inside Synapse, i.e.
22    /// MSC4186.
23    Native,
24}
25
26impl Version {
27    #[cfg(test)]
28    pub(crate) fn is_native(&self) -> bool {
29        matches!(self, Self::Native)
30    }
31}
32
33/// An error when building a version.
34#[derive(thiserror::Error, Debug)]
35pub enum VersionBuilderError {
36    /// The `.well-known` response is not set.
37    #[error("`.well-known` is not set")]
38    WellKnownNotSet,
39
40    /// The `/versions` response is not set.
41    #[error("The `/versions` response is not set")]
42    MissingVersionsResponse,
43
44    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
45    /// `unstable_features`, or it's not set to true.
46    #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
47    NativeVersionIsUnset,
48}
49
50/// A builder for [`Version`].
51#[derive(Clone, Debug)]
52pub enum VersionBuilder {
53    /// Build a [`Version::None`].
54    None,
55
56    /// Build a [`Version::Native`].
57    Native,
58
59    /// Build a [`Version::Native`] by auto-discovering it.
60    ///
61    /// It is available if the server enables it via `/versions`.
62    DiscoverNative,
63}
64
65impl VersionBuilder {
66    pub(crate) fn needs_get_supported_versions(&self) -> bool {
67        matches!(self, Self::DiscoverNative)
68    }
69
70    /// Build a [`Version`].
71    ///
72    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
73    /// invalid data.
74    pub fn build(
75        self,
76        versions: Option<&get_supported_versions::Response>,
77    ) -> Result<Version, VersionBuilderError> {
78        Ok(match self {
79            Self::None => Version::None,
80
81            Self::Native => Version::Native,
82
83            Self::DiscoverNative => {
84                let Some(versions) = versions else {
85                    return Err(VersionBuilderError::MissingVersionsResponse);
86                };
87
88                match versions.unstable_features.get("org.matrix.simplified_msc3575") {
89                    Some(value) if *value => Version::Native,
90                    _ => return Err(VersionBuilderError::NativeVersionIsUnset),
91                }
92            }
93        })
94    }
95}
96
97impl Client {
98    /// Find all sliding sync versions that are available.
99    ///
100    /// Be careful: This method may hit the store and will send new requests for
101    /// each call. It can be costly to call it repeatedly.
102    ///
103    /// If `.well-known` or `/versions` is unreachable, it will simply move
104    /// potential sliding sync versions aside. No error will be reported.
105    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
106        let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
107            let mut response = get_supported_versions::Response::new(vec![]);
108            response.unstable_features = unstable_features;
109
110            response
111        });
112
113        [VersionBuilder::DiscoverNative]
114            .into_iter()
115            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
116            .collect()
117    }
118
119    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
120    /// identifier.
121    ///
122    /// Note: the identifier must not be more than 16 chars long!
123    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
124        Ok(SlidingSync::builder(id.into(), self.clone())?)
125    }
126
127    /// Handle all the information provided in a sliding sync response, except
128    /// for the e2ee bits.
129    ///
130    /// If you need to handle encryption too, use the internal
131    /// `SlidingSyncResponseProcessor` instead.
132    #[cfg(any(test, feature = "testing"))]
133    #[tracing::instrument(skip(self, response))]
134    pub async fn process_sliding_sync_test_helper(
135        &self,
136        response: &http::Response,
137        requested_required_states: &RequestedRequiredStates,
138    ) -> Result<SyncResponse> {
139        let response =
140            self.base_client().process_sliding_sync(response, requested_required_states).await?;
141
142        tracing::debug!("done processing on base_client");
143        self.call_sync_response_handlers(&response).await?;
144
145        Ok(response)
146    }
147}
148
149/// Small helper to handle a `SlidingSync` response's sub parts.
150///
151/// This will properly handle the encryption and the room response
152/// independently, if needs be, making sure that both are properly processed by
153/// event handlers.
154#[must_use]
155pub(crate) struct SlidingSyncResponseProcessor {
156    client: Client,
157    to_device_events: Vec<Raw<AnyToDeviceEvent>>,
158    response: Option<SyncResponse>,
159}
160
161impl SlidingSyncResponseProcessor {
162    pub fn new(client: Client) -> Self {
163        Self { client, to_device_events: Vec::new(), response: None }
164    }
165
166    #[cfg(feature = "e2e-encryption")]
167    pub async fn handle_encryption(
168        &mut self,
169        extensions: &http::response::Extensions,
170    ) -> Result<()> {
171        // This is an internal API misuse if this is triggered (calling
172        // `handle_room_response` before this function), so panic is fine.
173        assert!(self.response.is_none());
174
175        self.to_device_events = if let Some(to_device_events) = self
176            .client
177            .base_client()
178            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
179            .await?
180        {
181            // Some new keys might have been received, so trigger a backup if needed.
182            self.client.encryption().backups().maybe_trigger_backup();
183
184            to_device_events
185        } else {
186            Vec::new()
187        };
188
189        Ok(())
190    }
191
192    pub async fn handle_room_response(
193        &mut self,
194        response: &http::Response,
195        requested_required_states: &RequestedRequiredStates,
196    ) -> Result<()> {
197        let mut sync_response = self
198            .client
199            .base_client()
200            .process_sliding_sync(response, requested_required_states)
201            .await?;
202        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
203
204        self.response = Some(sync_response);
205        self.post_process().await
206    }
207
208    async fn post_process(&mut self) -> Result<()> {
209        // This is an internal API misuse if this is triggered (calling
210        // `handle_room_response` after this function), so panic is fine.
211        let response = self.response.as_ref().unwrap();
212
213        update_in_memory_caches(&self.client, response).await?;
214
215        Ok(())
216    }
217
218    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
219        let mut response = self.response.take().unwrap_or_default();
220
221        response.to_device.extend(self.to_device_events);
222
223        self.client.call_sync_response_handlers(&response).await?;
224
225        Ok(response)
226    }
227}
228
229/// Update the caches for the rooms that received updates.
230///
231/// This will only fill the in-memory caches, not save the info on disk.
232async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
233    for room_id in response.rooms.joined.keys() {
234        let Some(room) = client.get_room(room_id) else {
235            error!(?room_id, "Cannot post process a room in sliding sync because it is missing");
236            continue;
237        };
238
239        room.user_defined_notification_mode().await;
240    }
241
242    Ok(())
243}
244
245/// Update the receipts extension and compute the read receipt accordingly.
246async fn handle_receipts_extension(
247    client: &Client,
248    response: &http::Response,
249    sync_response: &mut SyncResponse,
250) -> Result<()> {
251    // We need to compute read receipts for each joined room that has received an
252    // update, or from each room that has received a receipt ephemeral event.
253    let room_ids = BTreeSet::from_iter(
254        sync_response
255            .rooms
256            .joined
257            .keys()
258            .cloned()
259            .chain(response.extensions.receipts.rooms.keys().cloned()),
260    );
261
262    for room_id in room_ids {
263        let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await
264        else {
265            tracing::info!(
266                ?room_id,
267                "Failed to fetch the `RoomEventCache` when computing unread counts"
268            );
269
270            continue;
271        };
272
273        let previous_events = room_event_cache.events().await;
274
275        client
276            .base_client()
277            .process_sliding_sync_receipts_extension_for_room(
278                &room_id,
279                response,
280                sync_response,
281                previous_events,
282            )
283            .await?;
284    }
285    Ok(())
286}
287
288#[cfg(all(test, not(target_family = "wasm")))]
289mod tests {
290    use std::collections::BTreeMap;
291
292    use assert_matches::assert_matches;
293    use matrix_sdk_base::{
294        notification_settings::RoomNotificationMode, RequestedRequiredStates,
295        RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
296    };
297    use matrix_sdk_test::async_test;
298    use ruma::{assign, events::AnySyncTimelineEvent, room_id, serde::Raw};
299    use serde_json::json;
300    use wiremock::{
301        matchers::{method, path},
302        Mock, ResponseTemplate,
303    };
304
305    use super::{get_supported_versions, Version, VersionBuilder};
306    use crate::{
307        error::Result,
308        sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError},
309        test_utils::{logged_in_client, logged_in_client_with_server},
310        SlidingSyncList, SlidingSyncMode,
311    };
312
313    #[test]
314    fn test_version_builder_none() {
315        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
316    }
317
318    #[test]
319    fn test_version_builder_native() {
320        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
321    }
322
323    #[test]
324    fn test_version_builder_discover_native() {
325        let mut response = get_supported_versions::Response::new(vec![]);
326        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
327
328        assert_matches!(VersionBuilder::DiscoverNative.build(Some(&response)), Ok(Version::Native));
329    }
330
331    #[test]
332    fn test_version_builder_discover_native_no_supported_versions() {
333        assert_matches!(
334            VersionBuilder::DiscoverNative.build(None),
335            Err(VersionBuilderError::MissingVersionsResponse)
336        );
337    }
338
339    #[test]
340    fn test_version_builder_discover_native_unstable_features_is_disabled() {
341        let mut response = get_supported_versions::Response::new(vec![]);
342        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
343
344        assert_matches!(
345            VersionBuilder::DiscoverNative.build(Some(&response)),
346            Err(VersionBuilderError::NativeVersionIsUnset)
347        );
348    }
349
350    #[async_test]
351    async fn test_available_sliding_sync_versions_none() {
352        let (client, _server) = logged_in_client_with_server().await;
353        let available_versions = client.available_sliding_sync_versions().await;
354
355        // `.well-known` and `/versions` aren't available. It's impossible to find any
356        // versions.
357        assert!(available_versions.is_empty());
358    }
359
360    #[async_test]
361    async fn test_available_sliding_sync_versions_native() {
362        let (client, server) = logged_in_client_with_server().await;
363
364        Mock::given(method("GET"))
365            .and(path("/_matrix/client/versions"))
366            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
367                "versions": [],
368                "unstable_features": {
369                    "org.matrix.simplified_msc3575": true,
370                },
371            })))
372            .mount(&server)
373            .await;
374
375        let available_versions = client.available_sliding_sync_versions().await;
376
377        // `/versions` is available.
378        assert_eq!(available_versions.len(), 1);
379        assert_matches!(available_versions[0], Version::Native);
380    }
381
382    #[async_test]
383    async fn test_cache_user_defined_notification_mode() -> Result<()> {
384        let (client, _server) = logged_in_client_with_server().await;
385        let room_id = room_id!("!r0:matrix.org");
386
387        let sliding_sync = client
388            .sliding_sync("test")?
389            .with_account_data_extension(
390                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
391            )
392            .add_list(
393                SlidingSyncList::builder("all")
394                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
395            )
396            .build()
397            .await?;
398
399        // Mock a sync response.
400        // A `m.push_rules` with `room` is cached during the sync.
401        {
402            let server_response = assign!(http::Response::new("0".to_owned()), {
403                rooms: BTreeMap::from([(
404                    room_id.to_owned(),
405                    http::response::Room::default(),
406                )]),
407                extensions: assign!(http::response::Extensions::default(), {
408                    account_data: assign!(http::response::AccountData::default(), {
409                        global: vec![
410                            Raw::from_json_string(
411                                json!({
412                                    "type": "m.push_rules",
413                                    "content": {
414                                        "global": {
415                                            "room": [
416                                                {
417                                                    "actions": ["notify"],
418                                                    "rule_id": room_id,
419                                                    "default": false,
420                                                    "enabled": true,
421                                                },
422                                            ],
423                                        },
424                                    },
425                                })
426                                .to_string(),
427                            ).unwrap()
428                        ]
429                    })
430                })
431            });
432
433            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
434            sliding_sync
435                .handle_response(
436                    server_response.clone(),
437                    &mut pos_guard,
438                    RequestedRequiredStates::default(),
439                )
440                .await?;
441        }
442
443        // The room must exist, since it's been synced.
444        let room = client.get_room(room_id).unwrap();
445
446        // The room has a cached user-defined notification mode.
447        assert_eq!(
448            room.cached_user_defined_notification_mode(),
449            Some(RoomNotificationMode::AllMessages),
450        );
451
452        // Mock a sync response.
453        // A `m.push_rules` with `room` is cached during the sync.
454        // It overwrites the previous cache.
455        {
456            let server_response = assign!(http::Response::new("0".to_owned()), {
457                rooms: BTreeMap::from([(
458                    room_id.to_owned(),
459                    http::response::Room::default(),
460                )]),
461                extensions: assign!(http::response::Extensions::default(), {
462                    account_data: assign!(http::response::AccountData::default(), {
463                        global: vec![
464                            Raw::from_json_string(
465                                json!({
466                                    "type": "m.push_rules",
467                                    "content": {
468                                        "global": {
469                                            "room": [
470                                                {
471                                                    "actions": [],
472                                                    "rule_id": room_id,
473                                                    "default": false,
474                                                    "enabled": true,
475                                                },
476                                            ],
477                                        },
478                                    },
479                                })
480                                .to_string(),
481                            ).unwrap()
482                        ]
483                    })
484                })
485            });
486
487            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
488            sliding_sync
489                .handle_response(
490                    server_response.clone(),
491                    &mut pos_guard,
492                    RequestedRequiredStates::default(),
493                )
494                .await?;
495        }
496
497        // The room has an updated cached user-defined notification mode.
498        assert_eq!(
499            room.cached_user_defined_notification_mode(),
500            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
501        );
502
503        Ok(())
504    }
505
506    #[async_test]
507    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
508        use ruma::api::client::sync::sync_events::v5 as http;
509
510        // Given a logged-in client
511        let client = logged_in_client(None).await;
512        client.event_cache().subscribe().unwrap();
513
514        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
515
516        // When I send sliding sync response containing a new room.
517        let room_id = room_id!("!r:e.uk");
518        let room = http::response::Room::new();
519        let mut response = http::Response::new("5".to_owned());
520        response.rooms.insert(room_id.to_owned(), room);
521
522        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
523        processor
524            .handle_room_response(&response, &RequestedRequiredStates::default())
525            .await
526            .expect("Failed to process sync");
527        processor.process_and_take_response().await.expect("Failed to finish processing sync");
528
529        // Then room info notable updates are received.
530        assert_matches!(
531            room_info_notable_update_stream.recv().await,
532            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
533                assert_eq!(received_room_id, room_id);
534                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
535            }
536        );
537        assert_matches!(
538            room_info_notable_update_stream.recv().await,
539            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
540                assert_eq!(received_room_id, room_id);
541                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
542            }
543        );
544        assert!(room_info_notable_update_stream.is_empty());
545
546        // When I send sliding sync response containing a couple of events with no read
547        // receipt.
548        let room_id = room_id!("!r:e.uk");
549        let events = vec![
550            make_raw_event("m.room.message", "$3"),
551            make_raw_event("m.room.message", "$4"),
552            make_raw_event("m.read", "$5"),
553        ];
554        let room = assign!(http::response::Room::new(), {
555            timeline: events,
556        });
557        let mut response = http::Response::new("5".to_owned());
558        response.rooms.insert(room_id.to_owned(), room);
559
560        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
561        processor
562            .handle_room_response(&response, &RequestedRequiredStates::default())
563            .await
564            .expect("Failed to process sync");
565        processor.process_and_take_response().await.expect("Failed to finish processing sync");
566
567        // Then room info notable updates are received.
568        //
569        // `NONE` because the regular sync process ends up to updating nothing.
570        assert_matches!(
571            room_info_notable_update_stream.recv().await,
572            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
573                assert_eq!(received_room_id, room_id);
574                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
575            }
576        );
577        // `READ_RECEIPT` because this is what we expect.
578        assert_matches!(
579            room_info_notable_update_stream.recv().await,
580            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
581                assert_eq!(received_room_id, room_id);
582                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
583            }
584        );
585        assert!(room_info_notable_update_stream.is_empty());
586    }
587
588    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
589        Raw::from_json_string(
590            json!({
591                "type": event_type,
592                "event_id": id,
593                "content": { "msgtype": "m.text", "body": "my msg" },
594                "sender": "@u:h.uk",
595                "origin_server_ts": 12344445,
596            })
597            .to_string(),
598        )
599        .unwrap()
600    }
601}