1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26 collections::{btree_map::Entry, BTreeMap},
27 fmt::Debug,
28 future::Future,
29 sync::{Arc, RwLock as StdRwLock},
30 time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46 select,
47 sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53 cache::restore_sliding_sync_state,
54 client::SlidingSyncResponseProcessor,
55 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{config::RequestConfig, Client, Result};
58
59#[derive(Clone, Debug)]
63pub struct SlidingSync {
64 inner: Arc<SlidingSyncInner>,
66}
67
68#[derive(Debug)]
69pub(super) struct SlidingSyncInner {
70 id: String,
74
75 client: Client,
77
78 poll_timeout: Duration,
80
81 network_timeout: Duration,
84
85 storage_key: String,
87
88 share_pos: bool,
95
96 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
109
110 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
112
113 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
115
116 internal_channel: Sender<SlidingSyncInternalMessage>,
119}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
126 pub async fn has_pos(&self) -> bool {
129 self.inner.position.lock().await.pos.is_some()
130 }
131
132 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
133 cache::store_sliding_sync_state(self, position).await
134 }
135
136 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
138 SlidingSyncBuilder::new(id, client)
139 }
140
141 pub fn subscribe_to_rooms(
148 &self,
149 room_ids: &[&RoomId],
150 settings: Option<http::request::RoomSubscription>,
151 cancel_in_flight_request: bool,
152 ) {
153 let settings = settings.unwrap_or_default();
154 let mut sticky = self.inner.sticky.write().unwrap();
155 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
156
157 let mut skip_over_current_sync_loop_iteration = false;
158
159 for room_id in room_ids {
160 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
167 if let Some(room) = self.inner.client.get_room(room_id) {
168 room.mark_members_missing();
169 }
170
171 entry.insert((RoomSubscriptionState::default(), settings.clone()));
172
173 skip_over_current_sync_loop_iteration = true;
174 }
175 }
176
177 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
178 self.inner.internal_channel_send_if_possible(
179 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
180 );
181 }
182 }
183
184 pub async fn on_list<Function, FunctionOutput, R>(
186 &self,
187 list_name: &str,
188 function: Function,
189 ) -> Option<R>
190 where
191 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
192 FunctionOutput: Future<Output = R>,
193 {
194 let lists = self.inner.lists.read().await;
195
196 match lists.get(list_name) {
197 Some(list) => Some(function(list).await),
198 None => None,
199 }
200 }
201
202 pub async fn add_list(
208 &self,
209 list_builder: SlidingSyncListBuilder,
210 ) -> Result<Option<SlidingSyncList>> {
211 let list = list_builder.build(self.inner.internal_channel.clone());
212
213 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
214
215 self.inner.internal_channel_send_if_possible(
216 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
217 );
218
219 Ok(old_list)
220 }
221
222 pub async fn add_cached_list(
229 &self,
230 mut list_builder: SlidingSyncListBuilder,
231 ) -> Result<Option<SlidingSyncList>> {
232 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
233
234 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
235
236 self.add_list(list_builder).await
237 }
238
239 #[instrument(skip_all)]
241 async fn handle_response(
242 &self,
243 sliding_sync_response: http::Response,
244 position: &mut SlidingSyncPositionMarkers,
245 requested_required_states: RequestedRequiredStates,
246 ) -> Result<UpdateSummary, crate::Error> {
247 let pos = Some(sliding_sync_response.pos.clone());
248
249 let must_process_rooms_response = self.must_process_rooms_response().await;
250
251 trace!(yes = must_process_rooms_response, "Must process rooms response?");
252
253 let sync_response = {
261 let _timer = timer!("response processor");
262
263 let response_processor = {
264 let _sync_lock = {
267 let _timer = timer!("acquiring the `sync_lock`");
268
269 self.inner.client.base_client().sync_lock().lock().await
270 };
271
272 let mut response_processor =
273 SlidingSyncResponseProcessor::new(self.inner.client.clone());
274
275 #[cfg(feature = "e2e-encryption")]
276 if self.is_e2ee_enabled() {
277 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
278 }
279
280 if must_process_rooms_response {
283 response_processor
284 .handle_room_response(&sliding_sync_response, &requested_required_states)
285 .await?;
286 }
287
288 response_processor
289 };
290
291 response_processor.process_and_take_response().await?
293 };
294
295 debug!("Sliding Sync response has been handled by the client");
296 trace!(?sync_response);
297
298 if let Some(ref txn_id) = sliding_sync_response.txn_id {
300 let txn_id = txn_id.as_str().into();
301 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
302 let mut lists = self.inner.lists.write().await;
303 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
304 }
305
306 let update_summary = {
307 let updated_rooms = {
309 let mut updated_rooms = Vec::with_capacity(
310 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
311 );
312
313 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
314
315 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
323
324 updated_rooms
325 };
326
327 let updated_lists = {
329 debug!(
330 lists = ?sliding_sync_response.lists,
331 "Update lists"
332 );
333
334 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
335 let mut lists = self.inner.lists.write().await;
336
337 for (name, list) in lists.iter_mut() {
340 if let Some(updates) = sliding_sync_response.lists.get(name) {
341 let maximum_number_of_rooms: u32 =
342 updates.count.try_into().expect("failed to convert `count` to `u32`");
343
344 if list.update(Some(maximum_number_of_rooms))? {
345 updated_lists.push(name.clone());
346 }
347 } else if list.update(None)? {
348 updated_lists.push(name.clone());
349 }
350 }
351
352 for name in sliding_sync_response.lists.keys() {
354 if !lists.contains_key(name) {
355 error!("Response for list `{name}` - unknown to us; skipping");
356 }
357 }
358
359 updated_lists
360 };
361
362 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
363 };
364
365 debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
369
370 position.pos = pos;
371
372 Ok(update_summary)
373 }
374
375 #[instrument(skip_all)]
376 async fn generate_sync_request(
377 &self,
378 txn_id: &mut LazyTransactionId,
379 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
380 let mut requests_lists = BTreeMap::new();
382
383 let require_timeout = {
384 let lists = self.inner.lists.read().await;
385
386 let mut require_timeout = true;
388
389 for (name, list) in lists.iter() {
390 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
391 require_timeout = require_timeout && list.requires_timeout();
392 }
393
394 require_timeout
395 };
396
397 let mut position_guard = {
405 debug!("Waiting to acquire the `position` lock");
406
407 let _timer = timer!("acquiring the `position` lock");
408
409 self.inner.position.clone().lock_owned().await
410 };
411
412 debug!(pos = ?position_guard.pos, "Got a position");
413
414 let to_device_enabled =
415 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
416
417 let restored_fields = if self.inner.share_pos || to_device_enabled {
418 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
419 } else {
420 None
421 };
422
423 let pos = if self.inner.share_pos {
426 if let Some(fields) = &restored_fields {
427 if fields.pos != position_guard.pos {
429 info!(
430 "Pos from previous request ('{:?}') was different from \
431 pos in database ('{:?}').",
432 position_guard.pos, fields.pos
433 );
434 position_guard.pos = fields.pos.clone();
435 }
436 fields.pos.clone()
437 } else {
438 position_guard.pos.clone()
439 }
440 } else {
441 position_guard.pos.clone()
442 };
443
444 Span::current().record("pos", &pos);
445
446 #[cfg(feature = "e2e-encryption")]
455 if pos.is_none() && self.is_e2ee_enabled() {
456 info!("Marking all tracked users as dirty");
457
458 let olm_machine = self.inner.client.olm_machine().await;
459 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
460 olm_machine.mark_all_tracked_users_as_dirty().await?;
461 }
462
463 let timeout = require_timeout.then(|| self.inner.poll_timeout);
468
469 let mut request = assign!(http::Request::new(), {
470 conn_id: Some(self.inner.id.clone()),
471 pos,
472 timeout,
473 lists: requests_lists,
474 });
475
476 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
478
479 if to_device_enabled {
483 request.extensions.to_device.since =
484 restored_fields.and_then(|fields| fields.to_device_token);
485 }
486
487 if let Some(txn_id) = txn_id.get() {
489 request.txn_id = Some(txn_id.to_string());
490 }
491
492 Ok((
493 request,
495 RequestConfig::default()
498 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
499 .retry_limit(3),
500 position_guard,
501 ))
502 }
503
504 async fn send_sync_request(
508 &self,
509 request: http::Request,
510 request_config: RequestConfig,
511 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
512 ) -> Result<UpdateSummary> {
513 debug!("Sending request");
514
515 let requested_required_states = RequestedRequiredStates::from(&request);
517 let request = self.inner.client.send(request).with_request_config(request_config);
518
519 #[cfg(feature = "e2e-encryption")]
526 let response = {
527 if self.is_e2ee_enabled() {
528 let client = self.inner.client.clone();
545 let e2ee_uploads = spawn(
546 async move {
547 if let Err(error) = client.send_outgoing_requests().await {
548 error!(?error, "Error while sending outgoing E2EE requests");
549 }
550 }
551 .instrument(Span::current()),
552 )
553 .abort_on_drop();
556
557 let response = request.await?;
559
560 e2ee_uploads.await.map_err(|error| Error::JoinError {
565 task_description: "e2ee_uploads".to_owned(),
566 error,
567 })?;
568
569 response
570 } else {
571 request.await?
572 }
573 };
574
575 #[cfg(not(feature = "e2e-encryption"))]
577 let response = request.await?;
578
579 debug!("Received response");
580
581 let this = self.clone();
591
592 let future = async move {
595 debug!("Start handling response");
596
597 let updates = this
603 .handle_response(response, &mut position_guard, requested_required_states)
604 .await?;
605
606 this.cache_to_storage(&position_guard).await?;
607
608 drop(position_guard);
611
612 debug!("Done handling response");
613
614 Ok(updates)
615 };
616
617 spawn(future.instrument(Span::current())).await.unwrap()
618 }
619
620 #[cfg(feature = "e2e-encryption")]
622 fn is_e2ee_enabled(&self) -> bool {
623 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
624 }
625
626 #[cfg(not(feature = "e2e-encryption"))]
627 fn is_e2ee_enabled(&self) -> bool {
628 false
629 }
630
631 async fn must_process_rooms_response(&self) -> bool {
633 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
636 || !self.inner.lists.read().await.is_empty()
637 }
638
639 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
640 async fn sync_once(&self) -> Result<UpdateSummary> {
641 let (request, request_config, position_guard) =
642 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
643
644 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
646
647 self.inner.client.inner.sync_beat.notify(usize::MAX);
649
650 Ok(summaries)
651 }
652
653 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
663 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
664 debug!("Starting sync stream");
665
666 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
667
668 stream! {
669 loop {
670 debug!("Sync stream is running");
671
672 select! {
673 biased;
674
675 internal_message = internal_channel_receiver.recv() => {
676 use SlidingSyncInternalMessage::*;
677
678 debug!(?internal_message, "Sync stream has received an internal message");
679
680 match internal_message {
681 Err(_) | Ok(SyncLoopStop) => {
682 break;
683 }
684
685 Ok(SyncLoopSkipOverCurrentIteration) => {
686 continue;
687 }
688 }
689 }
690
691 update_summary = self.sync_once() => {
692 match update_summary {
693 Ok(updates) => {
694 yield Ok(updates);
695 }
696
697 Err(error) => {
699 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
700 self.expire_session().await;
702 }
703
704 yield Err(error);
705
706 break;
708 }
709 }
710 }
711 }
712 }
713
714 debug!("Sync stream has exited.");
715 }
716 }
717
718 pub fn stop_sync(&self) -> Result<()> {
727 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
728 }
729
730 #[doc(hidden)]
742 pub async fn expire_session(&self) {
743 info!("Session expired; resetting `pos` and sticky parameters");
744
745 {
746 let lists = self.inner.lists.read().await;
747 for list in lists.values() {
748 list.set_maximum_number_of_rooms(None);
750
751 list.invalidate_sticky_data();
753 }
754 }
755
756 {
758 let mut position = self.inner.position.lock().await;
759
760 position.pos = None;
762
763 if let Err(err) = self.cache_to_storage(&position).await {
767 warn!("Failed to invalidate cached sliding sync state: {err}");
768 }
769 }
770
771 {
772 let mut sticky = self.inner.sticky.write().unwrap();
773
774 sticky.data_mut().room_subscriptions.clear();
777 }
778 }
779}
780
781impl SlidingSyncInner {
782 #[instrument]
784 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
785 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
786 }
787
788 #[instrument]
791 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
792 let _ = self.internal_channel.send(message);
794 }
795}
796
797#[derive(Copy, Clone, Debug, PartialEq)]
798enum SlidingSyncInternalMessage {
799 SyncLoopStop,
801
802 SyncLoopSkipOverCurrentIteration,
805}
806
807#[cfg(any(test, feature = "testing"))]
808impl SlidingSync {
809 pub async fn set_pos(&self, new_pos: String) {
811 let mut position_lock = self.inner.position.lock().await;
812 position_lock.pos = Some(new_pos);
813 }
814
815 pub fn extensions_config(&self) -> http::request::Extensions {
821 let sticky = self.inner.sticky.read().unwrap();
822 sticky.data().extensions.clone()
823 }
824}
825
826#[derive(Clone, Debug)]
827pub(super) struct SlidingSyncPositionMarkers {
828 pos: Option<String>,
831}
832
833#[derive(Serialize, Deserialize)]
834struct FrozenSlidingSyncPos {
835 #[serde(skip_serializing_if = "Option::is_none")]
836 pos: Option<String>,
837}
838
839#[derive(Debug, Clone)]
842pub struct UpdateSummary {
843 pub lists: Vec<String>,
845 pub rooms: Vec<OwnedRoomId>,
847}
848
849#[derive(Debug, Default)]
853enum RoomSubscriptionState {
854 #[default]
858 Pending,
859
860 Applied,
863}
864
865#[derive(Debug)]
868pub(super) struct SlidingSyncStickyParameters {
869 room_subscriptions:
872 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
873
874 extensions: http::request::Extensions,
877}
878
879impl SlidingSyncStickyParameters {
880 pub fn new(
882 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
883 extensions: http::request::Extensions,
884 ) -> Self {
885 Self {
886 room_subscriptions: room_subscriptions
887 .into_iter()
888 .map(|(room_id, room_subscription)| {
889 (room_id, (RoomSubscriptionState::Pending, room_subscription))
890 })
891 .collect(),
892 extensions,
893 }
894 }
895}
896
897impl StickyData for SlidingSyncStickyParameters {
898 type Request = http::Request;
899
900 fn apply(&self, request: &mut Self::Request) {
901 request.room_subscriptions = self
902 .room_subscriptions
903 .iter()
904 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
905 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
906 .collect();
907 request.extensions = self.extensions.clone();
908 }
909
910 fn on_commit(&mut self) {
911 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
913 if matches!(state, RoomSubscriptionState::Pending) {
914 *state = RoomSubscriptionState::Applied;
915 }
916 }
917 }
918}
919
920#[cfg(all(test, not(target_family = "wasm")))]
921#[allow(clippy::dbg_macro)]
922mod tests {
923 use std::{
924 collections::BTreeMap,
925 future::ready,
926 ops::Not,
927 sync::{Arc, Mutex},
928 time::Duration,
929 };
930
931 use assert_matches::assert_matches;
932 use event_listener::Listener;
933 use futures_util::{future::join_all, pin_mut, StreamExt};
934 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
935 use matrix_sdk_common::executor::spawn;
936 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
937 use ruma::{
938 api::client::error::ErrorKind,
939 assign,
940 events::{direct::DirectEvent, room::member::MembershipState},
941 owned_room_id, room_id,
942 serde::Raw,
943 uint, OwnedRoomId, TransactionId,
944 };
945 use serde::Deserialize;
946 use serde_json::json;
947 use wiremock::{
948 http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
949 };
950
951 use super::{
952 http,
953 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
954 SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
955 SlidingSyncStickyParameters,
956 };
957 use crate::{
958 sliding_sync::cache::restore_sliding_sync_state,
959 test_utils::{logged_in_client, mocks::MatrixMockServer},
960 Client, Result,
961 };
962
963 #[derive(Copy, Clone)]
964 struct SlidingSyncMatcher;
965
966 impl Match for SlidingSyncMatcher {
967 fn matches(&self, request: &Request) -> bool {
968 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
969 && request.method == Method::POST
970 }
971 }
972
973 async fn new_sliding_sync(
974 lists: Vec<SlidingSyncListBuilder>,
975 ) -> Result<(MockServer, SlidingSync)> {
976 let server = MockServer::start().await;
977 let client = logged_in_client(Some(server.uri())).await;
978
979 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
980
981 for list in lists {
982 sliding_sync_builder = sliding_sync_builder.add_list(list);
983 }
984
985 let sliding_sync = sliding_sync_builder.build().await?;
986
987 Ok((server, sliding_sync))
988 }
989
990 #[async_test]
991 async fn test_subscribe_to_rooms() -> Result<()> {
992 let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
993 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
994 .await?;
995
996 let stream = sliding_sync.sync();
997 pin_mut!(stream);
998
999 let room_id_0 = room_id!("!r0:bar.org");
1000 let room_id_1 = room_id!("!r1:bar.org");
1001 let room_id_2 = room_id!("!r2:bar.org");
1002
1003 {
1004 let _mock_guard = Mock::given(SlidingSyncMatcher)
1005 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1006 "pos": "1",
1007 "lists": {},
1008 "rooms": {
1009 room_id_0: {
1010 "name": "Room #0",
1011 "initial": true,
1012 },
1013 room_id_1: {
1014 "name": "Room #1",
1015 "initial": true,
1016 },
1017 room_id_2: {
1018 "name": "Room #2",
1019 "initial": true,
1020 },
1021 }
1022 })))
1023 .mount_as_scoped(&server)
1024 .await;
1025
1026 let _ = stream.next().await.unwrap()?;
1027 }
1028
1029 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1030
1031 assert!(room0.are_members_synced().not());
1035
1036 {
1037 struct MemberMatcher(OwnedRoomId);
1038
1039 impl Match for MemberMatcher {
1040 fn matches(&self, request: &Request) -> bool {
1041 request.url.path()
1042 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1043 && request.method == Method::GET
1044 }
1045 }
1046
1047 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1048 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1049 "chunk": [],
1050 })))
1051 .mount_as_scoped(&server)
1052 .await;
1053
1054 assert_matches!(room0.request_members().await, Ok(()));
1055 }
1056
1057 assert!(room0.are_members_synced());
1059
1060 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1061
1062 assert!(room0.are_members_synced().not());
1065
1066 {
1067 let sticky = sliding_sync.inner.sticky.read().unwrap();
1068 let room_subscriptions = &sticky.data().room_subscriptions;
1069
1070 assert!(room_subscriptions.contains_key(room_id_0));
1071 assert!(room_subscriptions.contains_key(room_id_1));
1072 assert!(!room_subscriptions.contains_key(room_id_2));
1073 }
1074
1075 {
1078 struct MemberMatcher(OwnedRoomId);
1079
1080 impl Match for MemberMatcher {
1081 fn matches(&self, request: &Request) -> bool {
1082 request.url.path()
1083 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1084 && request.method == Method::GET
1085 }
1086 }
1087
1088 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1089 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1090 "chunk": [],
1091 })))
1092 .mount_as_scoped(&server)
1093 .await;
1094
1095 assert_matches!(room0.request_members().await, Ok(()));
1096 }
1097
1098 assert!(room0.are_members_synced());
1100
1101 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1102
1103 assert!(room0.are_members_synced());
1106
1107 Ok(())
1108 }
1109
1110 #[async_test]
1111 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1112 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1113 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1114 .await?;
1115
1116 let room_id_0 = room_id!("!r0:bar.org");
1117 let room_id_1 = room_id!("!r1:bar.org");
1118 let room_id_2 = room_id!("!r2:bar.org");
1119
1120 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1122
1123 {
1124 let sticky = sliding_sync.inner.sticky.read().unwrap();
1125 let room_subscriptions = &sticky.data().room_subscriptions;
1126
1127 assert!(room_subscriptions.contains_key(room_id_0));
1128 assert!(room_subscriptions.contains_key(room_id_1));
1129 assert!(room_subscriptions.contains_key(room_id_2).not());
1130 }
1131
1132 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1134
1135 {
1136 let sticky = sliding_sync.inner.sticky.read().unwrap();
1137 let room_subscriptions = &sticky.data().room_subscriptions;
1138
1139 assert!(room_subscriptions.contains_key(room_id_0));
1140 assert!(room_subscriptions.contains_key(room_id_1));
1141 assert!(room_subscriptions.contains_key(room_id_2));
1142 }
1143
1144 sliding_sync.expire_session().await;
1146
1147 {
1148 let sticky = sliding_sync.inner.sticky.read().unwrap();
1149 let room_subscriptions = &sticky.data().room_subscriptions;
1150
1151 assert!(room_subscriptions.is_empty());
1152 }
1153
1154 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1156
1157 {
1158 let sticky = sliding_sync.inner.sticky.read().unwrap();
1159 let room_subscriptions = &sticky.data().room_subscriptions;
1160
1161 assert!(room_subscriptions.contains_key(room_id_0).not());
1162 assert!(room_subscriptions.contains_key(room_id_1).not());
1163 assert!(room_subscriptions.contains_key(room_id_2));
1164 }
1165
1166 Ok(())
1167 }
1168
1169 #[async_test]
1170 async fn test_add_list() -> Result<()> {
1171 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1172 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1173 .await?;
1174
1175 let _stream = sliding_sync.sync();
1176 pin_mut!(_stream);
1177
1178 sliding_sync
1179 .add_list(
1180 SlidingSyncList::builder("bar")
1181 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1182 )
1183 .await?;
1184
1185 let lists = sliding_sync.inner.lists.read().await;
1186
1187 assert!(lists.contains_key("foo"));
1188 assert!(lists.contains_key("bar"));
1189
1190 Ok(())
1193 }
1194
1195 #[test]
1196 fn test_sticky_parameters_api_invalidated_flow() {
1197 let r0 = room_id!("!r0.matrix.org");
1198 let r1 = room_id!("!r1:matrix.org");
1199
1200 let mut room_subscriptions = BTreeMap::new();
1201 room_subscriptions.insert(r0.to_owned(), Default::default());
1202
1203 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1205 room_subscriptions,
1206 Default::default(),
1207 ));
1208 assert!(sticky.is_invalidated());
1209
1210 let txn_id: &TransactionId = "tid123".into();
1212
1213 let mut request = http::Request::default();
1214 request.txn_id = Some(txn_id.to_string());
1215
1216 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1217
1218 assert!(request.txn_id.is_some());
1219 assert_eq!(request.room_subscriptions.len(), 1);
1220 assert!(request.room_subscriptions.contains_key(r0));
1221
1222 let tid = request.txn_id.unwrap();
1223
1224 sticky.maybe_commit(tid.as_str().into());
1225 assert!(!sticky.is_invalidated());
1226
1227 sticky
1229 .data_mut()
1230 .room_subscriptions
1231 .insert(r1.to_owned(), (Default::default(), Default::default()));
1232 assert!(sticky.is_invalidated());
1233
1234 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1236 assert!(sticky.is_invalidated());
1237
1238 let txn_id1: &TransactionId = "tid456".into();
1240 let mut request1 = http::Request::default();
1241 request1.txn_id = Some(txn_id1.to_string());
1242 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1243
1244 assert!(sticky.is_invalidated());
1245 assert_eq!(request1.room_subscriptions.len(), 1);
1249 assert!(request1.room_subscriptions.contains_key(r1));
1250
1251 let txn_id2: &TransactionId = "tid789".into();
1252 let mut request2 = http::Request::default();
1253 request2.txn_id = Some(txn_id2.to_string());
1254
1255 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1256 assert!(sticky.is_invalidated());
1257 assert_eq!(request2.room_subscriptions.len(), 1);
1260 assert!(request2.room_subscriptions.contains_key(r1));
1261
1262 sticky.maybe_commit(txn_id1);
1265 assert!(sticky.is_invalidated());
1266
1267 sticky.maybe_commit(txn_id2);
1269 assert!(!sticky.is_invalidated());
1270 }
1271
1272 #[test]
1273 fn test_room_subscriptions_are_sticky() {
1274 let r0 = room_id!("!r0.matrix.org");
1275 let r1 = room_id!("!r1:matrix.org");
1276
1277 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1278 BTreeMap::new(),
1279 Default::default(),
1280 ));
1281
1282 {
1284 sticky
1286 .data_mut()
1287 .room_subscriptions
1288 .insert(r0.to_owned(), (Default::default(), Default::default()));
1289
1290 let txn_id: &TransactionId = "tid0".into();
1292 let mut request = http::Request::default();
1293 request.txn_id = Some(txn_id.to_string());
1294
1295 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1296
1297 assert!(request.txn_id.is_some());
1298 assert_eq!(request.room_subscriptions.len(), 1);
1299 assert!(request.room_subscriptions.contains_key(r0));
1300
1301 let tid = request.txn_id.unwrap();
1303
1304 sticky.maybe_commit(tid.as_str().into());
1305 }
1306
1307 {
1309 sticky
1311 .data_mut()
1312 .room_subscriptions
1313 .insert(r1.to_owned(), (Default::default(), Default::default()));
1314
1315 let txn_id: &TransactionId = "tid1".into();
1317 let mut request = http::Request::default();
1318 request.txn_id = Some(txn_id.to_string());
1319
1320 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1321
1322 assert!(request.txn_id.is_some());
1323 assert_eq!(request.room_subscriptions.len(), 1);
1324 assert!(request.room_subscriptions.contains_key(r1));
1326
1327 }
1331
1332 {
1334 let txn_id: &TransactionId = "tid2".into();
1336 let mut request = http::Request::default();
1337 request.txn_id = Some(txn_id.to_string());
1338
1339 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1340
1341 assert!(request.txn_id.is_some());
1342 assert_eq!(request.room_subscriptions.len(), 1);
1343 assert!(request.room_subscriptions.contains_key(r1));
1345
1346 let tid = request.txn_id.unwrap();
1348
1349 sticky.maybe_commit(tid.as_str().into());
1350 }
1351
1352 {
1354 let txn_id: &TransactionId = "tid3".into();
1356 let mut request = http::Request::default();
1357 request.txn_id = Some(txn_id.to_string());
1358
1359 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1360
1361 assert!(request.txn_id.is_some());
1362 assert!(request.room_subscriptions.is_empty());
1364 }
1365 }
1366
1367 #[test]
1368 fn test_extensions_are_sticky() {
1369 let mut extensions = http::request::Extensions::default();
1370 extensions.account_data.enabled = Some(true);
1371
1372 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1374 Default::default(),
1375 extensions,
1376 ));
1377
1378 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1379
1380 let extensions = &sticky.data().extensions;
1383 assert_eq!(extensions.e2ee.enabled, None);
1384 assert_eq!(extensions.to_device.enabled, None);
1385 assert_eq!(extensions.to_device.since, None);
1386
1387 assert_eq!(extensions.account_data.enabled, Some(true));
1389
1390 let txn_id: &TransactionId = "tid123".into();
1391 let mut request = http::Request::default();
1392 request.txn_id = Some(txn_id.to_string());
1393 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1394 assert!(sticky.is_invalidated());
1395 assert_eq!(request.extensions.to_device.enabled, None);
1396 assert_eq!(request.extensions.to_device.since, None);
1397 assert_eq!(request.extensions.e2ee.enabled, None);
1398 assert_eq!(request.extensions.account_data.enabled, Some(true));
1399 }
1400
1401 #[async_test]
1402 async fn test_sticky_extensions_plus_since() -> Result<()> {
1403 let server = MockServer::start().await;
1404 let client = logged_in_client(Some(server.uri())).await;
1405
1406 let sync = client
1407 .sliding_sync("test-slidingsync")?
1408 .add_list(SlidingSyncList::builder("new_list"))
1409 .build()
1410 .await?;
1411
1412 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1414 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1415 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1416
1417 let sync = client
1419 .sliding_sync("test-slidingsync")?
1420 .add_list(SlidingSyncList::builder("new_list"))
1421 .with_to_device_extension(
1422 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1423 )
1424 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1425 .build()
1426 .await?;
1427
1428 let txn_id = TransactionId::new();
1431 let (request, _, _) = sync
1432 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1433 .await?;
1434
1435 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1436 assert_eq!(request.extensions.to_device.enabled, Some(true));
1437 assert!(request.extensions.to_device.since.is_none());
1438
1439 {
1440 let mut sticky = sync.inner.sticky.write().unwrap();
1442 assert!(sticky.is_invalidated());
1443 sticky.maybe_commit(
1444 "hopefully the rng won't generate this very specific transaction id".into(),
1445 );
1446 assert!(sticky.is_invalidated());
1447 }
1448
1449 let txn_id2 = TransactionId::new();
1451 let (request, _, _) = sync
1452 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1453 .await?;
1454
1455 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1456 assert_eq!(request.extensions.to_device.enabled, Some(true));
1457 assert!(request.extensions.to_device.since.is_none());
1458
1459 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1460
1461 {
1462 let mut sticky = sync.inner.sticky.write().unwrap();
1464 assert!(sticky.is_invalidated());
1465 sticky.maybe_commit(txn_id2.as_str().into());
1466 assert!(!sticky.is_invalidated());
1467 }
1468
1469 let txn_id = TransactionId::new();
1471 let (request, _, _) = sync
1472 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1473 .await?;
1474 assert!(request.extensions.e2ee.enabled.is_none());
1475 assert!(request.extensions.to_device.enabled.is_none());
1476 assert!(request.extensions.to_device.since.is_none());
1477
1478 let _since_token = "since";
1482
1483 #[cfg(feature = "e2e-encryption")]
1484 {
1485 use matrix_sdk_base::crypto::store::types::Changes;
1486 if let Some(olm_machine) = &*client.olm_machine().await {
1487 olm_machine
1488 .store()
1489 .save_changes(Changes {
1490 next_batch_token: Some(_since_token.to_owned()),
1491 ..Default::default()
1492 })
1493 .await?;
1494 }
1495 }
1496
1497 let txn_id = TransactionId::new();
1498 let (request, _, _) = sync
1499 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1500 .await?;
1501
1502 assert!(request.extensions.e2ee.enabled.is_none());
1503 assert!(request.extensions.to_device.enabled.is_none());
1504
1505 #[cfg(feature = "e2e-encryption")]
1506 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1507
1508 Ok(())
1509 }
1510
1511 #[async_test]
1517 #[cfg(feature = "e2e-encryption")]
1518 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1519 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1520 use matrix_sdk_test::ruma_response_from_json;
1521 use ruma::user_id;
1522
1523 let server = MockServer::start().await;
1524 let client = logged_in_client(Some(server.uri())).await;
1525
1526 let alice = user_id!("@alice:localhost");
1527 let bob = user_id!("@bob:localhost");
1528 let me = user_id!("@example:localhost");
1529
1530 {
1533 let olm_machine = client.olm_machine().await;
1534 let olm_machine = olm_machine.as_ref().unwrap();
1535
1536 olm_machine.update_tracked_users([alice, bob]).await?;
1537
1538 let outgoing_requests = olm_machine.outgoing_requests().await?;
1540
1541 assert_eq!(outgoing_requests.len(), 2);
1542 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1543 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1544
1545 olm_machine
1547 .mark_request_as_sent(
1548 outgoing_requests[0].request_id(),
1549 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1550 "one_time_key_counts": {}
1551 }))),
1552 )
1553 .await?;
1554
1555 olm_machine
1556 .mark_request_as_sent(
1557 outgoing_requests[1].request_id(),
1558 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1559 "device_keys": {
1560 alice: {},
1561 bob: {},
1562 }
1563 }))),
1564 )
1565 .await?;
1566
1567 let outgoing_requests = olm_machine.outgoing_requests().await?;
1569
1570 assert_eq!(outgoing_requests.len(), 1);
1571 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1572
1573 olm_machine
1574 .mark_request_as_sent(
1575 outgoing_requests[0].request_id(),
1576 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1577 "device_keys": {
1578 me: {},
1579 }
1580 }))),
1581 )
1582 .await?;
1583
1584 let outgoing_requests = olm_machine.outgoing_requests().await?;
1586
1587 assert!(outgoing_requests.is_empty());
1588 }
1589
1590 let sync = client
1591 .sliding_sync("test-slidingsync")?
1592 .add_list(SlidingSyncList::builder("new_list"))
1593 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1594 .build()
1595 .await?;
1596
1597 let txn_id = TransactionId::new();
1599 let (_request, _, _) = sync
1600 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1601 .await?;
1602
1603 {
1605 let olm_machine = client.olm_machine().await;
1606 let olm_machine = olm_machine.as_ref().unwrap();
1607
1608 let outgoing_requests = olm_machine.outgoing_requests().await?;
1610
1611 assert_eq!(outgoing_requests.len(), 1);
1612 assert_matches!(
1613 outgoing_requests[0].request(),
1614 AnyOutgoingRequest::KeysQuery(request) => {
1615 assert!(request.device_keys.contains_key(alice));
1616 assert!(request.device_keys.contains_key(bob));
1617 assert!(request.device_keys.contains_key(me));
1618 }
1619 );
1620
1621 olm_machine
1623 .mark_request_as_sent(
1624 outgoing_requests[0].request_id(),
1625 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1626 "device_keys": {
1627 alice: {},
1628 bob: {},
1629 me: {},
1630 }
1631 }))),
1632 )
1633 .await?;
1634 }
1635
1636 sync.set_pos("chocolat".to_owned()).await;
1638
1639 let txn_id = TransactionId::new();
1640 let (_request, _, _) = sync
1641 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1642 .await?;
1643
1644 {
1646 let olm_machine = client.olm_machine().await;
1647 let olm_machine = olm_machine.as_ref().unwrap();
1648
1649 let outgoing_requests = olm_machine.outgoing_requests().await?;
1651
1652 assert!(outgoing_requests.is_empty());
1653 }
1654
1655 Ok(())
1656 }
1657
1658 #[async_test]
1659 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1660 let server = MockServer::start().await;
1661 let client = logged_in_client(Some(server.uri())).await;
1662
1663 let sliding_sync = client
1664 .sliding_sync("test-slidingsync")?
1665 .with_to_device_extension(
1666 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1667 )
1668 .build()
1669 .await?;
1670
1671 let (request, _, _) =
1673 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1674 assert!(request.extensions.to_device.enabled.is_some());
1675
1676 let sync = sliding_sync.sync();
1677 pin_mut!(sync);
1678
1679 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1681
1682 #[derive(Deserialize)]
1683 struct PartialRequest {
1684 txn_id: Option<String>,
1685 }
1686
1687 {
1688 let _mock_guard = Mock::given(SlidingSyncMatcher)
1689 .respond_with(|request: &Request| {
1690 let request: PartialRequest = request.body_json().unwrap();
1692
1693 ResponseTemplate::new(200).set_body_json(json!({
1694 "txn_id": request.txn_id,
1695 "pos": "0",
1696 }))
1697 })
1698 .mount_as_scoped(&server)
1699 .await;
1700
1701 let next = sync.next().await;
1702 assert_matches!(next, Some(Ok(_update_summary)));
1703
1704 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1706 }
1707
1708 let (request, _, _) =
1710 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1711 assert!(request.extensions.to_device.enabled.is_none());
1712
1713 {
1715 let _mock_guard = Mock::given(SlidingSyncMatcher)
1716 .respond_with(|request: &Request| {
1717 let request: PartialRequest = request.body_json().unwrap();
1719
1720 ResponseTemplate::new(200).set_body_json(json!({
1721 "txn_id": request.txn_id,
1722 "pos": "1",
1723 }))
1724 })
1725 .mount_as_scoped(&server)
1726 .await;
1727
1728 let next = sync.next().await;
1729 assert_matches!(next, Some(Ok(_update_summary)));
1730
1731 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1733 }
1734
1735 {
1738 let _mock_guard = Mock::given(SlidingSyncMatcher)
1739 .respond_with(|request: &Request| {
1740 let request: PartialRequest = request.body_json().unwrap();
1742
1743 ResponseTemplate::new(200).set_body_json(json!({
1744 "txn_id": request.txn_id,
1745 "pos": "0", }))
1747 })
1748 .up_to_n_times(1) .mount_as_scoped(&server)
1750 .await;
1751
1752 let next = sync.next().await;
1753 assert_matches!(next, Some(Ok(_update_summary)));
1754
1755 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1757 }
1758
1759 {
1764 let _mock_guard = Mock::given(SlidingSyncMatcher)
1765 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1766 "error": "foo",
1767 "errcode": "M_UNKNOWN_POS",
1768 })))
1769 .mount_as_scoped(&server)
1770 .await;
1771
1772 let next = sync.next().await;
1773
1774 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1776
1777 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1779
1780 let (request, _, _) =
1782 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1783
1784 assert!(request.extensions.to_device.enabled.is_some());
1785
1786 assert!(sync.next().await.is_none());
1788 }
1789
1790 Ok(())
1791 }
1792
1793 #[cfg(feature = "e2e-encryption")]
1794 #[async_test]
1795 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1796 let server = MockServer::start().await;
1797
1798 #[derive(Deserialize)]
1799 struct PartialRequest {
1800 txn_id: Option<String>,
1801 }
1802
1803 let server_pos = Arc::new(Mutex::new(0));
1804 let _mock_guard = Mock::given(SlidingSyncMatcher)
1805 .respond_with(move |request: &Request| {
1806 let request: PartialRequest = request.body_json().unwrap();
1808 let pos = {
1809 let mut pos = server_pos.lock().unwrap();
1810 let prev = *pos;
1811 *pos += 1;
1812 prev
1813 };
1814
1815 ResponseTemplate::new(200).set_body_json(json!({
1816 "txn_id": request.txn_id,
1817 "pos": pos.to_string(),
1818 }))
1819 })
1820 .mount_as_scoped(&server)
1821 .await;
1822
1823 let client = logged_in_client(Some(server.uri())).await;
1824
1825 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1826
1827 {
1829 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1830
1831 let (request, _, _) =
1832 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1833 assert!(request.pos.is_none());
1834 }
1835
1836 let sync = sliding_sync.sync();
1837 pin_mut!(sync);
1838
1839 let next = sync.next().await;
1842 assert_matches!(next, Some(Ok(_update_summary)));
1843
1844 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1845
1846 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1847 .await?
1848 .expect("must have restored fields");
1849
1850 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1853
1854 {
1858 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1859
1860 let mut position_guard = other_sync.inner.position.lock().await;
1861 position_guard.pos = Some("yolo".to_owned());
1862
1863 other_sync.cache_to_storage(&position_guard).await?;
1864 }
1865
1866 {
1868 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1869 let (request, _, _) =
1870 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1871 assert_eq!(request.pos.as_deref(), Some("0"));
1872 }
1873
1874 {
1877 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1878 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1879 }
1880
1881 Ok(())
1882 }
1883
1884 #[cfg(feature = "e2e-encryption")]
1885 #[async_test]
1886 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1887 let server = MockServer::start().await;
1888
1889 #[derive(Deserialize)]
1890 struct PartialRequest {
1891 txn_id: Option<String>,
1892 }
1893
1894 let server_pos = Arc::new(Mutex::new(0));
1895 let _mock_guard = Mock::given(SlidingSyncMatcher)
1896 .respond_with(move |request: &Request| {
1897 let request: PartialRequest = request.body_json().unwrap();
1899 let pos = {
1900 let mut pos = server_pos.lock().unwrap();
1901 let prev = *pos;
1902 *pos += 1;
1903 prev
1904 };
1905
1906 ResponseTemplate::new(200).set_body_json(json!({
1907 "txn_id": request.txn_id,
1908 "pos": pos.to_string(),
1909 }))
1910 })
1911 .mount_as_scoped(&server)
1912 .await;
1913
1914 let client = logged_in_client(Some(server.uri())).await;
1915
1916 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1917
1918 {
1920 let (request, _, _) =
1921 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1922
1923 assert!(request.pos.is_none());
1924 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1925 }
1926
1927 let sync = sliding_sync.sync();
1928 pin_mut!(sync);
1929
1930 let next = sync.next().await;
1933 assert_matches!(next, Some(Ok(_update_summary)));
1934
1935 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1936
1937 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1938 .await?
1939 .expect("must have restored fields");
1940
1941 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1944
1945 {
1947 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1948
1949 let mut position_guard = other_sync.inner.position.lock().await;
1950 position_guard.pos = Some("42".to_owned());
1951
1952 other_sync.cache_to_storage(&position_guard).await?;
1953 }
1954
1955 {
1957 let (request, _, _) =
1958 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1959 assert_eq!(request.pos.as_deref(), Some("42"));
1960 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1961 }
1962
1963 {
1965 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1966 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1967
1968 let (request, _, _) =
1969 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1970 assert_eq!(request.pos.as_deref(), Some("42"));
1971 }
1972
1973 sliding_sync.expire_session().await;
1976
1977 {
1978 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1979
1980 let (request, _, _) =
1981 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1982 assert!(request.pos.is_none());
1983 }
1984
1985 {
1987 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1988 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1989
1990 let (request, _, _) =
1991 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1992 assert!(request.pos.is_none());
1993 }
1994
1995 Ok(())
1996 }
1997
1998 #[async_test]
1999 async fn test_stop_sync_loop() -> Result<()> {
2000 let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2001 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2002 .await?;
2003
2004 let stream = sliding_sync.sync();
2006 pin_mut!(stream);
2007
2008 assert!(stream.next().await.is_some());
2010
2011 sliding_sync.stop_sync()?;
2013
2014 assert!(stream.next().await.is_none());
2016
2017 let stream = sliding_sync.sync();
2019 pin_mut!(stream);
2020
2021 assert!(stream.next().await.is_some());
2023
2024 Ok(())
2025 }
2026
2027 #[async_test]
2028 async fn test_process_read_receipts() -> Result<()> {
2029 let room = owned_room_id!("!pony:example.org");
2030
2031 let server = MockServer::start().await;
2032 let client = logged_in_client(Some(server.uri())).await;
2033 client.event_cache().subscribe().unwrap();
2034
2035 let sliding_sync = client
2036 .sliding_sync("test")?
2037 .with_receipt_extension(
2038 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2039 )
2040 .add_list(
2041 SlidingSyncList::builder("all")
2042 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2043 )
2044 .build()
2045 .await?;
2046
2047 {
2049 let server_response = assign!(http::Response::new("0".to_owned()), {
2050 rooms: BTreeMap::from([(
2051 room.clone(),
2052 http::response::Room::default(),
2053 )])
2054 });
2055
2056 let _summary = {
2057 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2058 sliding_sync
2059 .handle_response(
2060 server_response.clone(),
2061 &mut pos_guard,
2062 RequestedRequiredStates::default(),
2063 )
2064 .await?
2065 };
2066 }
2067
2068 let server_response = assign!(http::Response::new("1".to_owned()), {
2069 extensions: assign!(http::response::Extensions::default(), {
2070 receipts: assign!(http::response::Receipts::default(), {
2071 rooms: BTreeMap::from([
2072 (
2073 room.clone(),
2074 Raw::from_json_string(
2075 json!({
2076 "room_id": room,
2077 "type": "m.receipt",
2078 "content": {
2079 "$event:bar.org": {
2080 "m.read": {
2081 client.user_id().unwrap(): {
2082 "ts": 1436451550,
2083 }
2084 }
2085 }
2086 }
2087 })
2088 .to_string(),
2089 ).unwrap()
2090 )
2091 ])
2092 })
2093 })
2094 });
2095
2096 let summary = {
2097 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2098 sliding_sync
2099 .handle_response(
2100 server_response.clone(),
2101 &mut pos_guard,
2102 RequestedRequiredStates::default(),
2103 )
2104 .await?
2105 };
2106
2107 assert!(summary.rooms.contains(&room));
2108
2109 Ok(())
2110 }
2111
2112 #[async_test]
2113 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2114 let room_id = owned_room_id!("!unicorn:example.org");
2115
2116 let server = MockServer::start().await;
2117 let client = logged_in_client(Some(server.uri())).await;
2118
2119 let sliding_sync = client
2122 .sliding_sync("test")?
2123 .with_account_data_extension(
2124 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2125 )
2126 .add_list(
2127 SlidingSyncList::builder("all")
2128 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2129 )
2130 .build()
2131 .await?;
2132
2133 {
2135 let server_response = assign!(http::Response::new("0".to_owned()), {
2136 rooms: BTreeMap::from([(
2137 room_id.clone(),
2138 http::response::Room::default(),
2139 )])
2140 });
2141
2142 let _summary = {
2143 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2144 sliding_sync
2145 .handle_response(
2146 server_response.clone(),
2147 &mut pos_guard,
2148 RequestedRequiredStates::default(),
2149 )
2150 .await?
2151 };
2152 }
2153
2154 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2158
2159 let update_summary = {
2160 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2161 sliding_sync
2162 .handle_response(
2163 server_response.clone(),
2164 &mut pos_guard,
2165 RequestedRequiredStates::default(),
2166 )
2167 .await?
2168 };
2169
2170 assert!(update_summary.rooms.contains(&room_id));
2173
2174 let room = client.get_room(&room_id).unwrap();
2175
2176 assert!(room.is_marked_unread());
2179
2180 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2183
2184 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2185 sliding_sync
2186 .handle_response(
2187 server_response.clone(),
2188 &mut pos_guard,
2189 RequestedRequiredStates::default(),
2190 )
2191 .await?;
2192
2193 let room = client.get_room(&room_id).unwrap();
2194
2195 assert!(!room.is_marked_unread());
2196
2197 Ok(())
2198 }
2199
2200 fn make_mark_unread_response(
2201 response_number: &str,
2202 room_id: OwnedRoomId,
2203 unread: bool,
2204 add_rooms_section: bool,
2205 ) -> http::Response {
2206 let rooms = if add_rooms_section {
2207 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2208 } else {
2209 BTreeMap::new()
2210 };
2211
2212 let extensions = assign!(http::response::Extensions::default(), {
2213 account_data: assign!(http::response::AccountData::default(), {
2214 rooms: BTreeMap::from([
2215 (
2216 room_id,
2217 vec![
2218 Raw::from_json_string(
2219 json!({
2220 "content": {
2221 "unread": unread
2222 },
2223 "type": "m.marked_unread"
2224 })
2225 .to_string(),
2226 ).unwrap()
2227 ]
2228 )
2229 ])
2230 })
2231 });
2232
2233 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2234 }
2235
2236 #[async_test]
2237 async fn test_process_rooms_account_data() -> Result<()> {
2238 let room = owned_room_id!("!pony:example.org");
2239
2240 let server = MockServer::start().await;
2241 let client = logged_in_client(Some(server.uri())).await;
2242
2243 let sliding_sync = client
2244 .sliding_sync("test")?
2245 .with_account_data_extension(
2246 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2247 )
2248 .add_list(
2249 SlidingSyncList::builder("all")
2250 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2251 )
2252 .build()
2253 .await?;
2254
2255 {
2257 let server_response = assign!(http::Response::new("0".to_owned()), {
2258 rooms: BTreeMap::from([(
2259 room.clone(),
2260 http::response::Room::default(),
2261 )])
2262 });
2263
2264 let _summary = {
2265 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2266 sliding_sync
2267 .handle_response(
2268 server_response.clone(),
2269 &mut pos_guard,
2270 RequestedRequiredStates::default(),
2271 )
2272 .await?
2273 };
2274 }
2275
2276 let server_response = assign!(http::Response::new("1".to_owned()), {
2277 extensions: assign!(http::response::Extensions::default(), {
2278 account_data: assign!(http::response::AccountData::default(), {
2279 rooms: BTreeMap::from([
2280 (
2281 room.clone(),
2282 vec![
2283 Raw::from_json_string(
2284 json!({
2285 "content": {
2286 "tags": {
2287 "u.work": {
2288 "order": 0.9
2289 }
2290 }
2291 },
2292 "type": "m.tag"
2293 })
2294 .to_string(),
2295 ).unwrap()
2296 ]
2297 )
2298 ])
2299 })
2300 })
2301 });
2302 let summary = {
2303 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2304 sliding_sync
2305 .handle_response(
2306 server_response.clone(),
2307 &mut pos_guard,
2308 RequestedRequiredStates::default(),
2309 )
2310 .await?
2311 };
2312
2313 assert!(summary.rooms.contains(&room));
2314
2315 Ok(())
2316 }
2317
2318 #[async_test]
2319 #[cfg(feature = "e2e-encryption")]
2320 async fn test_process_only_encryption_events() -> Result<()> {
2321 use ruma::OneTimeKeyAlgorithm;
2322
2323 let room = owned_room_id!("!croissant:example.org");
2324
2325 let server = MockServer::start().await;
2326 let client = logged_in_client(Some(server.uri())).await;
2327
2328 let server_response = assign!(http::Response::new("0".to_owned()), {
2329 rooms: BTreeMap::from([(
2330 room.clone(),
2331 assign!(http::response::Room::default(), {
2332 name: Some("Croissants lovers".to_owned()),
2333 timeline: Vec::new(),
2334 }),
2335 )]),
2336
2337 extensions: assign!(http::response::Extensions::default(), {
2338 e2ee: assign!(http::response::E2EE::default(), {
2339 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2340 }),
2341 to_device: Some(assign!(http::response::ToDevice::default(), {
2342 next_batch: "to-device-token".to_owned(),
2343 })),
2344 })
2345 });
2346
2347 let sliding_sync = client
2351 .sliding_sync("test")?
2352 .with_to_device_extension(
2353 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2354 )
2355 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2356 .build()
2357 .await?;
2358
2359 {
2360 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2361
2362 sliding_sync
2363 .handle_response(
2364 server_response.clone(),
2365 &mut position_guard,
2366 RequestedRequiredStates::default(),
2367 )
2368 .await?;
2369 }
2370
2371 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2373 assert_eq!(uploaded_key_count, 42);
2374
2375 {
2376 let olm_machine = &*client.olm_machine_for_testing().await;
2377 assert_eq!(
2378 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2379 Some("to-device-token")
2380 );
2381 }
2382
2383 assert!(client.get_room(&room).is_none());
2385
2386 let client = logged_in_client(Some(server.uri())).await;
2389
2390 let sliding_sync = client
2391 .sliding_sync("test")?
2392 .add_list(SlidingSyncList::builder("thelist"))
2393 .build()
2394 .await?;
2395
2396 {
2397 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2398
2399 sliding_sync
2400 .handle_response(
2401 server_response.clone(),
2402 &mut position_guard,
2403 RequestedRequiredStates::default(),
2404 )
2405 .await?;
2406 }
2407
2408 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2410 assert_eq!(uploaded_key_count, 0);
2411
2412 {
2413 let olm_machine = &*client.olm_machine_for_testing().await;
2414 assert_eq!(
2415 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2416 None
2417 );
2418 }
2419
2420 assert!(client.get_room(&room).is_some());
2422
2423 let client = logged_in_client(Some(server.uri())).await;
2425
2426 let sliding_sync = client
2427 .sliding_sync("test")?
2428 .add_list(SlidingSyncList::builder("thelist"))
2429 .with_to_device_extension(
2430 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2431 )
2432 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2433 .build()
2434 .await?;
2435
2436 {
2437 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2438
2439 sliding_sync
2440 .handle_response(
2441 server_response.clone(),
2442 &mut position_guard,
2443 RequestedRequiredStates::default(),
2444 )
2445 .await?;
2446 }
2447
2448 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2450 assert_eq!(uploaded_key_count, 42);
2451
2452 {
2453 let olm_machine = &*client.olm_machine_for_testing().await;
2454 assert_eq!(
2455 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2456 Some("to-device-token")
2457 );
2458 }
2459
2460 assert!(client.get_room(&room).is_some());
2462
2463 Ok(())
2464 }
2465
2466 #[async_test]
2467 async fn test_lock_multiple_requests() -> Result<()> {
2468 let server = MockServer::start().await;
2469 let client = logged_in_client(Some(server.uri())).await;
2470
2471 let pos = Arc::new(Mutex::new(0));
2472 let _mock_guard = Mock::given(SlidingSyncMatcher)
2473 .respond_with(move |_: &Request| {
2474 let mut pos = pos.lock().unwrap();
2475 *pos += 1;
2476 ResponseTemplate::new(200).set_body_json(json!({
2477 "pos": pos.to_string(),
2478 "lists": {},
2479 "rooms": {}
2480 }))
2481 })
2482 .mount_as_scoped(&server)
2483 .await;
2484
2485 let sliding_sync = client
2486 .sliding_sync("test")?
2487 .with_to_device_extension(
2488 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2489 )
2490 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2491 .build()
2492 .await?;
2493
2494 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2497
2498 for result in requests.await {
2499 result?;
2500 }
2501
2502 Ok(())
2503 }
2504
2505 #[async_test]
2506 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2507 let server = MockServer::start().await;
2508 let client = logged_in_client(Some(server.uri())).await;
2509
2510 let pos = Arc::new(Mutex::new(0));
2511 let _mock_guard = Mock::given(SlidingSyncMatcher)
2512 .respond_with(move |_: &Request| {
2513 let mut pos = pos.lock().unwrap();
2514 *pos += 1;
2515 ResponseTemplate::new(200)
2517 .set_body_json(json!({
2518 "pos": pos.to_string(),
2519 "lists": {},
2520 "rooms": {}
2521 }))
2522 .set_delay(Duration::from_secs(2))
2523 })
2524 .mount_as_scoped(&server)
2525 .await;
2526
2527 let sliding_sync =
2528 client
2529 .sliding_sync("test")?
2530 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2531 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2532 ))
2533 .add_list(
2534 SlidingSyncList::builder("another-list")
2535 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2536 )
2537 .build()
2538 .await?;
2539
2540 let stream = sliding_sync.sync();
2541 pin_mut!(stream);
2542
2543 let cloned_sync = sliding_sync.clone();
2544 spawn(async move {
2545 tokio::time::sleep(Duration::from_millis(100)).await;
2546
2547 cloned_sync
2548 .on_list("another-list", |list| {
2549 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2550 ready(())
2551 })
2552 .await;
2553 });
2554
2555 assert_matches!(stream.next().await, Some(Ok(_)));
2556
2557 sliding_sync.stop_sync().unwrap();
2558
2559 assert_matches!(stream.next().await, None);
2560
2561 let mut num_requests = 0;
2562
2563 for request in server.received_requests().await.unwrap() {
2564 if !SlidingSyncMatcher.matches(&request) {
2565 continue;
2566 }
2567
2568 let another_list_ranges = if num_requests == 0 {
2569 json!([[0, 10]])
2571 } else {
2572 json!([[10, 20]])
2574 };
2575
2576 num_requests += 1;
2577 assert!(num_requests <= 2, "more than one request hit the server");
2578
2579 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2580
2581 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2582 &json_value,
2583 &json!({
2584 "conn_id": "test",
2585 "lists": {
2586 "room-list": {
2587 "ranges": [[0, 9]],
2588 "required_state": [
2589 ["m.room.encryption", ""],
2590 ["m.room.tombstone", ""]
2591 ],
2592 },
2593 "another-list": {
2594 "ranges": another_list_ranges,
2595 "required_state": [
2596 ["m.room.encryption", ""],
2597 ["m.room.tombstone", ""]
2598 ],
2599 },
2600 }
2601 }),
2602 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2603 ) {
2604 dbg!(json_value);
2605 panic!("json differ: {err}");
2606 }
2607 }
2608
2609 assert_eq!(num_requests, 2);
2610
2611 Ok(())
2612 }
2613
2614 #[async_test]
2615 async fn test_timeout_zero_list() -> Result<()> {
2616 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2617
2618 let (request, _, _) =
2619 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2620
2621 assert!(request.timeout.is_some());
2624
2625 Ok(())
2626 }
2627
2628 #[async_test]
2629 async fn test_timeout_one_list() -> Result<()> {
2630 let (_server, sliding_sync) = new_sliding_sync(vec![
2631 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2632 ])
2633 .await?;
2634
2635 let (request, _, _) =
2636 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2637
2638 assert!(request.timeout.is_none());
2640
2641 {
2643 let server_response = assign!(http::Response::new("0".to_owned()), {
2644 lists: BTreeMap::from([(
2645 "foo".to_owned(),
2646 assign!(http::response::List::default(), {
2647 count: uint!(7),
2648 })
2649 )])
2650 });
2651
2652 let _summary = {
2653 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2654 sliding_sync
2655 .handle_response(
2656 server_response.clone(),
2657 &mut pos_guard,
2658 RequestedRequiredStates::default(),
2659 )
2660 .await?
2661 };
2662 }
2663
2664 let (request, _, _) =
2665 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667 assert!(request.timeout.is_some());
2669
2670 Ok(())
2671 }
2672
2673 #[async_test]
2674 async fn test_timeout_three_lists() -> Result<()> {
2675 let (_server, sliding_sync) = new_sliding_sync(vec![
2676 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2677 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2678 SlidingSyncList::builder("baz")
2679 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2680 ])
2681 .await?;
2682
2683 let (request, _, _) =
2684 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2685
2686 assert!(request.timeout.is_none());
2688
2689 {
2691 let server_response = assign!(http::Response::new("0".to_owned()), {
2692 lists: BTreeMap::from([(
2693 "foo".to_owned(),
2694 assign!(http::response::List::default(), {
2695 count: uint!(7),
2696 })
2697 )])
2698 });
2699
2700 let _summary = {
2701 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2702 sliding_sync
2703 .handle_response(
2704 server_response.clone(),
2705 &mut pos_guard,
2706 RequestedRequiredStates::default(),
2707 )
2708 .await?
2709 };
2710 }
2711
2712 let (request, _, _) =
2713 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2714
2715 assert!(request.timeout.is_none());
2717
2718 {
2720 let server_response = assign!(http::Response::new("1".to_owned()), {
2721 lists: BTreeMap::from([(
2722 "bar".to_owned(),
2723 assign!(http::response::List::default(), {
2724 count: uint!(7),
2725 })
2726 )])
2727 });
2728
2729 let _summary = {
2730 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2731 sliding_sync
2732 .handle_response(
2733 server_response.clone(),
2734 &mut pos_guard,
2735 RequestedRequiredStates::default(),
2736 )
2737 .await?
2738 };
2739 }
2740
2741 let (request, _, _) =
2742 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744 assert!(request.timeout.is_some());
2746
2747 Ok(())
2748 }
2749
2750 #[async_test]
2751 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2752 let server = MockServer::start().await;
2753 let client = logged_in_client(Some(server.uri())).await;
2754
2755 let _mock_guard = Mock::given(SlidingSyncMatcher)
2756 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2757 "pos": "0",
2758 "lists": {},
2759 "rooms": {}
2760 })))
2761 .mount_as_scoped(&server)
2762 .await;
2763
2764 let sliding_sync = client
2765 .sliding_sync("test")?
2766 .with_to_device_extension(
2767 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2768 )
2769 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2770 .build()
2771 .await?;
2772
2773 let sliding_sync = Arc::new(sliding_sync);
2774
2775 let sync_beat_listener = client.inner.sync_beat.listen();
2777 sliding_sync.sync_once().await?;
2778
2779 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2781 Ok(())
2782 }
2783
2784 #[async_test]
2785 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2786 let server = MockServer::start().await;
2787 let client = logged_in_client(Some(server.uri())).await;
2788
2789 let _mock_guard = Mock::given(SlidingSyncMatcher)
2790 .respond_with(ResponseTemplate::new(404))
2791 .mount_as_scoped(&server)
2792 .await;
2793
2794 let sliding_sync = client
2795 .sliding_sync("test")?
2796 .with_to_device_extension(
2797 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2798 )
2799 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2800 .build()
2801 .await?;
2802
2803 let sliding_sync = Arc::new(sliding_sync);
2804
2805 let sync_beat_listener = client.inner.sync_beat.listen();
2807 let sync_result = sliding_sync.sync_once().await;
2808 assert!(sync_result.is_err());
2809
2810 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2812
2813 Ok(())
2814 }
2815
2816 #[async_test]
2817 async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2818 let server = MatrixMockServer::new().await;
2819 let client = server.client_builder().build().await;
2820 let room_id = room_id!("!mu5hr00m:example.org");
2821
2822 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2823 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2824 "pos": "0",
2825 "lists": {},
2826 "extensions": {
2827 "account_data": {
2828 "global": [
2829 {
2830 "type": "m.direct",
2831 "content": {
2832 "@de4dlockh0lmes:example.org": [
2833 "!mu5hr00m:example.org"
2834 ]
2835 }
2836 }
2837 ]
2838 }
2839 },
2840 "rooms": {
2841 room_id: {
2842 "name": "Mario Bros Fanbase Room",
2843 "initial": true,
2844 },
2845 }
2846 })))
2847 .mount_as_scoped(server.server())
2848 .await;
2849
2850 let f = EventFactory::new().room(room_id);
2851
2852 Mock::given(method("GET"))
2853 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2854 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2855 "chunk": [
2856 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2857 ]
2858 })))
2859 .mount(server.server())
2860 .await;
2861
2862 let (tx, rx) = tokio::sync::oneshot::channel();
2863
2864 let tx = Arc::new(Mutex::new(Some(tx)));
2865 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2866 let members =
2868 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2869 assert_eq!(members.len(), 1);
2870 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2871 });
2872
2873 let sliding_sync = client
2874 .sliding_sync("test")?
2875 .add_list(SlidingSyncList::builder("thelist"))
2876 .with_account_data_extension(
2877 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2878 )
2879 .build()
2880 .await?;
2881
2882 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2883 .await
2884 .expect("Sync did not complete in time")
2885 .expect("Sync failed");
2886
2887 tokio::time::timeout(Duration::from_secs(5), rx)
2889 .await
2890 .expect("Event handler did not complete in time")
2891 .expect("Event handler failed");
2892
2893 Ok(())
2894 }
2895}