matrix_sdk/sliding_sync/
client.rs

1use std::collections::BTreeSet;
2
3use futures_util::future::try_join_all;
4use matrix_sdk_base::{
5    RequestedRequiredStates, ThreadSubscriptionCatchupToken, sync::SyncResponse, timer,
6};
7use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
8use ruma::{
9    OwnedRoomId,
10    api::{
11        FeatureFlag, SupportedVersions,
12        client::sync::sync_events::v5::{self as http, response},
13    },
14    events::GlobalAccountDataEventType,
15};
16use tracing::error;
17
18use super::{SlidingSync, SlidingSyncBuilder};
19use crate::{Client, Result};
20
21/// A sliding sync version.
22#[derive(Clone, Debug)]
23pub enum Version {
24    /// No version. Useful to represent that sliding sync is disabled for
25    /// example, and that the version is unknown.
26    None,
27
28    /// Use the version of the sliding sync implementation inside Synapse, i.e.
29    /// MSC4186.
30    Native,
31}
32
33impl Version {
34    #[cfg(test)]
35    pub(crate) fn is_native(&self) -> bool {
36        matches!(self, Self::Native)
37    }
38}
39
40/// An error when building a version.
41#[derive(thiserror::Error, Debug)]
42pub enum VersionBuilderError {
43    /// The `.well-known` response is not set.
44    #[error("`.well-known` is not set")]
45    WellKnownNotSet,
46
47    /// The `/versions` response is not set.
48    #[error("The `/versions` response is not set")]
49    MissingVersionsResponse,
50
51    /// `/versions` does not contain `org.matrix.simplified_msc3575` in its
52    /// `unstable_features`, or it's not set to true.
53    #[error(
54        "`/versions` does not contain `org.matrix.simplified_msc3575` in its `unstable_features`, \
55         or it's not set to true."
56    )]
57    NativeVersionIsUnset,
58}
59
60/// A builder for [`Version`].
61#[derive(Clone, Debug)]
62pub enum VersionBuilder {
63    /// Build a [`Version::None`].
64    None,
65
66    /// Build a [`Version::Native`].
67    Native,
68
69    /// Build a [`Version::Native`] by auto-discovering it.
70    ///
71    /// It is available if the server enables it via `/versions`.
72    DiscoverNative,
73}
74
75impl VersionBuilder {
76    pub(crate) fn needs_get_supported_versions(&self) -> bool {
77        matches!(self, Self::DiscoverNative)
78    }
79
80    /// Build a [`Version`].
81    ///
82    /// It can fail if auto-discovering fails, e.g. if `/versions` do contain
83    /// invalid data.
84    pub fn build(
85        self,
86        supported: Option<&SupportedVersions>,
87    ) -> Result<Version, VersionBuilderError> {
88        Ok(match self {
89            Self::None => Version::None,
90
91            Self::Native => Version::Native,
92
93            Self::DiscoverNative => {
94                let Some(supported) = supported else {
95                    return Err(VersionBuilderError::MissingVersionsResponse);
96                };
97
98                if supported.features.contains(&FeatureFlag::Msc4186) {
99                    Version::Native
100                } else {
101                    return Err(VersionBuilderError::NativeVersionIsUnset);
102                }
103            }
104        })
105    }
106}
107
108impl Client {
109    /// Find all sliding sync versions that are available.
110    ///
111    /// Be careful: This method may hit the store and will send new requests for
112    /// each call. It can be costly to call it repeatedly.
113    ///
114    /// If `.well-known` or `/versions` is unreachable, it will simply move
115    /// potential sliding sync versions aside. No error will be reported.
116    pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
117        let supported_versions = self.supported_versions().await.ok();
118
119        [VersionBuilder::DiscoverNative]
120            .into_iter()
121            .filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
122            .collect()
123    }
124
125    /// Create a [`SlidingSyncBuilder`] tied to this client, with the given
126    /// identifier.
127    ///
128    /// Note: the identifier must not be more than 16 chars long!
129    pub fn sliding_sync(&self, id: impl Into<String>) -> Result<SlidingSyncBuilder> {
130        Ok(SlidingSync::builder(id.into(), self.clone())?)
131    }
132
133    /// Handle all the information provided in a sliding sync response, except
134    /// for the e2ee bits.
135    ///
136    /// If you need to handle encryption too, use the internal
137    /// `SlidingSyncResponseProcessor` instead.
138    #[cfg(any(test, feature = "testing"))]
139    #[tracing::instrument(skip(self, response))]
140    pub async fn process_sliding_sync_test_helper(
141        &self,
142        response: &http::Response,
143        requested_required_states: &RequestedRequiredStates,
144    ) -> Result<SyncResponse> {
145        let response =
146            self.base_client().process_sliding_sync(response, requested_required_states).await?;
147
148        tracing::debug!("done processing on base_client");
149        self.call_sync_response_handlers(&response).await?;
150
151        Ok(response)
152    }
153}
154
155/// Small helper to handle a `SlidingSync` response's sub parts.
156///
157/// This will properly handle the encryption and the room response
158/// independently, if needs be, making sure that both are properly processed by
159/// event handlers.
160#[must_use]
161pub(crate) struct SlidingSyncResponseProcessor {
162    client: Client,
163    to_device_events: Vec<ProcessedToDeviceEvent>,
164    response: Option<SyncResponse>,
165}
166
167impl SlidingSyncResponseProcessor {
168    pub fn new(client: Client) -> Self {
169        Self { client, to_device_events: Vec::new(), response: None }
170    }
171
172    #[cfg(feature = "e2e-encryption")]
173    pub async fn handle_encryption(&mut self, extensions: &response::Extensions) -> Result<()> {
174        // This is an internal API misuse if this is triggered (calling
175        // `handle_room_response` before this function), so panic is fine.
176        assert!(self.response.is_none());
177
178        self.to_device_events = if let Some(to_device_events) = self
179            .client
180            .base_client()
181            .process_sliding_sync_e2ee(extensions.to_device.as_ref(), &extensions.e2ee)
182            .await?
183        {
184            // Some new keys might have been received, so trigger a backup if needed.
185            self.client.encryption().backups().maybe_trigger_backup();
186
187            to_device_events
188        } else {
189            Vec::new()
190        };
191
192        Ok(())
193    }
194
195    pub async fn handle_room_response(
196        &mut self,
197        response: &http::Response,
198        requested_required_states: &RequestedRequiredStates,
199    ) -> Result<()> {
200        let previously_joined_rooms = self
201            .client
202            .joined_rooms()
203            .into_iter()
204            .map(|r| r.room_id().to_owned())
205            .collect::<BTreeSet<_>>();
206
207        let mut sync_response = self
208            .client
209            .base_client()
210            .process_sliding_sync(response, requested_required_states)
211            .await?;
212
213        handle_receipts_extension(&self.client, response, &mut sync_response).await?;
214
215        update_in_memory_caches(&self.client, &previously_joined_rooms, &sync_response).await;
216
217        self.response = Some(sync_response);
218
219        Ok(())
220    }
221
222    pub async fn handle_thread_subscriptions(
223        &mut self,
224        previous_pos: Option<&str>,
225        thread_subs: response::ThreadSubscriptions,
226    ) -> Result<()> {
227        let catchup_token =
228            thread_subs.prev_batch.map(|prev_batch| ThreadSubscriptionCatchupToken {
229                from: prev_batch,
230                to: previous_pos.map(|s| s.to_owned()),
231            });
232
233        self.client
234            .thread_subscription_catchup()
235            .sync_subscriptions(thread_subs.subscribed, thread_subs.unsubscribed, catchup_token)
236            .await?;
237
238        Ok(())
239    }
240
241    pub async fn process_and_take_response(mut self) -> Result<SyncResponse> {
242        let mut response = self.response.take().unwrap_or_default();
243
244        response.to_device.extend(self.to_device_events);
245
246        self.client.call_sync_response_handlers(&response).await?;
247
248        Ok(response)
249    }
250}
251
252/// Update the caches for the rooms that received updates.
253///
254/// This will only fill the in-memory caches, not save the info on disk.
255async fn update_in_memory_caches(
256    client: &Client,
257    previously_joined_rooms: &BTreeSet<OwnedRoomId>,
258    response: &SyncResponse,
259) {
260    let _timer = timer!(tracing::Level::TRACE, "update_in_memory_caches");
261
262    // If the push rules have changed, update the cached notification mode for *all*
263    // the joined rooms.
264    if response.account_data.iter().any(|event| {
265        event
266            .get_field::<GlobalAccountDataEventType>("type")
267            .ok()
268            .flatten()
269            .is_some_and(|event_type| event_type == GlobalAccountDataEventType::PushRules)
270    }) {
271        let notification_settings = client.notification_settings().await;
272        let rules = notification_settings.rules().await;
273
274        // Update all joined rooms.
275        for room in client.joined_rooms() {
276            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
277                room.update_cached_user_defined_notification_mode(mode);
278            }
279        }
280    } else {
281        // Otherwise, precompute the cached user-defined notification mode only for the
282        // newly joined rooms.
283
284        // We'll compute the rules only once, lazily, if needs be.
285        let mut rules = None;
286
287        for room_id in response
288            .rooms
289            .joined
290            .keys()
291            .filter(|room_id| !previously_joined_rooms.contains(*room_id))
292        {
293            let Some(room) = client.get_room(room_id) else {
294                error!(?room_id, "The room must exist since it has been joined");
295                continue;
296            };
297
298            // Reuse the previous `Rules` instance, or compute it once and for all.
299            let rules = if let Some(rules) = &mut rules {
300                rules
301            } else {
302                rules.insert(client.notification_settings().await.rules().await.clone())
303            };
304
305            // Define an initial value for the cached user-defined notification mode.
306            if let Some(mode) = rules.get_user_defined_room_notification_mode(room.room_id()) {
307                room.update_cached_user_defined_notification_mode(mode);
308            }
309        }
310    }
311}
312
313/// Update the receipts extension and compute the read receipt accordingly.
314async fn handle_receipts_extension(
315    client: &Client,
316    response: &http::Response,
317    sync_response: &mut SyncResponse,
318) -> Result<()> {
319    let _timer = timer!(tracing::Level::TRACE, "handle_receipts_extension");
320
321    // We need to compute read receipts for each joined room that has received an
322    // update, or from each room that has received a receipt ephemeral event.
323    let room_ids = BTreeSet::from_iter(
324        sync_response
325            .rooms
326            .joined
327            .keys()
328            .cloned()
329            .chain(response.extensions.receipts.rooms.keys().cloned()),
330    );
331
332    // Process each room concurrently.
333    let futures = room_ids.into_iter().map(|room_id| {
334        let new_sync_events = sync_response
335            .rooms
336            .joined
337            .entry(room_id.to_owned())
338            .or_default()
339            .timeline
340            .events
341            .clone();
342
343        async {
344            let Ok((room_event_cache, _drop_handle)) =
345                client.event_cache().for_room(&room_id).await
346            else {
347                tracing::info!(
348                    ?room_id,
349                    "Failed to fetch the `RoomEventCache` when computing unread counts"
350                );
351                return Ok::<_, crate::Error>(None);
352            };
353
354            let previous_events = room_event_cache.events().await;
355
356            let receipt_event = client
357                .base_client()
358                .process_sliding_sync_receipts_extension_for_room(
359                    &room_id,
360                    response,
361                    new_sync_events,
362                    previous_events,
363                )
364                .await?;
365
366            Ok(Some((room_id, receipt_event)))
367        }
368    });
369
370    let updates = try_join_all(futures).await?;
371
372    for (room_id, receipt_event_content) in updates.into_iter().flatten() {
373        if let Some(event) = receipt_event_content {
374            sync_response.rooms.joined.entry(room_id).or_default().ephemeral.push(event.cast());
375        }
376    }
377
378    Ok(())
379}
380
381#[cfg(all(test, not(target_family = "wasm")))]
382mod tests {
383    use std::collections::BTreeMap;
384
385    use assert_matches::assert_matches;
386    use matrix_sdk_base::{
387        RequestedRequiredStates, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
388        notification_settings::RoomNotificationMode,
389    };
390    use matrix_sdk_test::async_test;
391    use ruma::{
392        api::client::discovery::get_supported_versions, assign, events::AnySyncTimelineEvent,
393        room_id, serde::Raw,
394    };
395    use serde_json::json;
396
397    use super::{Version, VersionBuilder};
398    use crate::{
399        SlidingSyncList, SlidingSyncMode,
400        error::Result,
401        sliding_sync::{VersionBuilderError, client::SlidingSyncResponseProcessor, http},
402        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
403    };
404
405    #[test]
406    fn test_version_builder_none() {
407        assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
408    }
409
410    #[test]
411    fn test_version_builder_native() {
412        assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
413    }
414
415    #[test]
416    fn test_version_builder_discover_native() {
417        let mut response = get_supported_versions::Response::new(vec![]);
418        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
419
420        assert_matches!(
421            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
422            Ok(Version::Native)
423        );
424    }
425
426    #[test]
427    fn test_version_builder_discover_native_no_supported_versions() {
428        assert_matches!(
429            VersionBuilder::DiscoverNative.build(None),
430            Err(VersionBuilderError::MissingVersionsResponse)
431        );
432    }
433
434    #[test]
435    fn test_version_builder_discover_native_unstable_features_is_disabled() {
436        let mut response = get_supported_versions::Response::new(vec![]);
437        response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
438
439        assert_matches!(
440            VersionBuilder::DiscoverNative.build(Some(&response.as_supported_versions())),
441            Err(VersionBuilderError::NativeVersionIsUnset)
442        );
443    }
444
445    #[async_test]
446    async fn test_available_sliding_sync_versions_none() {
447        let client = MockClientBuilder::new(None).build().await;
448        let available_versions = client.available_sliding_sync_versions().await;
449
450        // `.well-known` and `/versions` aren't available. It's impossible to find any
451        // versions.
452        assert!(available_versions.is_empty());
453    }
454
455    #[async_test]
456    async fn test_available_sliding_sync_versions_native() {
457        let server = MatrixMockServer::new().await;
458        let client = server.client_builder().no_server_versions().build().await;
459
460        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
461
462        let available_versions = client.available_sliding_sync_versions().await;
463
464        // `/versions` is available.
465        assert_eq!(available_versions.len(), 1);
466        assert_matches!(available_versions[0], Version::Native);
467    }
468
469    #[async_test]
470    async fn test_cache_user_defined_notification_mode() -> Result<()> {
471        let client = MockClientBuilder::new(None).build().await;
472        let room_id = room_id!("!r0:matrix.org");
473
474        let sliding_sync = client
475            .sliding_sync("test")?
476            .with_account_data_extension(
477                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
478            )
479            .add_list(
480                SlidingSyncList::builder("all")
481                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
482            )
483            .build()
484            .await?;
485
486        // Mock a sync response.
487        // A `m.push_rules` with `room` is cached during the sync.
488        {
489            let server_response = assign!(http::Response::new("0".to_owned()), {
490                rooms: BTreeMap::from([(
491                    room_id.to_owned(),
492                    http::response::Room::default(),
493                )]),
494                extensions: assign!(http::response::Extensions::default(), {
495                    account_data: assign!(http::response::AccountData::default(), {
496                        global: vec![
497                            Raw::from_json_string(
498                                json!({
499                                    "type": "m.push_rules",
500                                    "content": {
501                                        "global": {
502                                            "room": [
503                                                {
504                                                    "actions": ["notify"],
505                                                    "rule_id": room_id,
506                                                    "default": false,
507                                                    "enabled": true,
508                                                },
509                                            ],
510                                        },
511                                    },
512                                })
513                                .to_string(),
514                            ).unwrap()
515                        ]
516                    })
517                })
518            });
519
520            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
521            sliding_sync
522                .handle_response(
523                    server_response.clone(),
524                    &mut pos_guard,
525                    RequestedRequiredStates::default(),
526                )
527                .await?;
528        }
529
530        // The room must exist, since it's been synced.
531        let room = client.get_room(room_id).unwrap();
532
533        // The room has a cached user-defined notification mode.
534        assert_eq!(
535            room.cached_user_defined_notification_mode(),
536            Some(RoomNotificationMode::AllMessages),
537        );
538
539        // Mock a sync response.
540        // A `m.push_rules` with `room` is cached during the sync.
541        // It overwrites the previous cache.
542        {
543            let server_response = assign!(http::Response::new("0".to_owned()), {
544                rooms: BTreeMap::from([(
545                    room_id.to_owned(),
546                    http::response::Room::default(),
547                )]),
548                extensions: assign!(http::response::Extensions::default(), {
549                    account_data: assign!(http::response::AccountData::default(), {
550                        global: vec![
551                            Raw::from_json_string(
552                                json!({
553                                    "type": "m.push_rules",
554                                    "content": {
555                                        "global": {
556                                            "room": [
557                                                {
558                                                    "actions": [],
559                                                    "rule_id": room_id,
560                                                    "default": false,
561                                                    "enabled": true,
562                                                },
563                                            ],
564                                        },
565                                    },
566                                })
567                                .to_string(),
568                            ).unwrap()
569                        ]
570                    })
571                })
572            });
573
574            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
575            sliding_sync
576                .handle_response(
577                    server_response.clone(),
578                    &mut pos_guard,
579                    RequestedRequiredStates::default(),
580                )
581                .await?;
582        }
583
584        // The room has an updated cached user-defined notification mode.
585        assert_eq!(
586            room.cached_user_defined_notification_mode(),
587            Some(RoomNotificationMode::MentionsAndKeywordsOnly),
588        );
589
590        // Mock a sync response.
591        // Even if the room doesn't appear in the response, its notification mode will
592        // be updated immediately if a new `m.push_rules` is received.
593        {
594            let server_response = assign!(http::Response::new("0".to_owned()), {
595                extensions: assign!(http::response::Extensions::default(), {
596                    account_data: assign!(http::response::AccountData::default(), {
597                        global: vec![
598                            Raw::from_json_string(
599                                json!({
600                                    "type": "m.push_rules",
601                                    "content": {
602                                        "global": {
603                                            "room": [
604                                                {
605                                                    "actions": ["notify"],
606                                                    "rule_id": room_id,
607                                                    "default": false,
608                                                    "enabled": true,
609                                                },
610                                            ],
611                                        },
612                                    },
613                                })
614                                .to_string(),
615                            ).unwrap()
616                        ]
617                    })
618                })
619            });
620
621            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
622            sliding_sync
623                .handle_response(
624                    server_response.clone(),
625                    &mut pos_guard,
626                    RequestedRequiredStates::default(),
627                )
628                .await?;
629        }
630
631        // The room notification mode has been updated again!
632        assert_eq!(
633            room.cached_user_defined_notification_mode(),
634            Some(RoomNotificationMode::AllMessages),
635        );
636
637        Ok(())
638    }
639
640    #[async_test]
641    async fn test_read_receipt_can_trigger_a_notable_update_reason() {
642        use ruma::api::client::sync::sync_events::v5 as http;
643
644        // Given a logged-in client.
645        let client = MockClientBuilder::new(None).build().await;
646        client.event_cache().subscribe().unwrap();
647
648        let mut room_info_notable_update_stream = client.room_info_notable_update_receiver();
649
650        // When I send sliding sync response containing a new room.
651        let room_id = room_id!("!r:e.uk");
652        let room = http::response::Room::new();
653        let mut response = http::Response::new("5".to_owned());
654        response.rooms.insert(room_id.to_owned(), room);
655
656        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
657        processor
658            .handle_room_response(&response, &RequestedRequiredStates::default())
659            .await
660            .expect("Failed to process sync");
661        processor.process_and_take_response().await.expect("Failed to finish processing sync");
662
663        // Then room info notable updates are received.
664        assert_matches!(
665            room_info_notable_update_stream.recv().await,
666            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
667                assert_eq!(received_room_id, room_id);
668                assert!(!received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
669            }
670        );
671        assert_matches!(
672            room_info_notable_update_stream.recv().await,
673            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
674                assert_eq!(received_room_id, room_id);
675                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::DISPLAY_NAME), "{received_reasons:?}");
676            }
677        );
678        assert!(room_info_notable_update_stream.is_empty());
679
680        // When I send sliding sync response containing a couple of events with no read
681        // receipt.
682        let room_id = room_id!("!r:e.uk");
683        let events = vec![
684            make_raw_event("m.room.message", "$3"),
685            make_raw_event("m.room.message", "$4"),
686            make_raw_event("m.read", "$5"),
687        ];
688        let room = assign!(http::response::Room::new(), {
689            timeline: events,
690        });
691        let mut response = http::Response::new("5".to_owned());
692        response.rooms.insert(room_id.to_owned(), room);
693
694        let mut processor = SlidingSyncResponseProcessor::new(client.clone());
695        processor
696            .handle_room_response(&response, &RequestedRequiredStates::default())
697            .await
698            .expect("Failed to process sync");
699        processor.process_and_take_response().await.expect("Failed to finish processing sync");
700
701        // Then room info notable updates are received.
702        //
703        // `NONE` because the regular sync process ends up to updating nothing.
704        assert_matches!(
705            room_info_notable_update_stream.recv().await,
706            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
707                assert_eq!(received_room_id, room_id);
708                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::NONE), "{received_reasons:?}");
709            }
710        );
711        // `READ_RECEIPT` because this is what we expect.
712        assert_matches!(
713            room_info_notable_update_stream.recv().await,
714            Ok(RoomInfoNotableUpdate { room_id: received_room_id, reasons: received_reasons }) => {
715                assert_eq!(received_room_id, room_id);
716                assert!(received_reasons.contains(RoomInfoNotableUpdateReasons::READ_RECEIPT), "{received_reasons:?}");
717            }
718        );
719        assert!(room_info_notable_update_stream.is_empty());
720    }
721
722    fn make_raw_event(event_type: &str, id: &str) -> Raw<AnySyncTimelineEvent> {
723        Raw::from_json_string(
724            json!({
725                "type": event_type,
726                "event_id": id,
727                "content": { "msgtype": "m.text", "body": "my msg" },
728                "sender": "@u:h.uk",
729                "origin_server_ts": 12344445,
730            })
731            .to_string(),
732        )
733        .unwrap()
734    }
735}