1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28 collections::{btree_map::Entry, BTreeMap},
29 fmt::Debug,
30 future::Future,
31 sync::{Arc, RwLock as StdRwLock},
32 time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
39use ruma::{
40 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
41 assign, OwnedRoomId, RoomId,
42};
43use serde::{Deserialize, Serialize};
44use tokio::{
45 select,
46 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
47};
48use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
49
50#[cfg(feature = "e2e-encryption")]
51use self::utils::JoinHandleExt as _;
52pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
53use self::{
54 cache::restore_sliding_sync_state,
55 client::SlidingSyncResponseProcessor,
56 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
57};
58use crate::{config::RequestConfig, Client, Result};
59
/// Handle on a Sliding Sync instance.
///
/// The actual state lives in a [`SlidingSyncInner`] stored behind an `Arc`,
/// so cloning this handle only clones the `Arc` and every clone shares the
/// same state.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// State shared between all the clones of this handle.
    inner: Arc<SlidingSyncInner>,
}
68
/// State shared by every clone of a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance; it is sent as the `conn_id` of every
    /// request and recorded in the tracing spans.
    id: String,

    /// Client used to send the requests and to look rooms up.
    client: Client,

    /// Value sent as the request `timeout` when every list requires one. It is
    /// also one of the two terms of the HTTP request timeout.
    poll_timeout: Duration,

    /// Extra delay added to `poll_timeout` to compute the HTTP request
    /// timeout.
    network_timeout: Duration,

    /// Key handed to the cache layer when restoring the persisted state.
    storage_key: String,

    /// When `true`, a `pos` restored from storage replaces the in-memory one
    /// while a request is being generated.
    share_pos: bool,

    /// Position markers. The (owned) lock guard is acquired when a request is
    /// generated and released once the matching response has been handled.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// The rooms known by this instance, keyed by room ID.
    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,

    /// Sticky parameters (room subscriptions and extensions) together with
    /// their commit/invalidate bookkeeping.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Sending half of the channel used to deliver
    /// [`SlidingSyncInternalMessage`]s to the sync loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
124
125impl SlidingSync {
126 pub(super) fn new(inner: SlidingSyncInner) -> Self {
127 Self { inner: Arc::new(inner) }
128 }
129
130 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
131 cache::store_sliding_sync_state(self, position).await
132 }
133
    /// Creates a new [`SlidingSyncBuilder`] for the given identifier and
    /// client.
    ///
    /// # Errors
    ///
    /// Returns whatever error [`SlidingSyncBuilder::new`] reports.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
138
139 pub fn subscribe_to_rooms(
146 &self,
147 room_ids: &[&RoomId],
148 settings: Option<http::request::RoomSubscription>,
149 cancel_in_flight_request: bool,
150 ) {
151 let settings = settings.unwrap_or_default();
152 let mut sticky = self.inner.sticky.write().unwrap();
153 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
154
155 let mut skip_over_current_sync_loop_iteration = false;
156
157 for room_id in room_ids {
158 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
165 if let Some(room) = self.inner.client.get_room(room_id) {
166 room.mark_members_missing();
167 }
168
169 entry.insert((RoomSubscriptionState::default(), settings.clone()));
170
171 skip_over_current_sync_loop_iteration = true;
172 }
173 }
174
175 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
176 self.inner.internal_channel_send_if_possible(
177 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
178 );
179 }
180 }
181
182 pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
184 self.inner.rooms.read().await.get(room_id).cloned()
185 }
186
187 pub fn get_number_of_rooms(&self) -> usize {
189 self.inner.rooms.blocking_read().len()
190 }
191
192 pub async fn on_list<Function, FunctionOutput, R>(
194 &self,
195 list_name: &str,
196 function: Function,
197 ) -> Option<R>
198 where
199 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
200 FunctionOutput: Future<Output = R>,
201 {
202 let lists = self.inner.lists.read().await;
203
204 match lists.get(list_name) {
205 Some(list) => Some(function(list).await),
206 None => None,
207 }
208 }
209
210 pub async fn add_list(
216 &self,
217 list_builder: SlidingSyncListBuilder,
218 ) -> Result<Option<SlidingSyncList>> {
219 let list = list_builder.build(self.inner.internal_channel.clone());
220
221 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
222
223 self.inner.internal_channel_send_if_possible(
224 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
225 );
226
227 Ok(old_list)
228 }
229
230 pub async fn add_cached_list(
237 &self,
238 mut list_builder: SlidingSyncListBuilder,
239 ) -> Result<Option<SlidingSyncList>> {
240 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
241
242 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
243
244 self.add_list(list_builder).await
245 }
246
247 pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
249 &self,
250 room_ids: I,
251 ) -> Vec<Option<SlidingSyncRoom>> {
252 let rooms = self.inner.rooms.read().await;
253
254 room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
255 }
256
257 pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
259 self.inner.rooms.read().await.values().cloned().collect()
260 }
261
262 #[instrument(skip_all)]
264 async fn handle_response(
265 &self,
266 sliding_sync_response: http::Response,
267 position: &mut SlidingSyncPositionMarkers,
268 ) -> Result<UpdateSummary, crate::Error> {
269 let pos = Some(sliding_sync_response.pos.clone());
270
271 let must_process_rooms_response = self.must_process_rooms_response().await;
272
273 trace!(yes = must_process_rooms_response, "Must process rooms response?");
274
275 let mut sync_response = {
283 let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
286
287 let rooms = &*self.inner.rooms.read().await;
288 let mut response_processor =
289 SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
290
291 #[cfg(feature = "e2e-encryption")]
292 if self.is_e2ee_enabled() {
293 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
294 }
295
296 if must_process_rooms_response {
299 response_processor.handle_room_response(&sliding_sync_response).await?;
300 }
301
302 response_processor.process_and_take_response().await?
303 };
304
305 debug!(?sync_response, "Sliding Sync response has been handled by the client");
306
307 if let Some(ref txn_id) = sliding_sync_response.txn_id {
309 let txn_id = txn_id.as_str().into();
310 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
311 let mut lists = self.inner.lists.write().await;
312 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
313 }
314
315 let update_summary = {
316 let updated_rooms = {
318 let mut rooms_map = self.inner.rooms.write().await;
319
320 let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
321
322 for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
323 let timeline =
327 if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
328 joined_room.timeline.events
329 } else {
330 room_data.timeline.drain(..).map(TimelineEvent::new).collect()
331 };
332
333 match rooms_map.get_mut(&room_id) {
334 Some(room) => {
336 room.update(room_data, timeline);
337 }
338
339 None => {
341 rooms_map.insert(
342 room_id.clone(),
343 SlidingSyncRoom::new(
344 room_id.clone(),
345 room_data.prev_batch,
346 timeline,
347 ),
348 );
349 }
350 }
351
352 updated_rooms.push(room_id);
353 }
354
355 updated_rooms.extend(sync_response.rooms.join.keys().cloned());
363
364 updated_rooms
365 };
366
367 let updated_lists = {
369 debug!(
370 lists = ?sliding_sync_response.lists,
371 "Update lists"
372 );
373
374 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
375 let mut lists = self.inner.lists.write().await;
376
377 for (name, list) in lists.iter_mut() {
380 if let Some(updates) = sliding_sync_response.lists.get(name) {
381 let maximum_number_of_rooms: u32 =
382 updates.count.try_into().expect("failed to convert `count` to `u32`");
383
384 if list.update(Some(maximum_number_of_rooms))? {
385 updated_lists.push(name.clone());
386 }
387 } else if list.update(None)? {
388 updated_lists.push(name.clone());
389 }
390 }
391
392 for name in sliding_sync_response.lists.keys() {
394 if !lists.contains_key(name) {
395 error!("Response for list `{name}` - unknown to us; skipping");
396 }
397 }
398
399 updated_lists
400 };
401
402 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
403 };
404
405 position.pos = pos;
409
410 Ok(update_summary)
411 }
412
    /// Builds the next sync request.
    ///
    /// Returns the request itself, the [`RequestConfig`] to send it with, and
    /// the owned guard on the position markers. The guard is handed to the
    /// caller so that the position stays locked for as long as the caller
    /// keeps it.
    ///
    /// # Errors
    ///
    /// Fails if a list cannot produce its next request, if restoring the
    /// persisted state fails or, with the `e2e-encryption` feature, if the
    /// tracked users cannot be marked as dirty.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the sticky parameters is poisoned.
    async fn generate_sync_request(
        &self,
        txn_id: &mut LazyTransactionId,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect the request of every list.
        let mut requests_lists = BTreeMap::new();

        let require_timeout = {
            let lists = self.inner.lists.read().await;

            // Starts at `true`, which is therefore the value used when there
            // is no list at all.
            let mut require_timeout = true;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
                require_timeout = require_timeout && list.requires_timeout();
            }

            require_timeout
        };

        // Wait for the position markers to be available, and keep them locked.
        let mut position_guard = self.inner.position.clone().lock_owned().await;

        let to_device_enabled =
            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);

        // The persisted state is only needed to share the `pos` or to read the
        // to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            let lists = self.inner.lists.read().await;
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
        } else {
            None
        };

        // Pick the `pos`: the restored one when `share_pos` is set and a state
        // has been restored, the in-memory one otherwise.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Align the in-memory `pos` with the restored one.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        Span::current().record("pos", &pos);

        // A request without `pos` with E2EE enabled: mark all tracked users as
        // dirty.
        // NOTE(review): presumably because device list updates may have been
        // missed since the previous request — confirm.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // The `timeout` is only set when every list requires it.
        let timeout = require_timeout.then(|| self.inner.poll_timeout);

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Apply the sticky parameters (room subscriptions, extensions), if
        // needed.
        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);

        // The extensions have been applied just above; the to-device `since`
        // token is then overwritten with the restored one (or `None`).
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        // Set the transaction ID only if one has been generated.
        if let Some(txn_id) = txn_id.get() {
            request.txn_id = Some(txn_id.to_string());
        }

        Ok((
            request,
            // The HTTP timeout covers the long-poll plus the network delay.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
533
    /// Sends a sync request and handles its response.
    ///
    /// `position_guard` is the guard obtained while generating the request;
    /// it is released once the response has been handled and the state has
    /// been cached, or earlier if an error occurs.
    ///
    /// # Errors
    ///
    /// Fails if the request fails, if the E2EE uploads task cannot be joined,
    /// if handling the response fails or if caching the state fails.
    ///
    /// # Panics
    ///
    /// Panics if the task handling the response cannot be joined.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Prepare the request; it is awaited further down.
        let request = self.inner.client.send(request).with_request_config(request_config);

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                let client = self.inner.client.clone();
                // Send the outgoing E2EE requests in a separate task while the
                // sync request is awaited. A failure there is only logged. The
                // task is aborted if its handle is dropped.
                let e2ee_uploads = spawn(async move {
                    if let Err(error) = client.send_outgoing_requests().await {
                        error!(?error, "Error while sending outgoing E2EE requests");
                    }
                })
                .abort_on_drop();

                let response = request.await?;

                // Wait for the uploads task to finish before going further.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        let this = self.clone();

        // The response is handled in a spawned task.
        // NOTE(review): presumably so that the handling runs to completion
        // even if the caller's future is dropped — confirm.
        let future = async move {
            debug!("Start handling response");

            let updates = this.handle_response(response, &mut position_guard).await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position markers.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.unwrap()
    }
643
644 #[cfg(feature = "e2e-encryption")]
646 fn is_e2ee_enabled(&self) -> bool {
647 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
648 }
649
650 #[cfg(not(feature = "e2e-encryption"))]
651 fn is_e2ee_enabled(&self) -> bool {
652 false
653 }
654
655 async fn must_process_rooms_response(&self) -> bool {
657 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
660 || !self.inner.lists.read().await.is_empty()
661 }
662
663 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
664 async fn sync_once(&self) -> Result<UpdateSummary> {
665 let (request, request_config, position_guard) =
666 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
667
668 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
670
671 self.inner.client.inner.sync_beat.notify(usize::MAX);
673
674 Ok(summaries)
675 }
676
    /// Creates the sync stream.
    ///
    /// The stream loops forever, yielding an [`UpdateSummary`] after each
    /// successful sync iteration. It ends when:
    ///
    /// - a `SyncLoopStop` internal message is received, or the internal
    ///   channel reports an error,
    /// - a sync iteration fails: the error is yielded, then the stream ends.
    ///   If the error kind is `UnknownPos`, the session is expired before the
    ///   error is yielded.
    ///
    /// A `SyncLoopSkipOverCurrentIteration` internal message makes the loop
    /// start a new iteration.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Branches are polled in order: internal messages first.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // Stop the loop, and thus the stream.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // Start over with a new iteration.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // Any error stops the loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // Reset `pos` and the sticky parameters.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Ends the loop, and thus the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
741
742 pub fn stop_sync(&self) -> Result<()> {
751 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
752 }
753
754 #[doc(hidden)]
766 pub async fn expire_session(&self) {
767 info!("Session expired; resetting `pos` and sticky parameters");
768
769 {
770 let mut position = self.inner.position.lock().await;
771 position.pos = None;
772
773 if let Err(err) = self.cache_to_storage(&position).await {
774 error!(
775 "couldn't invalidate sliding sync frozen state when expiring session: {err}"
776 );
777 }
778 }
779
780 {
781 let mut sticky = self.inner.sticky.write().unwrap();
782
783 sticky.data_mut().room_subscriptions.clear();
786 }
787
788 self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
789 }
790}
791
792impl SlidingSyncInner {
793 #[instrument]
795 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
796 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
797 }
798
799 #[instrument]
802 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
803 let _ = self.internal_channel.send(message);
805 }
806}
807
/// Messages delivered to the sync loop through the internal channel.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Asks the sync loop to stop: the loop breaks and the stream ends.
    SyncLoopStop,

    /// Asks the sync loop to abandon its current iteration and to start a new
    /// one.
    SyncLoopSkipOverCurrentIteration,
}
817
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrites the in-memory `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Returns a copy of the extensions held by the sticky parameters.
    ///
    /// # Panics
    ///
    /// Panics if the lock protecting the sticky parameters is poisoned.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
836
/// Position markers of a Sliding Sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` sent with the next request. It is replaced by the `pos` of a
    /// response once that response has been successfully handled, and reset
    /// to `None` when the session expires.
    pos: Option<String>,
}
845
/// Serializable snapshot of a Sliding Sync instance.
#[derive(Debug, Serialize, Deserialize)]
struct FrozenSlidingSync {
    /// To-device `since` token; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    to_device_since: Option<String>,
    /// Snapshot of the rooms; left out of the serialized form when empty, and
    /// defaulting to an empty vector when missing on deserialization.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    rooms: Vec<FrozenSlidingSyncRoom>,
}
855
856impl FrozenSlidingSync {
857 fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
858 Self {
860 to_device_since: None,
861 rooms: rooms
862 .iter()
863 .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
864 .collect::<Vec<_>>(),
865 }
866 }
867}
868
/// Serializable form of the `pos` of a Sliding Sync instance.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The `pos`; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
874
/// Summary of what has been updated by one sync iteration.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// Names of the lists that have been updated.
    pub lists: Vec<String>,
    /// IDs of the rooms that have been updated.
    pub rooms: Vec<OwnedRoomId>,
}
884
/// State of a room subscription.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The subscription has not been committed yet: it is included in the
    /// requests the sticky parameters are applied to. This is the default
    /// state.
    #[default]
    Pending,

    /// The subscription has been committed: it is no longer included in the
    /// requests.
    Applied,
}
900
/// The sticky parameters of a Sliding Sync instance: the room subscriptions
/// and the extensions.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, keyed by room ID; each one comes with its state
    /// and its settings.
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The extensions copied into every request the sticky parameters are
    /// applied to.
    extensions: http::request::Extensions,
}
914
915impl SlidingSyncStickyParameters {
916 pub fn new(
918 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
919 extensions: http::request::Extensions,
920 ) -> Self {
921 Self {
922 room_subscriptions: room_subscriptions
923 .into_iter()
924 .map(|(room_id, room_subscription)| {
925 (room_id, (RoomSubscriptionState::Pending, room_subscription))
926 })
927 .collect(),
928 extensions,
929 }
930 }
931}
932
933impl StickyData for SlidingSyncStickyParameters {
934 type Request = http::Request;
935
936 fn apply(&self, request: &mut Self::Request) {
937 request.room_subscriptions = self
938 .room_subscriptions
939 .iter()
940 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
941 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
942 .collect();
943 request.extensions = self.extensions.clone();
944 }
945
946 fn on_commit(&mut self) {
947 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
949 if matches!(state, RoomSubscriptionState::Pending) {
950 *state = RoomSubscriptionState::Applied;
951 }
952 }
953 }
954}
955
956#[cfg(all(test, not(target_family = "wasm")))]
957#[allow(clippy::dbg_macro)]
958mod tests {
959 use std::{
960 collections::BTreeMap,
961 future::ready,
962 ops::Not,
963 sync::{Arc, Mutex},
964 time::Duration,
965 };
966
967 use assert_matches::assert_matches;
968 use event_listener::Listener;
969 use futures_util::{future::join_all, pin_mut, StreamExt};
970 use matrix_sdk_test::async_test;
971 use ruma::{
972 api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
973 OwnedRoomId, TransactionId,
974 };
975 use serde::Deserialize;
976 use serde_json::json;
977 use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
978
979 use super::{
980 http,
981 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
982 FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
983 SlidingSyncStickyParameters,
984 };
985 use crate::{
986 sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
987 };
988
989 #[derive(Copy, Clone)]
990 struct SlidingSyncMatcher;
991
992 impl Match for SlidingSyncMatcher {
993 fn matches(&self, request: &Request) -> bool {
994 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
995 && request.method == Method::POST
996 }
997 }
998
999 async fn new_sliding_sync(
1000 lists: Vec<SlidingSyncListBuilder>,
1001 ) -> Result<(MockServer, SlidingSync)> {
1002 let server = MockServer::start().await;
1003 let client = logged_in_client(Some(server.uri())).await;
1004
1005 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1006
1007 for list in lists {
1008 sliding_sync_builder = sliding_sync_builder.add_list(list);
1009 }
1010
1011 let sliding_sync = sliding_sync_builder.build().await?;
1012
1013 Ok((server, sliding_sync))
1014 }
1015
1016 #[async_test]
1017 async fn test_subscribe_to_rooms() -> Result<()> {
1018 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1019 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1020 .await?;
1021
1022 let stream = sliding_sync.sync();
1023 pin_mut!(stream);
1024
1025 let room_id_0 = room_id!("!r0:bar.org");
1026 let room_id_1 = room_id!("!r1:bar.org");
1027 let room_id_2 = room_id!("!r2:bar.org");
1028
1029 {
1030 let _mock_guard = Mock::given(SlidingSyncMatcher)
1031 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1032 "pos": "1",
1033 "lists": {},
1034 "rooms": {
1035 room_id_0: {
1036 "name": "Room #0",
1037 "initial": true,
1038 },
1039 room_id_1: {
1040 "name": "Room #1",
1041 "initial": true,
1042 },
1043 room_id_2: {
1044 "name": "Room #2",
1045 "initial": true,
1046 },
1047 }
1048 })))
1049 .mount_as_scoped(&server)
1050 .await;
1051
1052 let _ = stream.next().await.unwrap()?;
1053 }
1054
1055 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1056
1057 assert!(room0.are_members_synced().not());
1061
1062 {
1063 struct MemberMatcher(OwnedRoomId);
1064
1065 impl Match for MemberMatcher {
1066 fn matches(&self, request: &Request) -> bool {
1067 request.url.path()
1068 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1069 && request.method == Method::GET
1070 }
1071 }
1072
1073 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1074 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1075 "chunk": [],
1076 })))
1077 .mount_as_scoped(&server)
1078 .await;
1079
1080 assert_matches!(room0.request_members().await, Ok(()));
1081 }
1082
1083 assert!(room0.are_members_synced());
1085
1086 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1087
1088 assert!(room0.are_members_synced().not());
1091
1092 {
1093 let sticky = sliding_sync.inner.sticky.read().unwrap();
1094 let room_subscriptions = &sticky.data().room_subscriptions;
1095
1096 assert!(room_subscriptions.contains_key(room_id_0));
1097 assert!(room_subscriptions.contains_key(room_id_1));
1098 assert!(!room_subscriptions.contains_key(room_id_2));
1099 }
1100
1101 {
1104 struct MemberMatcher(OwnedRoomId);
1105
1106 impl Match for MemberMatcher {
1107 fn matches(&self, request: &Request) -> bool {
1108 request.url.path()
1109 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1110 && request.method == Method::GET
1111 }
1112 }
1113
1114 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1115 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1116 "chunk": [],
1117 })))
1118 .mount_as_scoped(&server)
1119 .await;
1120
1121 assert_matches!(room0.request_members().await, Ok(()));
1122 }
1123
1124 assert!(room0.are_members_synced());
1126
1127 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1128
1129 assert!(room0.are_members_synced());
1132
1133 Ok(())
1134 }
1135
1136 #[async_test]
1137 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1138 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1139 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1140 .await?;
1141
1142 let room_id_0 = room_id!("!r0:bar.org");
1143 let room_id_1 = room_id!("!r1:bar.org");
1144 let room_id_2 = room_id!("!r2:bar.org");
1145
1146 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1148
1149 {
1150 let sticky = sliding_sync.inner.sticky.read().unwrap();
1151 let room_subscriptions = &sticky.data().room_subscriptions;
1152
1153 assert!(room_subscriptions.contains_key(room_id_0));
1154 assert!(room_subscriptions.contains_key(room_id_1));
1155 assert!(room_subscriptions.contains_key(room_id_2).not());
1156 }
1157
1158 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1160
1161 {
1162 let sticky = sliding_sync.inner.sticky.read().unwrap();
1163 let room_subscriptions = &sticky.data().room_subscriptions;
1164
1165 assert!(room_subscriptions.contains_key(room_id_0));
1166 assert!(room_subscriptions.contains_key(room_id_1));
1167 assert!(room_subscriptions.contains_key(room_id_2));
1168 }
1169
1170 sliding_sync.expire_session().await;
1172
1173 {
1174 let sticky = sliding_sync.inner.sticky.read().unwrap();
1175 let room_subscriptions = &sticky.data().room_subscriptions;
1176
1177 assert!(room_subscriptions.is_empty());
1178 }
1179
1180 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1182
1183 {
1184 let sticky = sliding_sync.inner.sticky.read().unwrap();
1185 let room_subscriptions = &sticky.data().room_subscriptions;
1186
1187 assert!(room_subscriptions.contains_key(room_id_0).not());
1188 assert!(room_subscriptions.contains_key(room_id_1).not());
1189 assert!(room_subscriptions.contains_key(room_id_2));
1190 }
1191
1192 Ok(())
1193 }
1194
1195 #[async_test]
1196 async fn test_to_device_token_properly_cached() -> Result<()> {
1197 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1198 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1199 .await?;
1200
1201 let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1204 assert!(frozen.to_device_since.is_none());
1205
1206 Ok(())
1207 }
1208
1209 #[async_test]
1210 async fn test_add_list() -> Result<()> {
1211 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1212 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1213 .await?;
1214
1215 let _stream = sliding_sync.sync();
1216 pin_mut!(_stream);
1217
1218 sliding_sync
1219 .add_list(
1220 SlidingSyncList::builder("bar")
1221 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1222 )
1223 .await?;
1224
1225 let lists = sliding_sync.inner.lists.read().await;
1226
1227 assert!(lists.contains_key("foo"));
1228 assert!(lists.contains_key("bar"));
1229
1230 Ok(())
1233 }
1234
    /// Walks through the invalidate/apply/commit cycle of the sticky
    /// parameters.
    #[test]
    fn test_sticky_parameters_api_invalidated_flow() {
        let r0 = room_id!("!r0.matrix.org");
        let r1 = room_id!("!r1:matrix.org");

        let mut room_subscriptions = BTreeMap::new();
        room_subscriptions.insert(r0.to_owned(), Default::default());

        // At first, the sticky parameters are invalidated.
        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            room_subscriptions,
            Default::default(),
        ));
        assert!(sticky.is_invalidated());

        // Build a request carrying a known transaction ID.
        let txn_id: &TransactionId = "tid123".into();

        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        // The room subscription has been applied to the request.
        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r0));

        let tid = request.txn_id.unwrap();

        // Committing with the transaction ID of the request clears the
        // invalidated status.
        sticky.maybe_commit(tid.as_str().into());
        assert!(!sticky.is_invalidated());

        // Mutating the data invalidates the parameters again.
        sticky
            .data_mut()
            .room_subscriptions
            .insert(r1.to_owned(), (Default::default(), Default::default()));
        assert!(sticky.is_invalidated());

        // Committing with an unrelated transaction ID changes nothing.
        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
        assert!(sticky.is_invalidated());

        // Apply to a first new request...
        let txn_id1: &TransactionId = "tid456".into();
        let mut request1 = http::Request::default();
        request1.txn_id = Some(txn_id1.to_string());
        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));

        // ... which only gets `r1`: `r0` has been committed already.
        assert!(sticky.is_invalidated());
        assert_eq!(request1.room_subscriptions.len(), 1);
        assert!(request1.room_subscriptions.contains_key(r1));

        // Apply to a second new request, without any commit in between.
        let txn_id2: &TransactionId = "tid789".into();
        let mut request2 = http::Request::default();
        request2.txn_id = Some(txn_id2.to_string());

        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
        assert!(sticky.is_invalidated());
        // `r1` hasn't been committed, so it is applied once more.
        assert_eq!(request2.room_subscriptions.len(), 1);
        assert!(request2.room_subscriptions.contains_key(r1));

        // Committing with the older transaction ID keeps the invalidated
        // status.
        sticky.maybe_commit(txn_id1);
        assert!(sticky.is_invalidated());

        // Committing with the most recent transaction ID is effective.
        sticky.maybe_commit(txn_id2);
        assert!(!sticky.is_invalidated());
    }
1311
    /// A room subscription is sent with every request until it is committed,
    /// and never again afterwards.
    #[test]
    fn test_room_subscriptions_are_sticky() {
        let r0 = room_id!("!r0.matrix.org");
        let r1 = room_id!("!r1:matrix.org");

        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            BTreeMap::new(),
            Default::default(),
        ));

        // Subscribe to `r0`, apply, and commit.
        {
            sticky
                .data_mut()
                .room_subscriptions
                .insert(r0.to_owned(), (Default::default(), Default::default()));

            let txn_id: &TransactionId = "tid0".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r0));

            let tid = request.txn_id.unwrap();

            sticky.maybe_commit(tid.as_str().into());
        }

        // Subscribe to `r1` and apply, but do not commit.
        {
            sticky
                .data_mut()
                .room_subscriptions
                .insert(r1.to_owned(), (Default::default(), Default::default()));

            let txn_id: &TransactionId = "tid1".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            // Only `r1` is sent: `r0` has been committed already.
            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r1));

        }

        // Apply again: `r1` is sent again since it wasn't committed. Commit
        // this time.
        {
            let txn_id: &TransactionId = "tid2".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r1));

            let tid = request.txn_id.unwrap();

            sticky.maybe_commit(tid.as_str().into());
        }

        // Everything has been committed: no room subscription is sent.
        {
            let txn_id: &TransactionId = "tid3".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert!(request.room_subscriptions.is_empty());
        }
    }
1406
/// Checks that the extensions handed to the sticky parameters are kept by the
/// sticky manager and copied into a request by `maybe_apply`.
#[test]
fn test_extensions_are_sticky() {
    // Only the account data extension is explicitly switched on.
    let mut configured_extensions = http::request::Extensions::default();
    configured_extensions.account_data.enabled = Some(true);

    let parameters = SlidingSyncStickyParameters::new(Default::default(), configured_extensions);
    let mut sticky = SlidingSyncStickyManager::new(parameters);

    assert!(sticky.is_invalidated(), "invalidated because of non default parameters");

    // The stored parameters mirror what was passed in: everything is unset
    // except the account data extension.
    {
        let stored = &sticky.data().extensions;

        assert_eq!(stored.e2ee.enabled, None);
        assert_eq!(stored.to_device.enabled, None);
        assert_eq!(stored.to_device.since, None);
        assert_eq!(stored.account_data.enabled, Some(true));
    }

    // Applying the sticky parameters copies the same values into the request.
    let txn_id: &TransactionId = "tid123".into();
    let mut lazy_txn_id = LazyTransactionId::from_owned(txn_id.to_owned());
    let mut request = assign!(http::Request::default(), { txn_id: Some(txn_id.to_string()) });

    sticky.maybe_apply(&mut request, &mut lazy_txn_id);

    // Nothing has been committed, so the manager is still invalidated.
    assert!(sticky.is_invalidated());

    let applied = &request.extensions;
    assert_eq!(applied.to_device.enabled, None);
    assert_eq!(applied.to_device.since, None);
    assert_eq!(applied.e2ee.enabled, None);
    assert_eq!(applied.account_data.enabled, Some(true));
}
1440
1441 #[async_test]
1442 async fn test_sticky_extensions_plus_since() -> Result<()> {
1443 let server = MockServer::start().await;
1444 let client = logged_in_client(Some(server.uri())).await;
1445
1446 let sync = client
1447 .sliding_sync("test-slidingsync")?
1448 .add_list(SlidingSyncList::builder("new_list"))
1449 .build()
1450 .await?;
1451
1452 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1454 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1455 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1456
1457 let sync = client
1459 .sliding_sync("test-slidingsync")?
1460 .add_list(SlidingSyncList::builder("new_list"))
1461 .with_to_device_extension(
1462 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1463 )
1464 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1465 .build()
1466 .await?;
1467
1468 let txn_id = TransactionId::new();
1471 let (request, _, _) = sync
1472 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1473 .await?;
1474
1475 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1476 assert_eq!(request.extensions.to_device.enabled, Some(true));
1477 assert!(request.extensions.to_device.since.is_none());
1478
1479 {
1480 let mut sticky = sync.inner.sticky.write().unwrap();
1482 assert!(sticky.is_invalidated());
1483 sticky.maybe_commit(
1484 "hopefully the rng won't generate this very specific transaction id".into(),
1485 );
1486 assert!(sticky.is_invalidated());
1487 }
1488
1489 let txn_id2 = TransactionId::new();
1491 let (request, _, _) = sync
1492 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1493 .await?;
1494
1495 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1496 assert_eq!(request.extensions.to_device.enabled, Some(true));
1497 assert!(request.extensions.to_device.since.is_none());
1498
1499 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1500
1501 {
1502 let mut sticky = sync.inner.sticky.write().unwrap();
1504 assert!(sticky.is_invalidated());
1505 sticky.maybe_commit(txn_id2.as_str().into());
1506 assert!(!sticky.is_invalidated());
1507 }
1508
1509 let txn_id = TransactionId::new();
1511 let (request, _, _) = sync
1512 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1513 .await?;
1514 assert!(request.extensions.e2ee.enabled.is_none());
1515 assert!(request.extensions.to_device.enabled.is_none());
1516 assert!(request.extensions.to_device.since.is_none());
1517
1518 let _since_token = "since";
1522
1523 #[cfg(feature = "e2e-encryption")]
1524 {
1525 use matrix_sdk_base::crypto::store::Changes;
1526 if let Some(olm_machine) = &*client.olm_machine().await {
1527 olm_machine
1528 .store()
1529 .save_changes(Changes {
1530 next_batch_token: Some(_since_token.to_owned()),
1531 ..Default::default()
1532 })
1533 .await?;
1534 }
1535 }
1536
1537 let txn_id = TransactionId::new();
1538 let (request, _, _) = sync
1539 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1540 .await?;
1541
1542 assert!(request.extensions.e2ee.enabled.is_none());
1543 assert!(request.extensions.to_device.enabled.is_none());
1544
1545 #[cfg(feature = "e2e-encryption")]
1546 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1547
1548 Ok(())
1549 }
1550
/// Checks that generating a sync request with the e2ee extension enabled,
/// before any `pos` has been set through `set_pos`, makes the olm machine
/// queue a keys query covering all tracked users, and that a later request
/// generated after `set_pos` queues nothing.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
    use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
    use matrix_sdk_test::ruma_response_from_json;
    use ruma::user_id;

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let alice = user_id!("@alice:localhost");
    let bob = user_id!("@bob:localhost");
    let me = user_id!("@example:localhost");

    // Set-up: track alice and bob, then answer every outgoing request until
    // the olm machine has nothing left to send.
    {
        let olm_machine = client.olm_machine().await;
        let olm_machine = olm_machine.as_ref().unwrap();

        olm_machine.update_tracked_users([alice, bob]).await?;

        let outgoing_requests = olm_machine.outgoing_requests().await?;

        // A keys upload followed by a keys query are pending.
        assert_eq!(outgoing_requests.len(), 2);
        assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
        assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

        // Answer the keys upload with empty one-time key counts.
        olm_machine
            .mark_request_as_sent(
                outgoing_requests[0].request_id(),
                AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                    "one_time_key_counts": {}
                }))),
            )
            .await?;

        // Answer the keys query with (empty) device lists for alice and bob.
        olm_machine
            .mark_request_as_sent(
                outgoing_requests[1].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        alice: {},
                        bob: {},
                    }
                }))),
            )
            .await?;

        let outgoing_requests = olm_machine.outgoing_requests().await?;

        // One more keys query is pending; it is answered below with an entry
        // for `me` (presumably it is the query for our own user).
        assert_eq!(outgoing_requests.len(), 1);
        assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

        olm_machine
            .mark_request_as_sent(
                outgoing_requests[0].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        me: {},
                    }
                }))),
            )
            .await?;

        let outgoing_requests = olm_machine.outgoing_requests().await?;

        // Nothing is pending anymore.
        assert!(outgoing_requests.is_empty());
    }

    let sync = client
        .sliding_sync("test-slidingsync")?
        .add_list(SlidingSyncList::builder("new_list"))
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    // Generate a first request; `set_pos` has not been called on `sync` yet.
    let txn_id = TransactionId::new();
    let (_request, _, _) = sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
        .await?;

    // Generating that request queued a single keys query that covers alice,
    // bob and our own user.
    {
        let olm_machine = client.olm_machine().await;
        let olm_machine = olm_machine.as_ref().unwrap();

        let outgoing_requests = olm_machine.outgoing_requests().await?;

        assert_eq!(outgoing_requests.len(), 1);
        assert_matches!(
            outgoing_requests[0].request(),
            AnyOutgoingRequest::KeysQuery(request) => {
                assert!(request.device_keys.contains_key(alice));
                assert!(request.device_keys.contains_key(bob));
                assert!(request.device_keys.contains_key(me));
            }
        );

        // Answer it so that the queue is empty again.
        olm_machine
            .mark_request_as_sent(
                outgoing_requests[0].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        alice: {},
                        bob: {},
                        me: {},
                    }
                }))),
            )
            .await?;
    }

    // Now give the sliding sync a `pos`.
    sync.set_pos("chocolat".to_owned()).await;

    let txn_id = TransactionId::new();
    let (_request, _, _) = sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
        .await?;

    // With a `pos` set, generating a request does not queue any new request.
    {
        let olm_machine = client.olm_machine().await;
        let olm_machine = olm_machine.as_ref().unwrap();

        let outgoing_requests = olm_machine.outgoing_requests().await?;

        assert!(outgoing_requests.is_empty());
    }

    Ok(())
}
1697
/// Drives a sync stream against a sequence of scoped mocks and checks that an
/// `M_UNKNOWN_POS` error resets the stored `pos`, makes the sticky to-device
/// parameter part of the next request again, and ends the stream.
#[async_test]
async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test-slidingsync")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
        )
        .build()
        .await?;

    // Before any sync, a generated request carries the to-device extension.
    let (request, _, _) =
        sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
    assert!(request.extensions.to_device.enabled.is_some());

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // No `pos` is known yet.
    assert!(sliding_sync.inner.position.lock().await.pos.is_none());

    // Only the `txn_id` of the incoming request body is needed by the mocks.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    // First sync iteration: the server answers with `pos` "0".
    {
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(|request: &Request| {
                let request: PartialRequest = request.body_json().unwrap();

                // Echo the request's `txn_id` back in the response.
                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": "0",
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        // The `pos` from the response has been stored.
        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
    }

    // After that successful iteration, the to-device extension is no longer
    // part of a newly generated request.
    let (request, _, _) =
        sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
    assert!(request.extensions.to_device.enabled.is_none());

    // Second sync iteration: the server answers with `pos` "1".
    {
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(|request: &Request| {
                let request: PartialRequest = request.body_json().unwrap();

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": "1",
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
    }

    // Third sync iteration: the server answers with the already seen `pos`
    // "0". This mock matches at most one request.
    {
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(|request: &Request| {
                let request: PartialRequest = request.body_json().unwrap();

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": "0",
                }))
            })
            .up_to_n_times(1)
            .mount_as_scoped(&server)
            .await;

        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        // The response is accepted and its `pos` is stored as-is.
        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
    }

    // Fourth sync iteration: the server rejects the request with
    // `M_UNKNOWN_POS`.
    {
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(400).set_body_json(json!({
                "error": "foo",
                "errcode": "M_UNKNOWN_POS",
            })))
            .mount_as_scoped(&server)
            .await;

        let next = sync.next().await;

        // The stream yields the error...
        assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));

        // ...the stored `pos` has been reset...
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        // ...the to-device extension is part of the next request again...
        assert!(request.extensions.to_device.enabled.is_some());

        // ...and the stream has ended.
        assert!(sync.next().await.is_none());
    }

    Ok(())
}
1832
/// Checks that a sliding sync built without `share_pos()` keeps using its own
/// in-memory `pos`: a `pos` written to storage by another instance is
/// ignored, and a freshly built instance starts without any `pos`.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    let server = MockServer::start().await;

    // Only the `txn_id` of the incoming request body is needed by the mock.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    // The mock answers each request with an incrementing `pos`, starting at
    // "0", and echoes the request's `txn_id`.
    let server_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let request: PartialRequest = request.body_json().unwrap();
            // Post-increment: return the current value, then bump the counter.
            let pos = {
                let mut pos = server_pos.lock().unwrap();
                let prev = *pos;
                *pos += 1;
                prev
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": request.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Initially there is no `pos`, neither in memory nor in the request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // Run one sync iteration.
    let next = sync.next().await;
    assert_matches!(next, Some(Ok(_update_summary)));

    // The in-memory `pos` now holds the value from the response.
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    let restored_fields = restore_sliding_sync_state(
        &client,
        &sliding_sync.inner.storage_key,
        &*sliding_sync.inner.lists.read().await,
    )
    .await?
    .expect("must have restored fields");

    // The restored state also contains that `pos`.
    assert_eq!(restored_fields.pos.as_deref(), Some("0"));

    // Another instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut position_guard = other_sync.inner.position.lock().await;
        position_guard.pos = Some("yolo".to_owned());

        other_sync.cache_to_storage(&position_guard).await?;
    }

    // The first instance is unaffected: it still holds and sends `pos` "0".
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("0"));
    }

    // A newly built instance does not pick up any `pos`.
    {
        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    Ok(())
}
1927
/// Checks that a sliding sync built with `share_pos()` picks up a `pos`
/// written to storage by another instance, that newly built instances start
/// from the stored `pos`, and that `expire_session` clears it for both.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    let server = MockServer::start().await;

    // Only the `txn_id` of the incoming request body is needed by the mock.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    // The mock answers each request with an incrementing `pos`, starting at
    // "0", and echoes the request's `txn_id`.
    let server_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let request: PartialRequest = request.body_json().unwrap();
            // Post-increment: return the current value, then bump the counter.
            let pos = {
                let mut pos = server_pos.lock().unwrap();
                let prev = *pos;
                *pos += 1;
                prev
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": request.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Initially there is no `pos`, neither in the request nor in memory.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        assert!(request.pos.is_none());
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // Run one sync iteration.
    let next = sync.next().await;
    assert_matches!(next, Some(Ok(_update_summary)));

    // The in-memory `pos` now holds the value from the response.
    assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

    let restored_fields = restore_sliding_sync_state(
        &client,
        &sliding_sync.inner.storage_key,
        &*sliding_sync.inner.lists.read().await,
    )
    .await?
    .expect("must have restored fields");

    // The restored state also contains that `pos`.
    assert_eq!(restored_fields.pos.as_deref(), Some("0"));

    // Another instance with the same id (built without `share_pos()`)
    // overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

        let mut position_guard = other_sync.inner.position.lock().await;
        position_guard.pos = Some("42".to_owned());

        other_sync.cache_to_storage(&position_guard).await?;
    }

    // Generating a request on the first instance now uses the stored `pos`,
    // and its in-memory `pos` has been updated accordingly.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A newly built sharing instance starts from the stored `pos`.
    {
        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
    }

    // Expire the session on the first instance.
    sliding_sync.expire_session().await;

    // Its `pos` is gone, both in memory and in the generated request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    // A newly built sharing instance does not find any `pos` either.
    {
        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    Ok(())
}
2045
2046 #[async_test]
2047 async fn test_stop_sync_loop() -> Result<()> {
2048 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2049 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2050 .await?;
2051
2052 let stream = sliding_sync.sync();
2054 pin_mut!(stream);
2055
2056 assert!(stream.next().await.is_some());
2058
2059 sliding_sync.stop_sync()?;
2061
2062 assert!(stream.next().await.is_none());
2064
2065 let stream = sliding_sync.sync();
2067 pin_mut!(stream);
2068
2069 assert!(stream.next().await.is_some());
2071
2072 Ok(())
2073 }
2074
2075 #[async_test]
2076 async fn test_process_read_receipts() -> Result<()> {
2077 let room = owned_room_id!("!pony:example.org");
2078
2079 let server = MockServer::start().await;
2080 let client = logged_in_client(Some(server.uri())).await;
2081
2082 let sliding_sync = client
2083 .sliding_sync("test")?
2084 .with_receipt_extension(
2085 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2086 )
2087 .add_list(
2088 SlidingSyncList::builder("all")
2089 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2090 )
2091 .build()
2092 .await?;
2093
2094 {
2096 let server_response = assign!(http::Response::new("0".to_owned()), {
2097 rooms: BTreeMap::from([(
2098 room.clone(),
2099 http::response::Room::default(),
2100 )])
2101 });
2102
2103 let _summary = {
2104 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2105 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2106 };
2107 }
2108
2109 let server_response = assign!(http::Response::new("1".to_owned()), {
2110 extensions: assign!(http::response::Extensions::default(), {
2111 receipts: assign!(http::response::Receipts::default(), {
2112 rooms: BTreeMap::from([
2113 (
2114 room.clone(),
2115 Raw::from_json_string(
2116 json!({
2117 "room_id": room,
2118 "type": "m.receipt",
2119 "content": {
2120 "$event:bar.org": {
2121 "m.read": {
2122 client.user_id().unwrap(): {
2123 "ts": 1436451550,
2124 }
2125 }
2126 }
2127 }
2128 })
2129 .to_string(),
2130 ).unwrap()
2131 )
2132 ])
2133 })
2134 })
2135 });
2136
2137 let summary = {
2138 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2139 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2140 };
2141
2142 assert!(summary.rooms.contains(&room));
2143
2144 Ok(())
2145 }
2146
2147 #[async_test]
2148 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2149 let room_id = owned_room_id!("!unicorn:example.org");
2150
2151 let server = MockServer::start().await;
2152 let client = logged_in_client(Some(server.uri())).await;
2153
2154 let sliding_sync = client
2157 .sliding_sync("test")?
2158 .with_account_data_extension(
2159 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2160 )
2161 .add_list(
2162 SlidingSyncList::builder("all")
2163 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2164 )
2165 .build()
2166 .await?;
2167
2168 {
2170 let server_response = assign!(http::Response::new("0".to_owned()), {
2171 rooms: BTreeMap::from([(
2172 room_id.clone(),
2173 http::response::Room::default(),
2174 )])
2175 });
2176
2177 let _summary = {
2178 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2179 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2180 };
2181 }
2182
2183 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2187
2188 let update_summary = {
2189 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2190 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2191 };
2192
2193 assert!(update_summary.rooms.contains(&room_id));
2196
2197 let room = client.get_room(&room_id).unwrap();
2198
2199 assert!(room.is_marked_unread());
2202
2203 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2206
2207 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2208 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
2209
2210 let room = client.get_room(&room_id).unwrap();
2211
2212 assert!(!room.is_marked_unread());
2213
2214 Ok(())
2215 }
2216
2217 fn make_mark_unread_response(
2218 response_number: &str,
2219 room_id: OwnedRoomId,
2220 unread: bool,
2221 add_rooms_section: bool,
2222 ) -> http::Response {
2223 let rooms = if add_rooms_section {
2224 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2225 } else {
2226 BTreeMap::new()
2227 };
2228
2229 let extensions = assign!(http::response::Extensions::default(), {
2230 account_data: assign!(http::response::AccountData::default(), {
2231 rooms: BTreeMap::from([
2232 (
2233 room_id,
2234 vec![
2235 Raw::from_json_string(
2236 json!({
2237 "content": {
2238 "unread": unread
2239 },
2240 "type": "com.famedly.marked_unread"
2241 })
2242 .to_string(),
2243 ).unwrap()
2244 ]
2245 )
2246 ])
2247 })
2248 });
2249
2250 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2251 }
2252
2253 #[async_test]
2254 async fn test_process_rooms_account_data() -> Result<()> {
2255 let room = owned_room_id!("!pony:example.org");
2256
2257 let server = MockServer::start().await;
2258 let client = logged_in_client(Some(server.uri())).await;
2259
2260 let sliding_sync = client
2261 .sliding_sync("test")?
2262 .with_account_data_extension(
2263 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2264 )
2265 .add_list(
2266 SlidingSyncList::builder("all")
2267 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2268 )
2269 .build()
2270 .await?;
2271
2272 {
2274 let server_response = assign!(http::Response::new("0".to_owned()), {
2275 rooms: BTreeMap::from([(
2276 room.clone(),
2277 http::response::Room::default(),
2278 )])
2279 });
2280
2281 let _summary = {
2282 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2283 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2284 };
2285 }
2286
2287 let server_response = assign!(http::Response::new("1".to_owned()), {
2288 extensions: assign!(http::response::Extensions::default(), {
2289 account_data: assign!(http::response::AccountData::default(), {
2290 rooms: BTreeMap::from([
2291 (
2292 room.clone(),
2293 vec![
2294 Raw::from_json_string(
2295 json!({
2296 "content": {
2297 "tags": {
2298 "u.work": {
2299 "order": 0.9
2300 }
2301 }
2302 },
2303 "type": "m.tag"
2304 })
2305 .to_string(),
2306 ).unwrap()
2307 ]
2308 )
2309 ])
2310 })
2311 })
2312 });
2313 let summary = {
2314 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2315 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2316 };
2317
2318 assert!(summary.rooms.contains(&room));
2319
2320 Ok(())
2321 }
2322
/// Feeds the same response (one room, an e2ee extension with a one-time key
/// count, and a to-device extension with a `next_batch` token) to three
/// differently configured sliding syncs, and checks which parts are applied
/// in each configuration.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    // The response shared by all three scenarios below.
    let server_response = assign!(http::Response::new("0".to_owned()), {
        rooms: BTreeMap::from([(
            room.clone(),
            assign!(http::response::Room::default(), {
                name: Some("Croissants lovers".to_owned()),
                timeline: Vec::new(),
            }),
        )]),

        extensions: assign!(http::response::Extensions::default(), {
            e2ee: assign!(http::response::E2EE::default(), {
                device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
            }),
            to_device: Some(assign!(http::response::ToDevice::default(), {
                next_batch: "to-device-token".to_owned(),
            })),
        })
    });

    // Scenario 1: no list, but the to-device and e2ee extensions are enabled.
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // The one-time key count from the e2ee extension has been applied...
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 42);

    // ...as well as the to-device `next_batch` token...
    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    // ...but the room from the response is not known to the client.
    assert!(client.get_room(&room).is_none());

    // Scenario 2: a fresh client, with a list but without any extension.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // Neither the key count nor the to-device token has been applied...
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 0);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            None
        );
    }

    // ...but the room is now known to the client.
    assert!(client.get_room(&room).is_some());

    // Scenario 3: a fresh client, with a list and both extensions enabled.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
    }

    // Everything has been applied: key count, to-device token and room.
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    assert!(client.get_room(&room).is_some());

    Ok(())
}
2452
2453 #[async_test]
2454 async fn test_lock_multiple_requests() -> Result<()> {
2455 let server = MockServer::start().await;
2456 let client = logged_in_client(Some(server.uri())).await;
2457
2458 let pos = Arc::new(Mutex::new(0));
2459 let _mock_guard = Mock::given(SlidingSyncMatcher)
2460 .respond_with(move |_: &Request| {
2461 let mut pos = pos.lock().unwrap();
2462 *pos += 1;
2463 ResponseTemplate::new(200).set_body_json(json!({
2464 "pos": pos.to_string(),
2465 "lists": {},
2466 "rooms": {}
2467 }))
2468 })
2469 .mount_as_scoped(&server)
2470 .await;
2471
2472 let sliding_sync = client
2473 .sliding_sync("test")?
2474 .with_to_device_extension(
2475 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2476 )
2477 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2478 .build()
2479 .await?;
2480
2481 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2484
2485 for result in requests.await {
2486 result?;
2487 }
2488
2489 Ok(())
2490 }
2491
2492 #[async_test]
2493 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2494 let server = MockServer::start().await;
2495 let client = logged_in_client(Some(server.uri())).await;
2496
2497 let pos = Arc::new(Mutex::new(0));
2498 let _mock_guard = Mock::given(SlidingSyncMatcher)
2499 .respond_with(move |_: &Request| {
2500 let mut pos = pos.lock().unwrap();
2501 *pos += 1;
2502 ResponseTemplate::new(200)
2504 .set_body_json(json!({
2505 "pos": pos.to_string(),
2506 "lists": {},
2507 "rooms": {}
2508 }))
2509 .set_delay(Duration::from_secs(2))
2510 })
2511 .mount_as_scoped(&server)
2512 .await;
2513
2514 let sliding_sync =
2515 client
2516 .sliding_sync("test")?
2517 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2518 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2519 ))
2520 .add_list(
2521 SlidingSyncList::builder("another-list")
2522 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2523 )
2524 .build()
2525 .await?;
2526
2527 let stream = sliding_sync.sync();
2528 pin_mut!(stream);
2529
2530 let cloned_sync = sliding_sync.clone();
2531 tokio::spawn(async move {
2532 tokio::time::sleep(Duration::from_millis(100)).await;
2533
2534 cloned_sync
2535 .on_list("another-list", |list| {
2536 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2537 ready(())
2538 })
2539 .await;
2540 });
2541
2542 assert_matches!(stream.next().await, Some(Ok(_)));
2543
2544 sliding_sync.stop_sync().unwrap();
2545
2546 assert_matches!(stream.next().await, None);
2547
2548 let mut num_requests = 0;
2549
2550 for request in server.received_requests().await.unwrap() {
2551 if !SlidingSyncMatcher.matches(&request) {
2552 continue;
2553 }
2554
2555 let another_list_ranges = if num_requests == 0 {
2556 json!([[0, 10]])
2558 } else {
2559 json!([[10, 20]])
2561 };
2562
2563 num_requests += 1;
2564 assert!(num_requests <= 2, "more than one request hit the server");
2565
2566 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2567
2568 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2569 &json_value,
2570 &json!({
2571 "conn_id": "test",
2572 "lists": {
2573 "room-list": {
2574 "ranges": [[0, 9]],
2575 "required_state": [
2576 ["m.room.encryption", ""],
2577 ["m.room.tombstone", ""]
2578 ],
2579 },
2580 "another-list": {
2581 "ranges": another_list_ranges,
2582 "required_state": [
2583 ["m.room.encryption", ""],
2584 ["m.room.tombstone", ""]
2585 ],
2586 },
2587 }
2588 }),
2589 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2590 ) {
2591 dbg!(json_value);
2592 panic!("json differ: {err}");
2593 }
2594 }
2595
2596 assert_eq!(num_requests, 2);
2597
2598 Ok(())
2599 }
2600
2601 #[async_test]
2602 async fn test_timeout_zero_list() -> Result<()> {
2603 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2604
2605 let (request, _, _) =
2606 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2607
2608 assert!(request.timeout.is_some());
2611
2612 Ok(())
2613 }
2614
2615 #[async_test]
2616 async fn test_timeout_one_list() -> Result<()> {
2617 let (_server, sliding_sync) = new_sliding_sync(vec![
2618 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2619 ])
2620 .await?;
2621
2622 let (request, _, _) =
2623 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2624
2625 assert!(request.timeout.is_none());
2627
2628 {
2630 let server_response = assign!(http::Response::new("0".to_owned()), {
2631 lists: BTreeMap::from([(
2632 "foo".to_owned(),
2633 assign!(http::response::List::default(), {
2634 count: uint!(7),
2635 })
2636 )])
2637 });
2638
2639 let _summary = {
2640 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2641 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2642 };
2643 }
2644
2645 let (request, _, _) =
2646 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2647
2648 assert!(request.timeout.is_some());
2650
2651 Ok(())
2652 }
2653
2654 #[async_test]
2655 async fn test_timeout_three_lists() -> Result<()> {
2656 let (_server, sliding_sync) = new_sliding_sync(vec![
2657 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2658 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2659 SlidingSyncList::builder("baz")
2660 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2661 ])
2662 .await?;
2663
2664 let (request, _, _) =
2665 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667 assert!(request.timeout.is_none());
2669
2670 {
2672 let server_response = assign!(http::Response::new("0".to_owned()), {
2673 lists: BTreeMap::from([(
2674 "foo".to_owned(),
2675 assign!(http::response::List::default(), {
2676 count: uint!(7),
2677 })
2678 )])
2679 });
2680
2681 let _summary = {
2682 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2683 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2684 };
2685 }
2686
2687 let (request, _, _) =
2688 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2689
2690 assert!(request.timeout.is_none());
2692
2693 {
2695 let server_response = assign!(http::Response::new("1".to_owned()), {
2696 lists: BTreeMap::from([(
2697 "bar".to_owned(),
2698 assign!(http::response::List::default(), {
2699 count: uint!(7),
2700 })
2701 )])
2702 });
2703
2704 let _summary = {
2705 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2706 sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2707 };
2708 }
2709
2710 let (request, _, _) =
2711 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2712
2713 assert!(request.timeout.is_some());
2715
2716 Ok(())
2717 }
2718
2719 #[async_test]
2720 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2721 let server = MockServer::start().await;
2722 let client = logged_in_client(Some(server.uri())).await;
2723
2724 let _mock_guard = Mock::given(SlidingSyncMatcher)
2725 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2726 "pos": "0",
2727 "lists": {},
2728 "rooms": {}
2729 })))
2730 .mount_as_scoped(&server)
2731 .await;
2732
2733 let sliding_sync = client
2734 .sliding_sync("test")?
2735 .with_to_device_extension(
2736 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2737 )
2738 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2739 .build()
2740 .await?;
2741
2742 let sliding_sync = Arc::new(sliding_sync);
2743
2744 let sync_beat_listener = client.inner.sync_beat.listen();
2746 sliding_sync.sync_once().await?;
2747
2748 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2750 Ok(())
2751 }
2752
2753 #[async_test]
2754 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2755 let server = MockServer::start().await;
2756 let client = logged_in_client(Some(server.uri())).await;
2757
2758 let _mock_guard = Mock::given(SlidingSyncMatcher)
2759 .respond_with(ResponseTemplate::new(404))
2760 .mount_as_scoped(&server)
2761 .await;
2762
2763 let sliding_sync = client
2764 .sliding_sync("test")?
2765 .with_to_device_extension(
2766 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2767 )
2768 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2769 .build()
2770 .await?;
2771
2772 let sliding_sync = Arc::new(sliding_sync);
2773
2774 let sync_beat_listener = client.inner.sync_beat.listen();
2776 let sync_result = sliding_sync.sync_once().await;
2777 assert!(sync_result.is_err());
2778
2779 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2781
2782 Ok(())
2783 }
2784}