matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeMap;
2
3use imbl::Vector;
4use matrix_sdk_base::{sync::SyncResponse, PreviousEventsProvider, RequestedRequiredStates};
5use ruma::{
6    api::client::{discovery::get_supported_versions, sync::sync_events::v5 as http},
7    events::AnyToDeviceEvent,
8    serde::Raw,
9    OwnedRoomId,
10};
11use tracing::error;
12
13use super::{SlidingSync, SlidingSyncBuilder};
14use crate::{Client, Result, SlidingSyncRoom};
15
/// A sliding sync version.
///
/// Values of this type are produced by [`VersionBuilder::build`].
#[derive(Clone, Debug)]
pub enum Version {
    /// No version. Useful to represent that sliding sync is disabled, for
    /// example, or that the version is unknown.
    None,

    /// Use the version of the sliding sync implementation inside Synapse, i.e.
    /// MSC4186.
    Native,
}
27
28impl Version {
29    #[cfg(test)]
30    pub(crate) fn is_native(&self) -> bool {
31        matches!(self, Self::Native)
32    }
33}
34
/// An error when building a version.
///
/// Returned by [`VersionBuilder::build`] when a [`Version`] cannot be built.
#[derive(thiserror::Error, Debug)]
pub enum VersionBuilderError {
    /// The `.well-known` response is not set.
    // NOTE(review): no code visible in this file returns this variant; confirm
    // whether it is still used elsewhere.
    #[error("`.well-known` is not set")]
    WellKnownNotSet,

    /// The `/versions` response is not set.
    ///
    /// Returned when building [`VersionBuilder::DiscoverNative`] without a
    /// `/versions` response.
    #[error("The `/versions` response is not set")]
    MissingVersionsResponse,

    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
    /// `unstable_features`, or it's not set to true.
    #[error("`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, or it's not set to true.")]
    NativeVersionIsUnset,
}
51
/// A builder for [`Version`].
///
/// Call [`VersionBuilder::build`] to turn a builder into a [`Version`].
#[derive(Clone, Debug)]
pub enum VersionBuilder {
    /// Build a [`Version::None`].
    None,

    /// Build a [`Version::Native`].
    Native,

    /// Build a [`Version::Native`] by auto-discovering it.
    ///
    /// It is available if the server enables it via `/versions`, i.e. if the
    /// `org.matrix.simplified_msc3575` unstable feature is set to `true`.
    DiscoverNative,
}
66
67impl VersionBuilder {
68    pub(crate) fn needs_get_supported_versions(&self) -> bool {
69        matches!(self, Self::DiscoverNative)
70    }
71
72    /// Build a [`Version`].
73    ///
74    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
75    /// invalid data.
76    pub fn build(
77        self,
78        versions: Option<&get_supported_versions::Response>,
79    ) -> Result<Version, VersionBuilderError> {
80        Ok(match self {
81            Self::None => Version::None,
82
83            Self::Native => Version::Native,
84
85            Self::DiscoverNative => {
86                let Some(versions) = versions else {
87                    return Err(VersionBuilderError::MissingVersionsResponse);
88                };
89
90                match versions.unstable_features.get("org.matrix.simplified_msc3575") {
91                    Some(value) if *value => Version::Native,
92                    _ => return Err(VersionBuilderError::NativeVersionIsUnset),
93                }
94            }
95        })
96    }
97}
98
99impl Client {
100    /// Find all sliding sync versions that are available.
101    ///
102    /// Be careful: This method may hit the store and will send new requests for
103    /// each call. It can be costly to call it repeatedly.
104    ///
105    /// If `.well-known` or `/versions` is unreachable, it will simply move
106    /// potential sliding sync versions aside. No error will be reported.
107    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
108        let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
109            let mut response = get_supported_versions::Response::new(vec![]);
110            response.unstable_features = unstable_features;
111
112            response
113        });
114
115        [VersionBuilder::DiscoverNative]
116            .into_iter()
117            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
118            .collect()
119    }
120
121    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
122    /// identifier.
123    ///
124    /// Note: the identifier must not be more than 16 chars long!
125    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
126        Ok(SlidingSync::builder(id.into(), self.clone())?)
127    }
128
129    /// Handle all the information provided in a sliding sync response, except
130    /// for the e2ee bits.
131    ///
132    /// If you need to handle encryption too, use the internal
133    /// `SlidingSyncResponseProcessor` instead.
134    #[cfg(any(test, feature = "testing"))]
135    #[tracing::instrument(skip(self, response))]
136    pub async fn process_sliding_sync_test_helper(
137        &self,
138        response: &http::Response,
139        requested_required_states: &RequestedRequiredStates,
140    ) -> Result<SyncResponse> {
141        let response = self
142            .base_client()
143            .process_sliding_sync(response, &(), requested_required_states)
144            .await?;
145
146        tracing::debug!("done processing on base_client");
147        self.call_sync_response_handlers(&response).await?;
148
149        Ok(response)
150    }
151}
152
/// A [`PreviousEventsProvider`] backed by a borrowed map of sliding sync rooms,
/// keyed by room ID.
struct SlidingSyncPreviousEventsProvider<'a>(&'a BTreeMap<OwnedRoomId, SlidingSyncRoom>);
154
155impl PreviousEventsProvider for SlidingSyncPreviousEventsProvider<'_> {
156    fn for_room(
157        &self,
158        room_id: &ruma::RoomId,
159    ) -> Vector<matrix_sdk_common::deserialized_responses::TimelineEvent> {
160        self.0.get(room_id).map(|room| room.timeline_queue()).unwrap_or_default()
161    }
162}
163
/// Small helper to handle a `SlidingSync` response's sub parts.
///
/// This will properly handle the encryption and the room response
/// independently, if needs be, making sure that both are properly processed by
/// event handlers.
#[must_use]
pub(crate) struct SlidingSyncResponseProcessor<'a> {
    /// The client used to reach the base client and to call the sync response
    /// handlers.
    client: Client,
    /// The to-device events collected by `handle_encryption`; they are
    /// appended to the response by `process_and_take_response`.
    to_device_events: Vec<Raw<AnyToDeviceEvent>>,
    /// The processed sync response; `None` until `handle_room_response` has
    /// run.
    response: Option<SyncResponse>,
    /// The known sliding sync rooms, used to provide previous events while
    /// processing the room response.
    rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
}
176
177impl<'a> SlidingSyncResponseProcessor<'a> {
178    pub fn new(client: Client, rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
179        Self { client, to_device_events: Vec::new(), response: None, rooms }
180    }
181
182    #[cfg(feature = "e2e-encryption")]
183    pub async fn handle_encryption(
184        &mut self,
185        extensions: &http::response::Extensions,
186    ) -> Result<()> {
187        // This is an internal API misuse if this is triggered (calling
188        // `handle_room_response` before this function), so panic is fine.
189        assert!(self.response.is_none());
190
191        self.to_device_events = if let Some(to_device_events) = self
192            .client
193            .base_client()
194            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
195            .await?
196        {
197            // Some new keys might have been received, so trigger a backup if needed.
198            self.client.encryption().backups().maybe_trigger_backup();
199
200            to_device_events
201        } else {
202            Vec::new()
203        };
204
205        Ok(())
206    }
207
208    pub async fn handle_room_response(
209        &mut self,
210        response: &http::Response,
211        requested_required_states: &RequestedRequiredStates,
212    ) -> Result<()> {
213        self.response = Some(
214            self.client
215                .base_client()
216                .process_sliding_sync(
217                    response,
218                    &SlidingSyncPreviousEventsProvider(self.rooms),
219                    requested_required_states,
220                )
221                .await?,
222        );
223        self.post_process().await
224    }
225
226    async fn post_process(&mut self) -> Result<()> {
227        // This is an internal API misuse if this is triggered (calling
228        // `handle_room_response` after this function), so panic is fine.
229        let response = self.response.as_ref().unwrap();
230
231        update_in_memory_caches(&self.client, response).await?;
232
233        Ok(())
234    }
235
236    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
237        let mut response = self.response.take().unwrap_or_default();
238
239        response.to_device.extend(self.to_device_events);
240
241        self.client.call_sync_response_handlers(&response).await?;
242
243        Ok(response)
244    }
245}
246
247/// Update the caches for the rooms that received updates.
248///
249/// This will only fill the in-memory caches, not save the info on disk.
250async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
251    for room_id in response.rooms.join.keys() {
252        let Some(room) = client.get_room(room_id) else {
253            error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing");
254            continue;
255        };
256
257        room.user_defined_notification_mode().await;
258    }
259
260    Ok(())
261}
262
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{notification_settings::RoomNotificationMode, RequestedRequiredStates};
    use matrix_sdk_test::async_test;
    use ruma::{assign, room_id, serde::Raw};
    use serde_json::json;
    use wiremock::{
        matchers::{method, path},
        Mock, ResponseTemplate,
    };

    use super::{get_supported_versions, Version, VersionBuilder};
    use crate::{
        error::Result,
        sliding_sync::{http, VersionBuilderError},
        test_utils::logged_in_client_with_server,
        SlidingSyncList, SlidingSyncMode,
    };

    /// Build a `/versions` response without any supported version, where the
    /// `org.matrix.simplified_msc3575` unstable feature is set to `enabled`.
    fn versions_response(enabled: bool) -> get_supported_versions::Response {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features =
            BTreeMap::from([("org.matrix.simplified_msc3575".to_owned(), enabled)]);

        response
    }

    /// Build a sliding sync response at position `"0"` that contains `room_id`
    /// with a default room payload, and a global `m.push_rules` account data
    /// event holding a single room rule for `room_id` with the given
    /// `actions`.
    fn response_with_room_push_rule(
        room_id: &ruma::RoomId,
        actions: serde_json::Value,
    ) -> http::Response {
        let push_rules = json!({
            "type": "m.push_rules",
            "content": {
                "global": {
                    "room": [
                        {
                            "actions": actions,
                            "rule_id": room_id,
                            "default": false,
                            "enabled": true,
                        },
                    ],
                },
            },
        });

        assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room_id.to_owned(),
                http::response::Room::default(),
            )]),
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    global: vec![
                        Raw::from_json_string(push_rules.to_string()).unwrap()
                    ]
                })
            })
        })
    }

    #[test]
    fn test_version_builder_none() {
        let version = VersionBuilder::None.build(None);

        assert_matches!(version, Ok(Version::None));
    }

    #[test]
    fn test_version_builder_native() {
        let version = VersionBuilder::Native.build(None);

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native() {
        let response = versions_response(true);
        let version = VersionBuilder::DiscoverNative.build(Some(&response));

        assert_matches!(version, Ok(Version::Native));
    }

    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        let version = VersionBuilder::DiscoverNative.build(None);

        assert_matches!(version, Err(VersionBuilderError::MissingVersionsResponse));
    }

    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let response = versions_response(false);
        let version = VersionBuilder::DiscoverNative.build(Some(&response));

        assert_matches!(version, Err(VersionBuilderError::NativeVersionIsUnset));
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let (client, _server) = logged_in_client_with_server().await;

        // Nothing is mocked on the server: `.well-known` and `/versions`
        // aren't available. It's impossible to find any versions.
        let available_versions = client.available_sliding_sync_versions().await;

        assert!(available_versions.is_empty());
    }

    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let (client, server) = logged_in_client_with_server().await;

        let versions_body = json!({
            "versions": [],
            "unstable_features": {
                "org.matrix.simplified_msc3575": true,
            },
        });

        Mock::given(method("GET"))
            .and(path("/_matrix/client/versions"))
            .respond_with(ResponseTemplate::new(200).set_body_json(versions_body))
            .mount(&server)
            .await;

        let available_versions = client.available_sliding_sync_versions().await;

        // `/versions` is available: exactly one version, the native one, is
        // found.
        assert_matches!(available_versions.as_slice(), [Version::Native]);
    }

    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let (client, _server) = logged_in_client_with_server().await;
        let room_id = room_id!("!r0:matrix.org");

        let account_data_extension =
            assign!(http::request::AccountData::default(), { enabled: Some(true) });
        let all_rooms_list = SlidingSyncList::builder("all")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10));

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(account_data_extension)
            .add_list(all_rooms_list)
            .build()
            .await?;

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        {
            let server_response = response_with_room_push_rule(room_id, json!(["notify"]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room must exist, since it's been synced.
        let room = client.get_room(room_id).unwrap();

        // The room has a cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Mock a second sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        // It overwrites the previous cache.
        {
            let server_response = response_with_room_push_rule(room_id, json!([]));

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response,
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room has an updated cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }
}
477}