1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28 collections::{btree_map::Entry, BTreeMap},
29 fmt::Debug,
30 future::Future,
31 sync::{Arc, RwLock as StdRwLock},
32 time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_base::RequestedRequiredStates;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46 select,
47 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55 cache::restore_sliding_sync_state,
56 client::SlidingSyncResponseProcessor,
57 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, Result};
60
61#[derive(Clone, Debug)]
65pub struct SlidingSync {
66 inner: Arc<SlidingSyncInner>,
68}
69
70#[derive(Debug)]
71pub(super) struct SlidingSyncInner {
72 id: String,
76
77 client: Client,
79
80 poll_timeout: Duration,
82
83 network_timeout: Duration,
86
87 storage_key: String,
89
90 share_pos: bool,
97
98 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
111
112 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
114
115 rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
117
118 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
120
121 internal_channel: Sender<SlidingSyncInternalMessage>,
124}
125
126impl SlidingSync {
127 pub(super) fn new(inner: SlidingSyncInner) -> Self {
128 Self { inner: Arc::new(inner) }
129 }
130
131 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
132 cache::store_sliding_sync_state(self, position).await
133 }
134
135 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
137 SlidingSyncBuilder::new(id, client)
138 }
139
140 pub fn subscribe_to_rooms(
147 &self,
148 room_ids: &[&RoomId],
149 settings: Option<http::request::RoomSubscription>,
150 cancel_in_flight_request: bool,
151 ) {
152 let settings = settings.unwrap_or_default();
153 let mut sticky = self.inner.sticky.write().unwrap();
154 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
155
156 let mut skip_over_current_sync_loop_iteration = false;
157
158 for room_id in room_ids {
159 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
166 if let Some(room) = self.inner.client.get_room(room_id) {
167 room.mark_members_missing();
168 }
169
170 entry.insert((RoomSubscriptionState::default(), settings.clone()));
171
172 skip_over_current_sync_loop_iteration = true;
173 }
174 }
175
176 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
177 self.inner.internal_channel_send_if_possible(
178 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
179 );
180 }
181 }
182
183 pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
185 self.inner.rooms.read().await.get(room_id).cloned()
186 }
187
188 pub fn get_number_of_rooms(&self) -> usize {
190 self.inner.rooms.blocking_read().len()
191 }
192
193 pub async fn on_list<Function, FunctionOutput, R>(
195 &self,
196 list_name: &str,
197 function: Function,
198 ) -> Option<R>
199 where
200 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
201 FunctionOutput: Future<Output = R>,
202 {
203 let lists = self.inner.lists.read().await;
204
205 match lists.get(list_name) {
206 Some(list) => Some(function(list).await),
207 None => None,
208 }
209 }
210
211 pub async fn add_list(
217 &self,
218 list_builder: SlidingSyncListBuilder,
219 ) -> Result<Option<SlidingSyncList>> {
220 let list = list_builder.build(self.inner.internal_channel.clone());
221
222 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
223
224 self.inner.internal_channel_send_if_possible(
225 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
226 );
227
228 Ok(old_list)
229 }
230
231 pub async fn add_cached_list(
238 &self,
239 mut list_builder: SlidingSyncListBuilder,
240 ) -> Result<Option<SlidingSyncList>> {
241 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
242
243 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
244
245 self.add_list(list_builder).await
246 }
247
248 pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
250 &self,
251 room_ids: I,
252 ) -> Vec<Option<SlidingSyncRoom>> {
253 let rooms = self.inner.rooms.read().await;
254
255 room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
256 }
257
258 pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
260 self.inner.rooms.read().await.values().cloned().collect()
261 }
262
263 #[instrument(skip_all)]
265 async fn handle_response(
266 &self,
267 sliding_sync_response: http::Response,
268 position: &mut SlidingSyncPositionMarkers,
269 requested_required_states: RequestedRequiredStates,
270 ) -> Result<UpdateSummary, crate::Error> {
271 let pos = Some(sliding_sync_response.pos.clone());
272
273 let must_process_rooms_response = self.must_process_rooms_response().await;
274
275 trace!(yes = must_process_rooms_response, "Must process rooms response?");
276
277 let mut sync_response = {
285 let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
288
289 let rooms = &*self.inner.rooms.read().await;
290 let mut response_processor =
291 SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
292
293 #[cfg(feature = "e2e-encryption")]
294 if self.is_e2ee_enabled() {
295 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
296 }
297
298 if must_process_rooms_response {
301 response_processor
302 .handle_room_response(&sliding_sync_response, &requested_required_states)
303 .await?;
304 }
305
306 response_processor.process_and_take_response().await?
307 };
308
309 debug!(?sync_response, "Sliding Sync response has been handled by the client");
310
311 if let Some(ref txn_id) = sliding_sync_response.txn_id {
313 let txn_id = txn_id.as_str().into();
314 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
315 let mut lists = self.inner.lists.write().await;
316 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
317 }
318
319 let update_summary = {
320 let updated_rooms = {
322 let mut rooms_map = self.inner.rooms.write().await;
323
324 let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
325
326 for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
327 let timeline =
331 if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
332 joined_room.timeline.events
333 } else {
334 room_data.timeline.drain(..).map(TimelineEvent::new).collect()
335 };
336
337 match rooms_map.get_mut(&room_id) {
338 Some(room) => {
340 room.update(room_data, timeline);
341 }
342
343 None => {
345 rooms_map.insert(
346 room_id.clone(),
347 SlidingSyncRoom::new(
348 room_id.clone(),
349 room_data.prev_batch,
350 timeline,
351 ),
352 );
353 }
354 }
355
356 updated_rooms.push(room_id);
357 }
358
359 updated_rooms.extend(sync_response.rooms.join.keys().cloned());
367
368 updated_rooms
369 };
370
371 let updated_lists = {
373 debug!(
374 lists = ?sliding_sync_response.lists,
375 "Update lists"
376 );
377
378 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
379 let mut lists = self.inner.lists.write().await;
380
381 for (name, list) in lists.iter_mut() {
384 if let Some(updates) = sliding_sync_response.lists.get(name) {
385 let maximum_number_of_rooms: u32 =
386 updates.count.try_into().expect("failed to convert `count` to `u32`");
387
388 if list.update(Some(maximum_number_of_rooms))? {
389 updated_lists.push(name.clone());
390 }
391 } else if list.update(None)? {
392 updated_lists.push(name.clone());
393 }
394 }
395
396 for name in sliding_sync_response.lists.keys() {
398 if !lists.contains_key(name) {
399 error!("Response for list `{name}` - unknown to us; skipping");
400 }
401 }
402
403 updated_lists
404 };
405
406 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
407 };
408
409 position.pos = pos;
413
414 Ok(update_summary)
415 }
416
417 async fn generate_sync_request(
418 &self,
419 txn_id: &mut LazyTransactionId,
420 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
421 let mut requests_lists = BTreeMap::new();
423
424 let require_timeout = {
425 let lists = self.inner.lists.read().await;
426
427 let mut require_timeout = true;
429
430 for (name, list) in lists.iter() {
431 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
432 require_timeout = require_timeout && list.requires_timeout();
433 }
434
435 require_timeout
436 };
437
438 let mut position_guard = self.inner.position.clone().lock_owned().await;
446
447 let to_device_enabled =
448 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
449
450 let restored_fields = if self.inner.share_pos || to_device_enabled {
451 let lists = self.inner.lists.read().await;
452 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
453 } else {
454 None
455 };
456
457 let pos = if self.inner.share_pos {
460 if let Some(fields) = &restored_fields {
461 if fields.pos != position_guard.pos {
463 info!(
464 "Pos from previous request ('{:?}') was different from \
465 pos in database ('{:?}').",
466 position_guard.pos, fields.pos
467 );
468 position_guard.pos = fields.pos.clone();
469 }
470 fields.pos.clone()
471 } else {
472 position_guard.pos.clone()
473 }
474 } else {
475 position_guard.pos.clone()
476 };
477
478 Span::current().record("pos", &pos);
479
480 #[cfg(feature = "e2e-encryption")]
489 if pos.is_none() && self.is_e2ee_enabled() {
490 info!("Marking all tracked users as dirty");
491
492 let olm_machine = self.inner.client.olm_machine().await;
493 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
494 olm_machine.mark_all_tracked_users_as_dirty().await?;
495 }
496
497 let timeout = require_timeout.then(|| self.inner.poll_timeout);
502
503 let mut request = assign!(http::Request::new(), {
504 conn_id: Some(self.inner.id.clone()),
505 pos,
506 timeout,
507 lists: requests_lists,
508 });
509
510 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
512
513 if to_device_enabled {
517 request.extensions.to_device.since =
518 restored_fields.and_then(|fields| fields.to_device_token);
519 }
520
521 if let Some(txn_id) = txn_id.get() {
523 request.txn_id = Some(txn_id.to_string());
524 }
525
526 Ok((
527 request,
529 RequestConfig::default()
532 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
533 .retry_limit(3),
534 position_guard,
535 ))
536 }
537
538 async fn send_sync_request(
542 &self,
543 request: http::Request,
544 request_config: RequestConfig,
545 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
546 ) -> Result<UpdateSummary> {
547 debug!("Sending request");
548
549 let requested_required_states = RequestedRequiredStates::from(&request);
551 let request = self.inner.client.send(request).with_request_config(request_config);
552
553 #[cfg(feature = "e2e-encryption")]
560 let response = {
561 if self.is_e2ee_enabled() {
562 let client = self.inner.client.clone();
579 let e2ee_uploads = spawn(async move {
580 if let Err(error) = client.send_outgoing_requests().await {
581 error!(?error, "Error while sending outgoing E2EE requests");
582 }
583 })
584 .abort_on_drop();
587
588 let response = request.await?;
590
591 e2ee_uploads.await.map_err(|error| Error::JoinError {
596 task_description: "e2ee_uploads".to_owned(),
597 error,
598 })?;
599
600 response
601 } else {
602 request.await?
603 }
604 };
605
606 #[cfg(not(feature = "e2e-encryption"))]
608 let response = request.await?;
609
610 debug!("Received response");
611
612 let this = self.clone();
622
623 let future = async move {
626 debug!("Start handling response");
627
628 let updates = this
634 .handle_response(response, &mut position_guard, requested_required_states)
635 .await?;
636
637 this.cache_to_storage(&position_guard).await?;
638
639 drop(position_guard);
642
643 debug!("Done handling response");
644
645 Ok(updates)
646 };
647
648 spawn(future.instrument(Span::current())).await.unwrap()
649 }
650
651 #[cfg(feature = "e2e-encryption")]
653 fn is_e2ee_enabled(&self) -> bool {
654 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
655 }
656
657 #[cfg(not(feature = "e2e-encryption"))]
658 fn is_e2ee_enabled(&self) -> bool {
659 false
660 }
661
662 async fn must_process_rooms_response(&self) -> bool {
664 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
667 || !self.inner.lists.read().await.is_empty()
668 }
669
670 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
671 async fn sync_once(&self) -> Result<UpdateSummary> {
672 let (request, request_config, position_guard) =
673 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
674
675 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
677
678 self.inner.client.inner.sync_beat.notify(usize::MAX);
680
681 Ok(summaries)
682 }
683
684 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
694 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
695 debug!("Starting sync stream");
696
697 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
698
699 stream! {
700 loop {
701 debug!("Sync stream is running");
702
703 select! {
704 biased;
705
706 internal_message = internal_channel_receiver.recv() => {
707 use SlidingSyncInternalMessage::*;
708
709 debug!(?internal_message, "Sync stream has received an internal message");
710
711 match internal_message {
712 Err(_) | Ok(SyncLoopStop) => {
713 break;
714 }
715
716 Ok(SyncLoopSkipOverCurrentIteration) => {
717 continue;
718 }
719 }
720 }
721
722 update_summary = self.sync_once() => {
723 match update_summary {
724 Ok(updates) => {
725 yield Ok(updates);
726 }
727
728 Err(error) => {
730 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
731 self.expire_session().await;
733 }
734
735 yield Err(error);
736
737 break;
739 }
740 }
741 }
742 }
743 }
744
745 debug!("Sync stream has exited.");
746 }
747 }
748
749 pub fn stop_sync(&self) -> Result<()> {
758 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
759 }
760
761 #[doc(hidden)]
773 pub async fn expire_session(&self) {
774 info!("Session expired; resetting `pos` and sticky parameters");
775
776 {
777 let mut position = self.inner.position.lock().await;
778 position.pos = None;
779
780 if let Err(err) = self.cache_to_storage(&position).await {
781 error!(
782 "couldn't invalidate sliding sync frozen state when expiring session: {err}"
783 );
784 }
785 }
786
787 {
788 let mut sticky = self.inner.sticky.write().unwrap();
789
790 sticky.data_mut().room_subscriptions.clear();
793 }
794
795 self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
796 }
797}
798
799impl SlidingSyncInner {
800 #[instrument]
802 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
803 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
804 }
805
806 #[instrument]
809 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
810 let _ = self.internal_channel.send(message);
812 }
813}
814
815#[derive(Copy, Clone, Debug, PartialEq)]
816enum SlidingSyncInternalMessage {
817 SyncLoopStop,
819
820 SyncLoopSkipOverCurrentIteration,
823}
824
825#[cfg(any(test, feature = "testing"))]
826impl SlidingSync {
827 pub async fn set_pos(&self, new_pos: String) {
829 let mut position_lock = self.inner.position.lock().await;
830 position_lock.pos = Some(new_pos);
831 }
832
833 pub fn extensions_config(&self) -> http::request::Extensions {
839 let sticky = self.inner.sticky.read().unwrap();
840 sticky.data().extensions.clone()
841 }
842}
843
844#[derive(Clone, Debug)]
845pub(super) struct SlidingSyncPositionMarkers {
846 pos: Option<String>,
851}
852
853#[derive(Debug, Serialize, Deserialize)]
855struct FrozenSlidingSync {
856 #[serde(skip_serializing_if = "Option::is_none")]
858 to_device_since: Option<String>,
859 #[serde(default, skip_serializing_if = "Vec::is_empty")]
860 rooms: Vec<FrozenSlidingSyncRoom>,
861}
862
863impl FrozenSlidingSync {
864 fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
865 Self {
867 to_device_since: None,
868 rooms: rooms
869 .iter()
870 .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
871 .collect::<Vec<_>>(),
872 }
873 }
874}
875
876#[derive(Serialize, Deserialize)]
877struct FrozenSlidingSyncPos {
878 #[serde(skip_serializing_if = "Option::is_none")]
879 pos: Option<String>,
880}
881
882#[derive(Debug, Clone)]
885pub struct UpdateSummary {
886 pub lists: Vec<String>,
888 pub rooms: Vec<OwnedRoomId>,
890}
891
892#[derive(Debug, Default)]
896enum RoomSubscriptionState {
897 #[default]
901 Pending,
902
903 Applied,
906}
907
908#[derive(Debug)]
911pub(super) struct SlidingSyncStickyParameters {
912 room_subscriptions:
915 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
916
917 extensions: http::request::Extensions,
920}
921
922impl SlidingSyncStickyParameters {
923 pub fn new(
925 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
926 extensions: http::request::Extensions,
927 ) -> Self {
928 Self {
929 room_subscriptions: room_subscriptions
930 .into_iter()
931 .map(|(room_id, room_subscription)| {
932 (room_id, (RoomSubscriptionState::Pending, room_subscription))
933 })
934 .collect(),
935 extensions,
936 }
937 }
938}
939
940impl StickyData for SlidingSyncStickyParameters {
941 type Request = http::Request;
942
943 fn apply(&self, request: &mut Self::Request) {
944 request.room_subscriptions = self
945 .room_subscriptions
946 .iter()
947 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
948 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
949 .collect();
950 request.extensions = self.extensions.clone();
951 }
952
953 fn on_commit(&mut self) {
954 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
956 if matches!(state, RoomSubscriptionState::Pending) {
957 *state = RoomSubscriptionState::Applied;
958 }
959 }
960 }
961}
962
963#[cfg(all(test, not(target_family = "wasm")))]
964#[allow(clippy::dbg_macro)]
965mod tests {
966 use std::{
967 collections::BTreeMap,
968 future::ready,
969 ops::Not,
970 sync::{Arc, Mutex},
971 time::Duration,
972 };
973
974 use assert_matches::assert_matches;
975 use event_listener::Listener;
976 use futures_util::{future::join_all, pin_mut, StreamExt};
977 use matrix_sdk_base::RequestedRequiredStates;
978 use matrix_sdk_test::async_test;
979 use ruma::{
980 api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
981 OwnedRoomId, TransactionId,
982 };
983 use serde::Deserialize;
984 use serde_json::json;
985 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
986
987 use super::{
988 http,
989 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
990 FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
991 SlidingSyncStickyParameters,
992 };
993 use crate::{
994 sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
995 };
996
997 #[derive(Copy, Clone)]
998 struct SlidingSyncMatcher;
999
1000 impl Match for SlidingSyncMatcher {
1001 fn matches(&self, request: &Request) -> bool {
1002 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1003 && request.method == Method::POST
1004 }
1005 }
1006
1007 async fn new_sliding_sync(
1008 lists: Vec<SlidingSyncListBuilder>,
1009 ) -> Result<(MockServer, SlidingSync)> {
1010 let server = MockServer::start().await;
1011 let client = logged_in_client(Some(server.uri())).await;
1012
1013 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1014
1015 for list in lists {
1016 sliding_sync_builder = sliding_sync_builder.add_list(list);
1017 }
1018
1019 let sliding_sync = sliding_sync_builder.build().await?;
1020
1021 Ok((server, sliding_sync))
1022 }
1023
1024 #[async_test]
1025 async fn test_subscribe_to_rooms() -> Result<()> {
1026 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1027 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1028 .await?;
1029
1030 let stream = sliding_sync.sync();
1031 pin_mut!(stream);
1032
1033 let room_id_0 = room_id!("!r0:bar.org");
1034 let room_id_1 = room_id!("!r1:bar.org");
1035 let room_id_2 = room_id!("!r2:bar.org");
1036
1037 {
1038 let _mock_guard = Mock::given(SlidingSyncMatcher)
1039 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1040 "pos": "1",
1041 "lists": {},
1042 "rooms": {
1043 room_id_0: {
1044 "name": "Room #0",
1045 "initial": true,
1046 },
1047 room_id_1: {
1048 "name": "Room #1",
1049 "initial": true,
1050 },
1051 room_id_2: {
1052 "name": "Room #2",
1053 "initial": true,
1054 },
1055 }
1056 })))
1057 .mount_as_scoped(&server)
1058 .await;
1059
1060 let _ = stream.next().await.unwrap()?;
1061 }
1062
1063 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1064
1065 assert!(room0.are_members_synced().not());
1069
1070 {
1071 struct MemberMatcher(OwnedRoomId);
1072
1073 impl Match for MemberMatcher {
1074 fn matches(&self, request: &Request) -> bool {
1075 request.url.path()
1076 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1077 && request.method == Method::GET
1078 }
1079 }
1080
1081 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1082 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1083 "chunk": [],
1084 })))
1085 .mount_as_scoped(&server)
1086 .await;
1087
1088 assert_matches!(room0.request_members().await, Ok(()));
1089 }
1090
1091 assert!(room0.are_members_synced());
1093
1094 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1095
1096 assert!(room0.are_members_synced().not());
1099
1100 {
1101 let sticky = sliding_sync.inner.sticky.read().unwrap();
1102 let room_subscriptions = &sticky.data().room_subscriptions;
1103
1104 assert!(room_subscriptions.contains_key(room_id_0));
1105 assert!(room_subscriptions.contains_key(room_id_1));
1106 assert!(!room_subscriptions.contains_key(room_id_2));
1107 }
1108
1109 {
1112 struct MemberMatcher(OwnedRoomId);
1113
1114 impl Match for MemberMatcher {
1115 fn matches(&self, request: &Request) -> bool {
1116 request.url.path()
1117 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1118 && request.method == Method::GET
1119 }
1120 }
1121
1122 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1123 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1124 "chunk": [],
1125 })))
1126 .mount_as_scoped(&server)
1127 .await;
1128
1129 assert_matches!(room0.request_members().await, Ok(()));
1130 }
1131
1132 assert!(room0.are_members_synced());
1134
1135 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1136
1137 assert!(room0.are_members_synced());
1140
1141 Ok(())
1142 }
1143
1144 #[async_test]
1145 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1146 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1147 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1148 .await?;
1149
1150 let room_id_0 = room_id!("!r0:bar.org");
1151 let room_id_1 = room_id!("!r1:bar.org");
1152 let room_id_2 = room_id!("!r2:bar.org");
1153
1154 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1156
1157 {
1158 let sticky = sliding_sync.inner.sticky.read().unwrap();
1159 let room_subscriptions = &sticky.data().room_subscriptions;
1160
1161 assert!(room_subscriptions.contains_key(room_id_0));
1162 assert!(room_subscriptions.contains_key(room_id_1));
1163 assert!(room_subscriptions.contains_key(room_id_2).not());
1164 }
1165
1166 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1168
1169 {
1170 let sticky = sliding_sync.inner.sticky.read().unwrap();
1171 let room_subscriptions = &sticky.data().room_subscriptions;
1172
1173 assert!(room_subscriptions.contains_key(room_id_0));
1174 assert!(room_subscriptions.contains_key(room_id_1));
1175 assert!(room_subscriptions.contains_key(room_id_2));
1176 }
1177
1178 sliding_sync.expire_session().await;
1180
1181 {
1182 let sticky = sliding_sync.inner.sticky.read().unwrap();
1183 let room_subscriptions = &sticky.data().room_subscriptions;
1184
1185 assert!(room_subscriptions.is_empty());
1186 }
1187
1188 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1190
1191 {
1192 let sticky = sliding_sync.inner.sticky.read().unwrap();
1193 let room_subscriptions = &sticky.data().room_subscriptions;
1194
1195 assert!(room_subscriptions.contains_key(room_id_0).not());
1196 assert!(room_subscriptions.contains_key(room_id_1).not());
1197 assert!(room_subscriptions.contains_key(room_id_2));
1198 }
1199
1200 Ok(())
1201 }
1202
1203 #[async_test]
1204 async fn test_to_device_token_properly_cached() -> Result<()> {
1205 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1206 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1207 .await?;
1208
1209 let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1212 assert!(frozen.to_device_since.is_none());
1213
1214 Ok(())
1215 }
1216
1217 #[async_test]
1218 async fn test_add_list() -> Result<()> {
1219 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1220 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1221 .await?;
1222
1223 let _stream = sliding_sync.sync();
1224 pin_mut!(_stream);
1225
1226 sliding_sync
1227 .add_list(
1228 SlidingSyncList::builder("bar")
1229 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1230 )
1231 .await?;
1232
1233 let lists = sliding_sync.inner.lists.read().await;
1234
1235 assert!(lists.contains_key("foo"));
1236 assert!(lists.contains_key("bar"));
1237
1238 Ok(())
1241 }
1242
1243 #[test]
1244 fn test_sticky_parameters_api_invalidated_flow() {
1245 let r0 = room_id!("!r0.matrix.org");
1246 let r1 = room_id!("!r1:matrix.org");
1247
1248 let mut room_subscriptions = BTreeMap::new();
1249 room_subscriptions.insert(r0.to_owned(), Default::default());
1250
1251 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1253 room_subscriptions,
1254 Default::default(),
1255 ));
1256 assert!(sticky.is_invalidated());
1257
1258 let txn_id: &TransactionId = "tid123".into();
1260
1261 let mut request = http::Request::default();
1262 request.txn_id = Some(txn_id.to_string());
1263
1264 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1265
1266 assert!(request.txn_id.is_some());
1267 assert_eq!(request.room_subscriptions.len(), 1);
1268 assert!(request.room_subscriptions.contains_key(r0));
1269
1270 let tid = request.txn_id.unwrap();
1271
1272 sticky.maybe_commit(tid.as_str().into());
1273 assert!(!sticky.is_invalidated());
1274
1275 sticky
1277 .data_mut()
1278 .room_subscriptions
1279 .insert(r1.to_owned(), (Default::default(), Default::default()));
1280 assert!(sticky.is_invalidated());
1281
1282 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1284 assert!(sticky.is_invalidated());
1285
1286 let txn_id1: &TransactionId = "tid456".into();
1288 let mut request1 = http::Request::default();
1289 request1.txn_id = Some(txn_id1.to_string());
1290 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1291
1292 assert!(sticky.is_invalidated());
1293 assert_eq!(request1.room_subscriptions.len(), 1);
1297 assert!(request1.room_subscriptions.contains_key(r1));
1298
1299 let txn_id2: &TransactionId = "tid789".into();
1300 let mut request2 = http::Request::default();
1301 request2.txn_id = Some(txn_id2.to_string());
1302
1303 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1304 assert!(sticky.is_invalidated());
1305 assert_eq!(request2.room_subscriptions.len(), 1);
1308 assert!(request2.room_subscriptions.contains_key(r1));
1309
1310 sticky.maybe_commit(txn_id1);
1313 assert!(sticky.is_invalidated());
1314
1315 sticky.maybe_commit(txn_id2);
1317 assert!(!sticky.is_invalidated());
1318 }
1319
1320 #[test]
1321 fn test_room_subscriptions_are_sticky() {
1322 let r0 = room_id!("!r0.matrix.org");
1323 let r1 = room_id!("!r1:matrix.org");
1324
1325 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1326 BTreeMap::new(),
1327 Default::default(),
1328 ));
1329
1330 {
1332 sticky
1334 .data_mut()
1335 .room_subscriptions
1336 .insert(r0.to_owned(), (Default::default(), Default::default()));
1337
1338 let txn_id: &TransactionId = "tid0".into();
1340 let mut request = http::Request::default();
1341 request.txn_id = Some(txn_id.to_string());
1342
1343 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1344
1345 assert!(request.txn_id.is_some());
1346 assert_eq!(request.room_subscriptions.len(), 1);
1347 assert!(request.room_subscriptions.contains_key(r0));
1348
1349 let tid = request.txn_id.unwrap();
1351
1352 sticky.maybe_commit(tid.as_str().into());
1353 }
1354
1355 {
1357 sticky
1359 .data_mut()
1360 .room_subscriptions
1361 .insert(r1.to_owned(), (Default::default(), Default::default()));
1362
1363 let txn_id: &TransactionId = "tid1".into();
1365 let mut request = http::Request::default();
1366 request.txn_id = Some(txn_id.to_string());
1367
1368 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1369
1370 assert!(request.txn_id.is_some());
1371 assert_eq!(request.room_subscriptions.len(), 1);
1372 assert!(request.room_subscriptions.contains_key(r1));
1374
1375 }
1379
1380 {
1382 let txn_id: &TransactionId = "tid2".into();
1384 let mut request = http::Request::default();
1385 request.txn_id = Some(txn_id.to_string());
1386
1387 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1388
1389 assert!(request.txn_id.is_some());
1390 assert_eq!(request.room_subscriptions.len(), 1);
1391 assert!(request.room_subscriptions.contains_key(r1));
1393
1394 let tid = request.txn_id.unwrap();
1396
1397 sticky.maybe_commit(tid.as_str().into());
1398 }
1399
1400 {
1402 let txn_id: &TransactionId = "tid3".into();
1404 let mut request = http::Request::default();
1405 request.txn_id = Some(txn_id.to_string());
1406
1407 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1408
1409 assert!(request.txn_id.is_some());
1410 assert!(request.room_subscriptions.is_empty());
1412 }
1413 }
1414
1415 #[test]
1416 fn test_extensions_are_sticky() {
1417 let mut extensions = http::request::Extensions::default();
1418 extensions.account_data.enabled = Some(true);
1419
1420 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1422 Default::default(),
1423 extensions,
1424 ));
1425
1426 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1427
1428 let extensions = &sticky.data().extensions;
1431 assert_eq!(extensions.e2ee.enabled, None);
1432 assert_eq!(extensions.to_device.enabled, None);
1433 assert_eq!(extensions.to_device.since, None);
1434
1435 assert_eq!(extensions.account_data.enabled, Some(true));
1437
1438 let txn_id: &TransactionId = "tid123".into();
1439 let mut request = http::Request::default();
1440 request.txn_id = Some(txn_id.to_string());
1441 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1442 assert!(sticky.is_invalidated());
1443 assert_eq!(request.extensions.to_device.enabled, None);
1444 assert_eq!(request.extensions.to_device.since, None);
1445 assert_eq!(request.extensions.e2ee.enabled, None);
1446 assert_eq!(request.extensions.account_data.enabled, Some(true));
1447 }
1448
1449 #[async_test]
1450 async fn test_sticky_extensions_plus_since() -> Result<()> {
1451 let server = MockServer::start().await;
1452 let client = logged_in_client(Some(server.uri())).await;
1453
1454 let sync = client
1455 .sliding_sync("test-slidingsync")?
1456 .add_list(SlidingSyncList::builder("new_list"))
1457 .build()
1458 .await?;
1459
1460 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1462 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1463 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1464
1465 let sync = client
1467 .sliding_sync("test-slidingsync")?
1468 .add_list(SlidingSyncList::builder("new_list"))
1469 .with_to_device_extension(
1470 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1471 )
1472 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1473 .build()
1474 .await?;
1475
1476 let txn_id = TransactionId::new();
1479 let (request, _, _) = sync
1480 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1481 .await?;
1482
1483 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1484 assert_eq!(request.extensions.to_device.enabled, Some(true));
1485 assert!(request.extensions.to_device.since.is_none());
1486
1487 {
1488 let mut sticky = sync.inner.sticky.write().unwrap();
1490 assert!(sticky.is_invalidated());
1491 sticky.maybe_commit(
1492 "hopefully the rng won't generate this very specific transaction id".into(),
1493 );
1494 assert!(sticky.is_invalidated());
1495 }
1496
1497 let txn_id2 = TransactionId::new();
1499 let (request, _, _) = sync
1500 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1501 .await?;
1502
1503 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1504 assert_eq!(request.extensions.to_device.enabled, Some(true));
1505 assert!(request.extensions.to_device.since.is_none());
1506
1507 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1508
1509 {
1510 let mut sticky = sync.inner.sticky.write().unwrap();
1512 assert!(sticky.is_invalidated());
1513 sticky.maybe_commit(txn_id2.as_str().into());
1514 assert!(!sticky.is_invalidated());
1515 }
1516
1517 let txn_id = TransactionId::new();
1519 let (request, _, _) = sync
1520 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1521 .await?;
1522 assert!(request.extensions.e2ee.enabled.is_none());
1523 assert!(request.extensions.to_device.enabled.is_none());
1524 assert!(request.extensions.to_device.since.is_none());
1525
1526 let _since_token = "since";
1530
1531 #[cfg(feature = "e2e-encryption")]
1532 {
1533 use matrix_sdk_base::crypto::store::Changes;
1534 if let Some(olm_machine) = &*client.olm_machine().await {
1535 olm_machine
1536 .store()
1537 .save_changes(Changes {
1538 next_batch_token: Some(_since_token.to_owned()),
1539 ..Default::default()
1540 })
1541 .await?;
1542 }
1543 }
1544
1545 let txn_id = TransactionId::new();
1546 let (request, _, _) = sync
1547 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1548 .await?;
1549
1550 assert!(request.extensions.e2ee.enabled.is_none());
1551 assert!(request.extensions.to_device.enabled.is_none());
1552
1553 #[cfg(feature = "e2e-encryption")]
1554 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1555
1556 Ok(())
1557 }
1558
1559 #[async_test]
1565 #[cfg(feature = "e2e-encryption")]
1566 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1567 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1568 use matrix_sdk_test::ruma_response_from_json;
1569 use ruma::user_id;
1570
1571 let server = MockServer::start().await;
1572 let client = logged_in_client(Some(server.uri())).await;
1573
1574 let alice = user_id!("@alice:localhost");
1575 let bob = user_id!("@bob:localhost");
1576 let me = user_id!("@example:localhost");
1577
1578 {
1581 let olm_machine = client.olm_machine().await;
1582 let olm_machine = olm_machine.as_ref().unwrap();
1583
1584 olm_machine.update_tracked_users([alice, bob]).await?;
1585
1586 let outgoing_requests = olm_machine.outgoing_requests().await?;
1588
1589 assert_eq!(outgoing_requests.len(), 2);
1590 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1591 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1592
1593 olm_machine
1595 .mark_request_as_sent(
1596 outgoing_requests[0].request_id(),
1597 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1598 "one_time_key_counts": {}
1599 }))),
1600 )
1601 .await?;
1602
1603 olm_machine
1604 .mark_request_as_sent(
1605 outgoing_requests[1].request_id(),
1606 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1607 "device_keys": {
1608 alice: {},
1609 bob: {},
1610 }
1611 }))),
1612 )
1613 .await?;
1614
1615 let outgoing_requests = olm_machine.outgoing_requests().await?;
1617
1618 assert_eq!(outgoing_requests.len(), 1);
1619 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1620
1621 olm_machine
1622 .mark_request_as_sent(
1623 outgoing_requests[0].request_id(),
1624 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1625 "device_keys": {
1626 me: {},
1627 }
1628 }))),
1629 )
1630 .await?;
1631
1632 let outgoing_requests = olm_machine.outgoing_requests().await?;
1634
1635 assert!(outgoing_requests.is_empty());
1636 }
1637
1638 let sync = client
1639 .sliding_sync("test-slidingsync")?
1640 .add_list(SlidingSyncList::builder("new_list"))
1641 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1642 .build()
1643 .await?;
1644
1645 let txn_id = TransactionId::new();
1647 let (_request, _, _) = sync
1648 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1649 .await?;
1650
1651 {
1653 let olm_machine = client.olm_machine().await;
1654 let olm_machine = olm_machine.as_ref().unwrap();
1655
1656 let outgoing_requests = olm_machine.outgoing_requests().await?;
1658
1659 assert_eq!(outgoing_requests.len(), 1);
1660 assert_matches!(
1661 outgoing_requests[0].request(),
1662 AnyOutgoingRequest::KeysQuery(request) => {
1663 assert!(request.device_keys.contains_key(alice));
1664 assert!(request.device_keys.contains_key(bob));
1665 assert!(request.device_keys.contains_key(me));
1666 }
1667 );
1668
1669 olm_machine
1671 .mark_request_as_sent(
1672 outgoing_requests[0].request_id(),
1673 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1674 "device_keys": {
1675 alice: {},
1676 bob: {},
1677 me: {},
1678 }
1679 }))),
1680 )
1681 .await?;
1682 }
1683
1684 sync.set_pos("chocolat".to_owned()).await;
1686
1687 let txn_id = TransactionId::new();
1688 let (_request, _, _) = sync
1689 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1690 .await?;
1691
1692 {
1694 let olm_machine = client.olm_machine().await;
1695 let olm_machine = olm_machine.as_ref().unwrap();
1696
1697 let outgoing_requests = olm_machine.outgoing_requests().await?;
1699
1700 assert!(outgoing_requests.is_empty());
1701 }
1702
1703 Ok(())
1704 }
1705
1706 #[async_test]
1707 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1708 let server = MockServer::start().await;
1709 let client = logged_in_client(Some(server.uri())).await;
1710
1711 let sliding_sync = client
1712 .sliding_sync("test-slidingsync")?
1713 .with_to_device_extension(
1714 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1715 )
1716 .build()
1717 .await?;
1718
1719 let (request, _, _) =
1721 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1722 assert!(request.extensions.to_device.enabled.is_some());
1723
1724 let sync = sliding_sync.sync();
1725 pin_mut!(sync);
1726
1727 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1729
1730 #[derive(Deserialize)]
1731 struct PartialRequest {
1732 txn_id: Option<String>,
1733 }
1734
1735 {
1736 let _mock_guard = Mock::given(SlidingSyncMatcher)
1737 .respond_with(|request: &Request| {
1738 let request: PartialRequest = request.body_json().unwrap();
1740
1741 ResponseTemplate::new(200).set_body_json(json!({
1742 "txn_id": request.txn_id,
1743 "pos": "0",
1744 }))
1745 })
1746 .mount_as_scoped(&server)
1747 .await;
1748
1749 let next = sync.next().await;
1750 assert_matches!(next, Some(Ok(_update_summary)));
1751
1752 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1754 }
1755
1756 let (request, _, _) =
1758 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1759 assert!(request.extensions.to_device.enabled.is_none());
1760
1761 {
1763 let _mock_guard = Mock::given(SlidingSyncMatcher)
1764 .respond_with(|request: &Request| {
1765 let request: PartialRequest = request.body_json().unwrap();
1767
1768 ResponseTemplate::new(200).set_body_json(json!({
1769 "txn_id": request.txn_id,
1770 "pos": "1",
1771 }))
1772 })
1773 .mount_as_scoped(&server)
1774 .await;
1775
1776 let next = sync.next().await;
1777 assert_matches!(next, Some(Ok(_update_summary)));
1778
1779 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1781 }
1782
1783 {
1786 let _mock_guard = Mock::given(SlidingSyncMatcher)
1787 .respond_with(|request: &Request| {
1788 let request: PartialRequest = request.body_json().unwrap();
1790
1791 ResponseTemplate::new(200).set_body_json(json!({
1792 "txn_id": request.txn_id,
1793 "pos": "0", }))
1795 })
1796 .up_to_n_times(1) .mount_as_scoped(&server)
1798 .await;
1799
1800 let next = sync.next().await;
1801 assert_matches!(next, Some(Ok(_update_summary)));
1802
1803 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1805 }
1806
1807 {
1812 let _mock_guard = Mock::given(SlidingSyncMatcher)
1813 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1814 "error": "foo",
1815 "errcode": "M_UNKNOWN_POS",
1816 })))
1817 .mount_as_scoped(&server)
1818 .await;
1819
1820 let next = sync.next().await;
1821
1822 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1824
1825 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1827
1828 let (request, _, _) =
1830 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1831
1832 assert!(request.extensions.to_device.enabled.is_some());
1833
1834 assert!(sync.next().await.is_none());
1836 }
1837
1838 Ok(())
1839 }
1840
1841 #[cfg(feature = "e2e-encryption")]
1842 #[async_test]
1843 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1844 let server = MockServer::start().await;
1845
1846 #[derive(Deserialize)]
1847 struct PartialRequest {
1848 txn_id: Option<String>,
1849 }
1850
1851 let server_pos = Arc::new(Mutex::new(0));
1852 let _mock_guard = Mock::given(SlidingSyncMatcher)
1853 .respond_with(move |request: &Request| {
1854 let request: PartialRequest = request.body_json().unwrap();
1856 let pos = {
1857 let mut pos = server_pos.lock().unwrap();
1858 let prev = *pos;
1859 *pos += 1;
1860 prev
1861 };
1862
1863 ResponseTemplate::new(200).set_body_json(json!({
1864 "txn_id": request.txn_id,
1865 "pos": pos.to_string(),
1866 }))
1867 })
1868 .mount_as_scoped(&server)
1869 .await;
1870
1871 let client = logged_in_client(Some(server.uri())).await;
1872
1873 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1874
1875 {
1877 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1878
1879 let (request, _, _) =
1880 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1881 assert!(request.pos.is_none());
1882 }
1883
1884 let sync = sliding_sync.sync();
1885 pin_mut!(sync);
1886
1887 let next = sync.next().await;
1890 assert_matches!(next, Some(Ok(_update_summary)));
1891
1892 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1893
1894 let restored_fields = restore_sliding_sync_state(
1895 &client,
1896 &sliding_sync.inner.storage_key,
1897 &*sliding_sync.inner.lists.read().await,
1898 )
1899 .await?
1900 .expect("must have restored fields");
1901
1902 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1905
1906 {
1910 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1911
1912 let mut position_guard = other_sync.inner.position.lock().await;
1913 position_guard.pos = Some("yolo".to_owned());
1914
1915 other_sync.cache_to_storage(&position_guard).await?;
1916 }
1917
1918 {
1920 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1921 let (request, _, _) =
1922 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1923 assert_eq!(request.pos.as_deref(), Some("0"));
1924 }
1925
1926 {
1929 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1930 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1931 }
1932
1933 Ok(())
1934 }
1935
1936 #[cfg(feature = "e2e-encryption")]
1937 #[async_test]
1938 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1939 let server = MockServer::start().await;
1940
1941 #[derive(Deserialize)]
1942 struct PartialRequest {
1943 txn_id: Option<String>,
1944 }
1945
1946 let server_pos = Arc::new(Mutex::new(0));
1947 let _mock_guard = Mock::given(SlidingSyncMatcher)
1948 .respond_with(move |request: &Request| {
1949 let request: PartialRequest = request.body_json().unwrap();
1951 let pos = {
1952 let mut pos = server_pos.lock().unwrap();
1953 let prev = *pos;
1954 *pos += 1;
1955 prev
1956 };
1957
1958 ResponseTemplate::new(200).set_body_json(json!({
1959 "txn_id": request.txn_id,
1960 "pos": pos.to_string(),
1961 }))
1962 })
1963 .mount_as_scoped(&server)
1964 .await;
1965
1966 let client = logged_in_client(Some(server.uri())).await;
1967
1968 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1969
1970 {
1972 let (request, _, _) =
1973 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1974
1975 assert!(request.pos.is_none());
1976 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1977 }
1978
1979 let sync = sliding_sync.sync();
1980 pin_mut!(sync);
1981
1982 let next = sync.next().await;
1985 assert_matches!(next, Some(Ok(_update_summary)));
1986
1987 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1988
1989 let restored_fields = restore_sliding_sync_state(
1990 &client,
1991 &sliding_sync.inner.storage_key,
1992 &*sliding_sync.inner.lists.read().await,
1993 )
1994 .await?
1995 .expect("must have restored fields");
1996
1997 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2000
2001 {
2003 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
2004
2005 let mut position_guard = other_sync.inner.position.lock().await;
2006 position_guard.pos = Some("42".to_owned());
2007
2008 other_sync.cache_to_storage(&position_guard).await?;
2009 }
2010
2011 {
2013 let (request, _, _) =
2014 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2015 assert_eq!(request.pos.as_deref(), Some("42"));
2016 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2017 }
2018
2019 {
2021 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2022 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2023
2024 let (request, _, _) =
2025 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2026 assert_eq!(request.pos.as_deref(), Some("42"));
2027 }
2028
2029 sliding_sync.expire_session().await;
2032
2033 {
2034 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2035
2036 let (request, _, _) =
2037 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2038 assert!(request.pos.is_none());
2039 }
2040
2041 {
2043 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2044 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2045
2046 let (request, _, _) =
2047 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2048 assert!(request.pos.is_none());
2049 }
2050
2051 Ok(())
2052 }
2053
2054 #[async_test]
2055 async fn test_stop_sync_loop() -> Result<()> {
2056 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2057 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2058 .await?;
2059
2060 let stream = sliding_sync.sync();
2062 pin_mut!(stream);
2063
2064 assert!(stream.next().await.is_some());
2066
2067 sliding_sync.stop_sync()?;
2069
2070 assert!(stream.next().await.is_none());
2072
2073 let stream = sliding_sync.sync();
2075 pin_mut!(stream);
2076
2077 assert!(stream.next().await.is_some());
2079
2080 Ok(())
2081 }
2082
2083 #[async_test]
2084 async fn test_process_read_receipts() -> Result<()> {
2085 let room = owned_room_id!("!pony:example.org");
2086
2087 let server = MockServer::start().await;
2088 let client = logged_in_client(Some(server.uri())).await;
2089
2090 let sliding_sync = client
2091 .sliding_sync("test")?
2092 .with_receipt_extension(
2093 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2094 )
2095 .add_list(
2096 SlidingSyncList::builder("all")
2097 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2098 )
2099 .build()
2100 .await?;
2101
2102 {
2104 let server_response = assign!(http::Response::new("0".to_owned()), {
2105 rooms: BTreeMap::from([(
2106 room.clone(),
2107 http::response::Room::default(),
2108 )])
2109 });
2110
2111 let _summary = {
2112 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2113 sliding_sync
2114 .handle_response(
2115 server_response.clone(),
2116 &mut pos_guard,
2117 RequestedRequiredStates::default(),
2118 )
2119 .await?
2120 };
2121 }
2122
2123 let server_response = assign!(http::Response::new("1".to_owned()), {
2124 extensions: assign!(http::response::Extensions::default(), {
2125 receipts: assign!(http::response::Receipts::default(), {
2126 rooms: BTreeMap::from([
2127 (
2128 room.clone(),
2129 Raw::from_json_string(
2130 json!({
2131 "room_id": room,
2132 "type": "m.receipt",
2133 "content": {
2134 "$event:bar.org": {
2135 "m.read": {
2136 client.user_id().unwrap(): {
2137 "ts": 1436451550,
2138 }
2139 }
2140 }
2141 }
2142 })
2143 .to_string(),
2144 ).unwrap()
2145 )
2146 ])
2147 })
2148 })
2149 });
2150
2151 let summary = {
2152 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2153 sliding_sync
2154 .handle_response(
2155 server_response.clone(),
2156 &mut pos_guard,
2157 RequestedRequiredStates::default(),
2158 )
2159 .await?
2160 };
2161
2162 assert!(summary.rooms.contains(&room));
2163
2164 Ok(())
2165 }
2166
2167 #[async_test]
2168 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2169 let room_id = owned_room_id!("!unicorn:example.org");
2170
2171 let server = MockServer::start().await;
2172 let client = logged_in_client(Some(server.uri())).await;
2173
2174 let sliding_sync = client
2177 .sliding_sync("test")?
2178 .with_account_data_extension(
2179 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2180 )
2181 .add_list(
2182 SlidingSyncList::builder("all")
2183 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2184 )
2185 .build()
2186 .await?;
2187
2188 {
2190 let server_response = assign!(http::Response::new("0".to_owned()), {
2191 rooms: BTreeMap::from([(
2192 room_id.clone(),
2193 http::response::Room::default(),
2194 )])
2195 });
2196
2197 let _summary = {
2198 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2199 sliding_sync
2200 .handle_response(
2201 server_response.clone(),
2202 &mut pos_guard,
2203 RequestedRequiredStates::default(),
2204 )
2205 .await?
2206 };
2207 }
2208
2209 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2213
2214 let update_summary = {
2215 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2216 sliding_sync
2217 .handle_response(
2218 server_response.clone(),
2219 &mut pos_guard,
2220 RequestedRequiredStates::default(),
2221 )
2222 .await?
2223 };
2224
2225 assert!(update_summary.rooms.contains(&room_id));
2228
2229 let room = client.get_room(&room_id).unwrap();
2230
2231 assert!(room.is_marked_unread());
2234
2235 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2238
2239 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2240 sliding_sync
2241 .handle_response(
2242 server_response.clone(),
2243 &mut pos_guard,
2244 RequestedRequiredStates::default(),
2245 )
2246 .await?;
2247
2248 let room = client.get_room(&room_id).unwrap();
2249
2250 assert!(!room.is_marked_unread());
2251
2252 Ok(())
2253 }
2254
2255 fn make_mark_unread_response(
2256 response_number: &str,
2257 room_id: OwnedRoomId,
2258 unread: bool,
2259 add_rooms_section: bool,
2260 ) -> http::Response {
2261 let rooms = if add_rooms_section {
2262 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2263 } else {
2264 BTreeMap::new()
2265 };
2266
2267 let extensions = assign!(http::response::Extensions::default(), {
2268 account_data: assign!(http::response::AccountData::default(), {
2269 rooms: BTreeMap::from([
2270 (
2271 room_id,
2272 vec![
2273 Raw::from_json_string(
2274 json!({
2275 "content": {
2276 "unread": unread
2277 },
2278 "type": "com.famedly.marked_unread"
2279 })
2280 .to_string(),
2281 ).unwrap()
2282 ]
2283 )
2284 ])
2285 })
2286 });
2287
2288 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2289 }
2290
2291 #[async_test]
2292 async fn test_process_rooms_account_data() -> Result<()> {
2293 let room = owned_room_id!("!pony:example.org");
2294
2295 let server = MockServer::start().await;
2296 let client = logged_in_client(Some(server.uri())).await;
2297
2298 let sliding_sync = client
2299 .sliding_sync("test")?
2300 .with_account_data_extension(
2301 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2302 )
2303 .add_list(
2304 SlidingSyncList::builder("all")
2305 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2306 )
2307 .build()
2308 .await?;
2309
2310 {
2312 let server_response = assign!(http::Response::new("0".to_owned()), {
2313 rooms: BTreeMap::from([(
2314 room.clone(),
2315 http::response::Room::default(),
2316 )])
2317 });
2318
2319 let _summary = {
2320 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2321 sliding_sync
2322 .handle_response(
2323 server_response.clone(),
2324 &mut pos_guard,
2325 RequestedRequiredStates::default(),
2326 )
2327 .await?
2328 };
2329 }
2330
2331 let server_response = assign!(http::Response::new("1".to_owned()), {
2332 extensions: assign!(http::response::Extensions::default(), {
2333 account_data: assign!(http::response::AccountData::default(), {
2334 rooms: BTreeMap::from([
2335 (
2336 room.clone(),
2337 vec![
2338 Raw::from_json_string(
2339 json!({
2340 "content": {
2341 "tags": {
2342 "u.work": {
2343 "order": 0.9
2344 }
2345 }
2346 },
2347 "type": "m.tag"
2348 })
2349 .to_string(),
2350 ).unwrap()
2351 ]
2352 )
2353 ])
2354 })
2355 })
2356 });
2357 let summary = {
2358 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2359 sliding_sync
2360 .handle_response(
2361 server_response.clone(),
2362 &mut pos_guard,
2363 RequestedRequiredStates::default(),
2364 )
2365 .await?
2366 };
2367
2368 assert!(summary.rooms.contains(&room));
2369
2370 Ok(())
2371 }
2372
2373 #[async_test]
2374 #[cfg(feature = "e2e-encryption")]
2375 async fn test_process_only_encryption_events() -> Result<()> {
2376 use ruma::OneTimeKeyAlgorithm;
2377
2378 let room = owned_room_id!("!croissant:example.org");
2379
2380 let server = MockServer::start().await;
2381 let client = logged_in_client(Some(server.uri())).await;
2382
2383 let server_response = assign!(http::Response::new("0".to_owned()), {
2384 rooms: BTreeMap::from([(
2385 room.clone(),
2386 assign!(http::response::Room::default(), {
2387 name: Some("Croissants lovers".to_owned()),
2388 timeline: Vec::new(),
2389 }),
2390 )]),
2391
2392 extensions: assign!(http::response::Extensions::default(), {
2393 e2ee: assign!(http::response::E2EE::default(), {
2394 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2395 }),
2396 to_device: Some(assign!(http::response::ToDevice::default(), {
2397 next_batch: "to-device-token".to_owned(),
2398 })),
2399 })
2400 });
2401
2402 let sliding_sync = client
2406 .sliding_sync("test")?
2407 .with_to_device_extension(
2408 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2409 )
2410 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2411 .build()
2412 .await?;
2413
2414 {
2415 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2416
2417 sliding_sync
2418 .handle_response(
2419 server_response.clone(),
2420 &mut position_guard,
2421 RequestedRequiredStates::default(),
2422 )
2423 .await?;
2424 }
2425
2426 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2428 assert_eq!(uploaded_key_count, 42);
2429
2430 {
2431 let olm_machine = &*client.olm_machine_for_testing().await;
2432 assert_eq!(
2433 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2434 Some("to-device-token")
2435 );
2436 }
2437
2438 assert!(client.get_room(&room).is_none());
2440
2441 let client = logged_in_client(Some(server.uri())).await;
2444
2445 let sliding_sync = client
2446 .sliding_sync("test")?
2447 .add_list(SlidingSyncList::builder("thelist"))
2448 .build()
2449 .await?;
2450
2451 {
2452 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2453
2454 sliding_sync
2455 .handle_response(
2456 server_response.clone(),
2457 &mut position_guard,
2458 RequestedRequiredStates::default(),
2459 )
2460 .await?;
2461 }
2462
2463 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2465 assert_eq!(uploaded_key_count, 0);
2466
2467 {
2468 let olm_machine = &*client.olm_machine_for_testing().await;
2469 assert_eq!(
2470 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2471 None
2472 );
2473 }
2474
2475 assert!(client.get_room(&room).is_some());
2477
2478 let client = logged_in_client(Some(server.uri())).await;
2480
2481 let sliding_sync = client
2482 .sliding_sync("test")?
2483 .add_list(SlidingSyncList::builder("thelist"))
2484 .with_to_device_extension(
2485 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2486 )
2487 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2488 .build()
2489 .await?;
2490
2491 {
2492 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2493
2494 sliding_sync
2495 .handle_response(
2496 server_response.clone(),
2497 &mut position_guard,
2498 RequestedRequiredStates::default(),
2499 )
2500 .await?;
2501 }
2502
2503 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2505 assert_eq!(uploaded_key_count, 42);
2506
2507 {
2508 let olm_machine = &*client.olm_machine_for_testing().await;
2509 assert_eq!(
2510 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2511 Some("to-device-token")
2512 );
2513 }
2514
2515 assert!(client.get_room(&room).is_some());
2517
2518 Ok(())
2519 }
2520
2521 #[async_test]
2522 async fn test_lock_multiple_requests() -> Result<()> {
2523 let server = MockServer::start().await;
2524 let client = logged_in_client(Some(server.uri())).await;
2525
2526 let pos = Arc::new(Mutex::new(0));
2527 let _mock_guard = Mock::given(SlidingSyncMatcher)
2528 .respond_with(move |_: &Request| {
2529 let mut pos = pos.lock().unwrap();
2530 *pos += 1;
2531 ResponseTemplate::new(200).set_body_json(json!({
2532 "pos": pos.to_string(),
2533 "lists": {},
2534 "rooms": {}
2535 }))
2536 })
2537 .mount_as_scoped(&server)
2538 .await;
2539
2540 let sliding_sync = client
2541 .sliding_sync("test")?
2542 .with_to_device_extension(
2543 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2544 )
2545 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2546 .build()
2547 .await?;
2548
2549 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2552
2553 for result in requests.await {
2554 result?;
2555 }
2556
2557 Ok(())
2558 }
2559
2560 #[async_test]
2561 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2562 let server = MockServer::start().await;
2563 let client = logged_in_client(Some(server.uri())).await;
2564
2565 let pos = Arc::new(Mutex::new(0));
2566 let _mock_guard = Mock::given(SlidingSyncMatcher)
2567 .respond_with(move |_: &Request| {
2568 let mut pos = pos.lock().unwrap();
2569 *pos += 1;
2570 ResponseTemplate::new(200)
2572 .set_body_json(json!({
2573 "pos": pos.to_string(),
2574 "lists": {},
2575 "rooms": {}
2576 }))
2577 .set_delay(Duration::from_secs(2))
2578 })
2579 .mount_as_scoped(&server)
2580 .await;
2581
2582 let sliding_sync =
2583 client
2584 .sliding_sync("test")?
2585 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2586 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2587 ))
2588 .add_list(
2589 SlidingSyncList::builder("another-list")
2590 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2591 )
2592 .build()
2593 .await?;
2594
2595 let stream = sliding_sync.sync();
2596 pin_mut!(stream);
2597
2598 let cloned_sync = sliding_sync.clone();
2599 tokio::spawn(async move {
2600 tokio::time::sleep(Duration::from_millis(100)).await;
2601
2602 cloned_sync
2603 .on_list("another-list", |list| {
2604 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2605 ready(())
2606 })
2607 .await;
2608 });
2609
2610 assert_matches!(stream.next().await, Some(Ok(_)));
2611
2612 sliding_sync.stop_sync().unwrap();
2613
2614 assert_matches!(stream.next().await, None);
2615
2616 let mut num_requests = 0;
2617
2618 for request in server.received_requests().await.unwrap() {
2619 if !SlidingSyncMatcher.matches(&request) {
2620 continue;
2621 }
2622
2623 let another_list_ranges = if num_requests == 0 {
2624 json!([[0, 10]])
2626 } else {
2627 json!([[10, 20]])
2629 };
2630
2631 num_requests += 1;
2632 assert!(num_requests <= 2, "more than one request hit the server");
2633
2634 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2635
2636 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2637 &json_value,
2638 &json!({
2639 "conn_id": "test",
2640 "lists": {
2641 "room-list": {
2642 "ranges": [[0, 9]],
2643 "required_state": [
2644 ["m.room.encryption", ""],
2645 ["m.room.tombstone", ""]
2646 ],
2647 },
2648 "another-list": {
2649 "ranges": another_list_ranges,
2650 "required_state": [
2651 ["m.room.encryption", ""],
2652 ["m.room.tombstone", ""]
2653 ],
2654 },
2655 }
2656 }),
2657 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2658 ) {
2659 dbg!(json_value);
2660 panic!("json differ: {err}");
2661 }
2662 }
2663
2664 assert_eq!(num_requests, 2);
2665
2666 Ok(())
2667 }
2668
2669 #[async_test]
2670 async fn test_timeout_zero_list() -> Result<()> {
2671 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2672
2673 let (request, _, _) =
2674 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2675
2676 assert!(request.timeout.is_some());
2679
2680 Ok(())
2681 }
2682
2683 #[async_test]
2684 async fn test_timeout_one_list() -> Result<()> {
2685 let (_server, sliding_sync) = new_sliding_sync(vec![
2686 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2687 ])
2688 .await?;
2689
2690 let (request, _, _) =
2691 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2692
2693 assert!(request.timeout.is_none());
2695
2696 {
2698 let server_response = assign!(http::Response::new("0".to_owned()), {
2699 lists: BTreeMap::from([(
2700 "foo".to_owned(),
2701 assign!(http::response::List::default(), {
2702 count: uint!(7),
2703 })
2704 )])
2705 });
2706
2707 let _summary = {
2708 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2709 sliding_sync
2710 .handle_response(
2711 server_response.clone(),
2712 &mut pos_guard,
2713 RequestedRequiredStates::default(),
2714 )
2715 .await?
2716 };
2717 }
2718
2719 let (request, _, _) =
2720 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2721
2722 assert!(request.timeout.is_some());
2724
2725 Ok(())
2726 }
2727
2728 #[async_test]
2729 async fn test_timeout_three_lists() -> Result<()> {
2730 let (_server, sliding_sync) = new_sliding_sync(vec![
2731 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2732 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2733 SlidingSyncList::builder("baz")
2734 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2735 ])
2736 .await?;
2737
2738 let (request, _, _) =
2739 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2740
2741 assert!(request.timeout.is_none());
2743
2744 {
2746 let server_response = assign!(http::Response::new("0".to_owned()), {
2747 lists: BTreeMap::from([(
2748 "foo".to_owned(),
2749 assign!(http::response::List::default(), {
2750 count: uint!(7),
2751 })
2752 )])
2753 });
2754
2755 let _summary = {
2756 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2757 sliding_sync
2758 .handle_response(
2759 server_response.clone(),
2760 &mut pos_guard,
2761 RequestedRequiredStates::default(),
2762 )
2763 .await?
2764 };
2765 }
2766
2767 let (request, _, _) =
2768 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2769
2770 assert!(request.timeout.is_none());
2772
2773 {
2775 let server_response = assign!(http::Response::new("1".to_owned()), {
2776 lists: BTreeMap::from([(
2777 "bar".to_owned(),
2778 assign!(http::response::List::default(), {
2779 count: uint!(7),
2780 })
2781 )])
2782 });
2783
2784 let _summary = {
2785 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2786 sliding_sync
2787 .handle_response(
2788 server_response.clone(),
2789 &mut pos_guard,
2790 RequestedRequiredStates::default(),
2791 )
2792 .await?
2793 };
2794 }
2795
2796 let (request, _, _) =
2797 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2798
2799 assert!(request.timeout.is_some());
2801
2802 Ok(())
2803 }
2804
2805 #[async_test]
2806 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2807 let server = MockServer::start().await;
2808 let client = logged_in_client(Some(server.uri())).await;
2809
2810 let _mock_guard = Mock::given(SlidingSyncMatcher)
2811 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2812 "pos": "0",
2813 "lists": {},
2814 "rooms": {}
2815 })))
2816 .mount_as_scoped(&server)
2817 .await;
2818
2819 let sliding_sync = client
2820 .sliding_sync("test")?
2821 .with_to_device_extension(
2822 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2823 )
2824 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2825 .build()
2826 .await?;
2827
2828 let sliding_sync = Arc::new(sliding_sync);
2829
2830 let sync_beat_listener = client.inner.sync_beat.listen();
2832 sliding_sync.sync_once().await?;
2833
2834 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2836 Ok(())
2837 }
2838
2839 #[async_test]
2840 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2841 let server = MockServer::start().await;
2842 let client = logged_in_client(Some(server.uri())).await;
2843
2844 let _mock_guard = Mock::given(SlidingSyncMatcher)
2845 .respond_with(ResponseTemplate::new(404))
2846 .mount_as_scoped(&server)
2847 .await;
2848
2849 let sliding_sync = client
2850 .sliding_sync("test")?
2851 .with_to_device_extension(
2852 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2853 )
2854 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2855 .build()
2856 .await?;
2857
2858 let sliding_sync = Arc::new(sliding_sync);
2859
2860 let sync_beat_listener = client.inner.sync_beat.listen();
2862 let sync_result = sliding_sync.sync_once().await;
2863 assert!(sync_result.is_err());
2864
2865 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2867
2868 Ok(())
2869 }
2870}