matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use matrix_sdk_base::{sync::SyncResponse, RequestedRequiredStates};
4use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
5use ruma::api::{client::sync::sync_events::v5 as http, FeatureFlag, SupportedVersions};
6use tracing::error;
7
8use super::{SlidingSync, SlidingSyncBuilder};
9use crate::{Client, Result};
10
/// Which flavour of sliding sync is in use.
#[derive(Debug, Clone)]
pub enum Version {
    /// Absence of a version. Handy to express, for example, that sliding sync
    /// is disabled and that the version is unknown.
    None,

    /// The sliding sync implementation that ships inside Synapse, i.e.
    /// MSC4186.
    Native,
}
22
23impl Version {
24    #[cfg(test)]
25    pub(crate) fn is_native(&self) -> bool {
26        matches!(self, Self::Native)
27    }
28}
29
30/// An error when building a version.
31#[derive(thiserror::Error, Debug)]
32pub enum VersionBuilderError {
33    /// The `.well-known` response is not set.
34    #[error("`.well-known` is not set")]
35    WellKnownNotSet,
36
37    /// The `/versions` response is not set.
38    #[error("The `/versions` response is not set")]
39    MissingVersionsResponse,
40
41    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
42    /// `unstable_features`, or it's not set to true.
43    #[error(
44        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
45         or it's not set to true."
46    )]
47    NativeVersionIsUnset,
48}
49
/// Describes how a [`Version`] must be obtained.
#[derive(Debug, Clone)]
pub enum VersionBuilder {
    /// Produce a [`Version::None`].
    None,

    /// Produce a [`Version::Native`].
    Native,

    /// Produce a [`Version::Native`], but only after having auto-discovered
    /// it.
    ///
    /// The version is available when the server advertises it via
    /// `/versions`.
    DiscoverNative,
}
64
65impl VersionBuilder {
66    pub(crate) fn needs_get_supported_versions(&self) -> bool {
67        matches!(self, Self::DiscoverNative)
68    }
69
70    /// Build a [`Version`].
71    ///
72    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
73    /// invalid data.
74    pub fn build(
75        self,
76        supported: Option<&SupportedVersions>,
77    ) -> Result<Version, VersionBuilderError> {
78        Ok(match self {
79            Self::None => Version::None,
80
81            Self::Native => Version::Native,
82
83            Self::DiscoverNative => {
84                let Some(supported) = supported else {
85                    return Err(VersionBuilderError::MissingVersionsResponse);
86                };
87
88                if supported.features.contains(&FeatureFlag::Msc4186) {
89                    Version::Native
90                } else {
91                    return Err(VersionBuilderError::NativeVersionIsUnset);
92                }
93            }
94        })
95    }
96}
97
98impl Client {
99    /// Find all sliding sync versions that are available.
100    ///
101    /// Be careful: This method may hit the store and will send new requests for
102    /// each call. It can be costly to call it repeatedly.
103    ///
104    /// If `.well-known` or `/versions` is unreachable, it will simply move
105    /// potential sliding sync versions aside. No error will be reported.
106    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
107        let supported_versions = self.supported_versions().await.ok();
108
109        [VersionBuilder::DiscoverNative]
110            .into_iter()
111            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
112            .collect()
113    }
114
115    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
116    /// identifier.
117    ///
118    /// Note: the identifier must not be more than 16 chars long!
119    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
120        Ok(SlidingSync::builder(id.into(), self.clone())?)
121    }
122
123    /// Handle all the information provided in a sliding sync response, except
124    /// for the e2ee bits.
125    ///
126    /// If you need to handle encryption too, use the internal
127    /// `SlidingSyncResponseProcessor` instead.
128    #[cfg(any(test, feature = "testing"))]
129    #[tracing::instrument(skip(self, response))]
130    pub async fn process_sliding_sync_test_helper(
131        &self,
132        response: &http::Response,
133        requested_required_states: &RequestedRequiredStates,
134    ) -> Result<SyncResponse> {
135        let response =
136            self.base_client().process_sliding_sync(response, requested_required_states).await?;
137
138        tracing::debug!("done processing on base_client");
139        self.call_sync_response_handlers(&response).await?;
140
141        Ok(response)
142    }
143}
144
145/// Small helper to handle a `SlidingSync` response's sub parts.
146///
147/// This will properly handle the encryption and the room response
148/// independently, if needs be, making sure that both are properly processed by
149/// event handlers.
150#[must_use]
151pub(crate) struct SlidingSyncResponseProcessor {
152    client: Client,
153    to_device_events: Vec<ProcessedToDeviceEvent>,
154    response: Option<SyncResponse>,
155}
156
157impl SlidingSyncResponseProcessor {
158    pub fn new(client: Client) -> Self {
159        Self { client, to_device_events: Vec::new(), response: None }
160    }
161
162    #[cfg(feature = "e2e-encryption")]
163    pub async fn handle_encryption(
164        &mut self,
165        extensions: &http::response::Extensions,
166    ) -> Result<()> {
167        // This is an internal API misuse if this is triggered (calling
168        // `handle_room_response` before this function), so panic is fine.
169        assert!(self.response.is_none());
170
171        self.to_device_events = if let Some(to_device_events) = self
172            .client
173            .base_client()
174            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
175            .await?
176        {
177            // Some new keys might have been received, so trigger a backup if needed.
178            self.client.encryption().backups().maybe_trigger_backup();
179
180            to_device_events
181        } else {
182            Vec::new()
183        };
184
185        Ok(())
186    }
187
188    pub async fn handle_room_response(
189        &mut self,
190        response: &http::Response,
191        requested_required_states: &RequestedRequiredStates,
192    ) -> Result<()> {
193        let mut sync_response = self
194            .client
195            .base_client()
196            .process_sliding_sync(response, requested_required_states)
197            .await?;
198        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
199
200        update_in_memory_caches(&self.client, &sync_response).await?;
201
202        self.response = Some(sync_response);
203
204        Ok(())
205    }
206
207    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
208        let mut response = self.response.take().unwrap_or_default();
209
210        response.to_device.extend(self.to_device_events);
211
212        self.client.call_sync_response_handlers(&response).await?;
213
214        Ok(response)
215    }
216}
217
218/// Update the caches for the rooms that received updates.
219///
220/// This will only fill the in-memory caches, not save the info on disk.
221async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
222    for room_id in response.rooms.joined.keys() {
223        let Some(room) = client.get_room(room_id) else {
224            error!(?room_id, "Cannot post process a room in sliding sync because it is missing");
225            continue;
226        };
227
228        room.user_defined_notification_mode().await;
229    }
230
231    Ok(())
232}
233
234/// Update the receipts extension and compute the read receipt accordingly.
235async fn handle_receipts_extension(
236    client: &Client,
237    response: &http::Response,
238    sync_response: &mut SyncResponse,
239) -> Result<()> {
240    // We need to compute read receipts for each joined room that has received an
241    // update, or from each room that has received a receipt ephemeral event.
242    let room_ids = BTreeSet::from_iter(
243        sync_response
244            .rooms
245            .joined
246            .keys()
247            .cloned()
248            .chain(response.extensions.receipts.rooms.keys().cloned()),
249    );
250
251    for room_id in room_ids {
252        let Ok((room_event_cache, _drop_handle)) = client.event_cache().for_room(&room_id).await
253        else {
254            tracing::info!(
255                ?room_id,
256                "Failed to fetch the `RoomEventCache` when computing unread counts"
257            );
258
259            continue;
260        };
261
262        let previous_events = room_event_cache.events().await;
263
264        client
265            .base_client()
266            .process_sliding_sync_receipts_extension_for_room(
267                &room_id,
268                response,
269                sync_response,
270                previous_events,
271            )
272            .await?;
273    }
274    Ok(())
275}
276
// Unit tests for the version builder, the version discovery on `Client`, and
// the `SlidingSyncResponseProcessor`. Not compiled for wasm targets.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::collections::BTreeMap;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        notification_settings::RoomNotificationMode, RequestedRequiredStates,
        RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
    };
    use matrix_sdk_test::async_test;
    use ruma::{
        api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
        room_id, serde::Raw,
    };
    use serde_json::json;

    use super::{Version, VersionBuilder};
    use crate::{
        error::Result,
        sliding_sync::{client::SlidingSyncResponseProcessor, http, VersionBuilderError},
        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
        SlidingSyncList, SlidingSyncMode,
    };

    /// `VersionBuilder::None` builds `Version::None` without any supported
    /// versions.
    #[test]
    fn test_version_builder_none() {
        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
    }

    /// `VersionBuilder::Native` builds `Version::Native` without any supported
    /// versions.
    #[test]
    fn test_version_builder_native() {
        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
    }

    /// `VersionBuilder::DiscoverNative` builds `Version::Native` when the
    /// `/versions` response has `org.matrix.simplified_msc3575` set to `true`
    /// in its `unstable_features`.
    #[test]
    fn test_version_builder_discover_native() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Ok(Version::Native)
        );
    }

    /// `VersionBuilder::DiscoverNative` fails with `MissingVersionsResponse`
    /// when no supported versions are provided.
    #[test]
    fn test_version_builder_discover_native_no_supported_versions() {
        assert_matches!(
            VersionBuilder::DiscoverNative.build(None),
            Err(VersionBuilderError::MissingVersionsResponse)
        );
    }

    /// `VersionBuilder::DiscoverNative` fails with `NativeVersionIsUnset` when
    /// `org.matrix.simplified_msc3575` is set to `false` in the
    /// `unstable_features`.
    #[test]
    fn test_version_builder_discover_native_unstable_features_is_disabled() {
        let mut response = get_supported_versions::Response::new(vec![]);
        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();

        assert_matches!(
            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
            Err(VersionBuilderError::NativeVersionIsUnset)
        );
    }

    /// No sliding sync version is reported as available when the client is
    /// built without a mock server.
    #[async_test]
    async fn test_available_sliding_sync_versions_none() {
        let client = MockClientBuilder::new(None).build().await;
        let available_versions = client.available_sliding_sync_versions().await;

        // `.well-known` and `/versions` aren't available. It's impossible to find any
        // versions.
        assert!(available_versions.is_empty());
    }

    /// `Version::Native` is the only available version when the mocked
    /// `/versions` endpoint answers with unstable features.
    #[async_test]
    async fn test_available_sliding_sync_versions_native() {
        let server = MatrixMockServer::new().await;
        // The client is built with `no_server_versions()`; presumably this means
        // it has to query the mocked `/versions` endpoint below.
        let client = server.client_builder().no_server_versions().build().await;

        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;

        let available_versions = client.available_sliding_sync_versions().await;

        // `/versions` is available.
        assert_eq!(available_versions.len(), 1);
        assert_matches!(available_versions[0], Version::Native);
    }

    /// The user-defined notification mode of a room is cached when a sliding
    /// sync response is handled, and it is overwritten by a subsequent
    /// response carrying different push rules.
    #[async_test]
    async fn test_cache_user_defined_notification_mode() -> Result<()> {
        let client = MockClientBuilder::new(None).build().await;
        let room_id = room_id!("!r0:matrix.org");

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
            )
            .build()
            .await?;

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        // The room rule uses the `notify` action.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": ["notify"],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room must exist, since it's been synced.
        let room = client.get_room(room_id).unwrap();

        // The room has a cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::AllMessages),
        );

        // Mock a sync response.
        // A `m.push_rules` with `room` is cached during the sync.
        // It overwrites the previous cache.
        // This time, the room rule has no actions.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.to_owned(),
                    http::response::Room::default(),
                )]),
                extensions: assign!(http::response::Extensions::default(), {
                    account_data: assign!(http::response::AccountData::default(), {
                        global: vec![
                            Raw::from_json_string(
                                json!({
                                    "type": "m.push_rules",
                                    "content": {
                                        "global": {
                                            "room": [
                                                {
                                                    "actions": [],
                                                    "rule_id": room_id,
                                                    "default": false,
                                                    "enabled": true,
                                                },
                                            ],
                                        },
                                    },
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    })
                })
            });

            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // The room has an updated cached user-defined notification mode.
        assert_eq!(
            room.cached_user_defined_notification_mode(),
            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
        );

        Ok(())
    }

    /// Processing room responses through `SlidingSyncResponseProcessor` emits
    /// room info notable updates; the second response, which carries timeline
    /// events, ends up emitting one with the `READ_RECEIPT` reason.
    #[async_test]
    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
        use ruma::api::client::sync::sync_events::v5 as http;

        // Given a logged-in client.
        let client = MockClientBuilder::new(None).build().await;
        client.event_cache().subscribe().unwrap();

        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();

        // When I send sliding sync response containing a new room.
        let room_id = room_id!("!r:e.uk");
        let room = http::response::Room::new();
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then room info notable updates are received.
        //
        // The first one must not contain `READ_RECEIPT`.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        // The second one must contain `DISPLAY_NAME`.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
            }
        );
        // No other update is pending.
        assert!(room_info_notable_update_stream.is_empty());

        // When I send sliding sync response containing a couple of events with no read
        // receipt.
        let room_id = room_id!("!r:e.uk");
        let events = vec![
            make_raw_event("m.room.message", "$3"),
            make_raw_event("m.room.message", "$4"),
            make_raw_event("m.read", "$5"),
        ];
        let room = assign!(http::response::Room::new(), {
            timeline: events,
        });
        let mut response = http::Response::new("5".to_owned());
        response.rooms.insert(room_id.to_owned(), room);

        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
        processor
            .handle_room_response(&response, &RequestedRequiredStates::default())
            .await
            .expect("Failed to process sync");
        processor.process_and_take_response().await.expect("Failed to finish processing sync");

        // Then room info notable updates are received.
        //
        // `NONE` because the regular sync process ends up updating nothing.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
            }
        );
        // `READ_RECEIPT` because this is what we expect.
        assert_matches!(
            room_info_notable_update_stream.recv().await,
            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
                assert_eq!(received_room_id, room_id);
                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
            }
        );
        // No other update is pending.
        assert!(room_info_notable_update_stream.is_empty());
    }

    /// Build a raw timeline event from JSON, with the given event type and
    /// event ID; the content, sender and timestamp are fixed.
    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
        Raw::from_json_string(
            json!({
                "type": event_type,
                "event_id": id,
                "content": { "msgtype": "m.text", "body": "my msg" },
                "sender": "@u:h.uk",
                "origin_server_ts": 12344445,
            })
            .to_string(),
        )
        .unwrap()
    }
}