1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26 collections::{BTreeMap, btree_map::Entry},
27 fmt::Debug,
28 future::Future,
29 sync::{Arc, RwLock as StdRwLock},
30 time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41 OwnedRoomId, RoomId,
42 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
43 assign,
44};
45use tokio::{
46 select,
47 sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
48};
49use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53 cache::restore_sliding_sync_state,
54 client::SlidingSyncResponseProcessor,
55 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{Client, Result, config::RequestConfig};
58
/// Handle to a sliding sync instance.
///
/// This is a thin wrapper around a reference-counted [`SlidingSyncInner`]:
/// cloning the handle clones the `Arc`, so every clone shares the same state.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The state shared by all the clones of this handle.
    inner: Arc<SlidingSyncInner>,
}
67
/// The state shared by every clone of a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance; sent as the `conn_id` of every request.
    id: String,

    /// The client used to send requests and to look rooms up.
    client: Client,

    /// Value sent as the request `timeout` when every list requires a
    /// timeout. It is also one of the two terms of the HTTP request timeout.
    poll_timeout: Duration,

    /// Extra duration added to `poll_timeout` to compute the HTTP request
    /// timeout.
    network_timeout: Duration,

    /// Key passed to the cache helpers that reload lists and restore the
    /// sliding sync state.
    storage_key: String,

    /// When `true`, the `pos` restored from storage replaces the in-memory
    /// one while a request is being generated.
    share_pos: bool,

    /// Position markers. The lock is acquired when a request is generated,
    /// and its owned guard is carried along until the response is handled.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this instance, keyed by their name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Manager of the sticky parameters (room subscriptions and extensions).
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Sender half of the channel used to deliver
    /// [`SlidingSyncInternalMessage`]s to the sync loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
126 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
127 cache::store_sliding_sync_state(self, position).await
128 }
129
130 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
132 SlidingSyncBuilder::new(id, client)
133 }
134
135 pub fn subscribe_to_rooms(
142 &self,
143 room_ids: &[&RoomId],
144 settings: Option<http::request::RoomSubscription>,
145 cancel_in_flight_request: bool,
146 ) {
147 let settings = settings.unwrap_or_default();
148 let mut sticky = self.inner.sticky.write().unwrap();
149 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
150
151 let mut skip_over_current_sync_loop_iteration = false;
152
153 for room_id in room_ids {
154 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
161 if let Some(room) = self.inner.client.get_room(room_id) {
162 room.mark_members_missing();
163 }
164
165 entry.insert((RoomSubscriptionState::default(), settings.clone()));
166
167 skip_over_current_sync_loop_iteration = true;
168 }
169 }
170
171 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172 self.inner.internal_channel_send_if_possible(
173 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174 );
175 }
176 }
177
178 pub async fn on_list<Function, FunctionOutput, R>(
180 &self,
181 list_name: &str,
182 function: Function,
183 ) -> Option<R>
184 where
185 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
186 FunctionOutput: Future<Output = R>,
187 {
188 let lists = self.inner.lists.read().await;
189
190 match lists.get(list_name) {
191 Some(list) => Some(function(list).await),
192 None => None,
193 }
194 }
195
196 pub async fn add_list(
202 &self,
203 list_builder: SlidingSyncListBuilder,
204 ) -> Result<Option<SlidingSyncList>> {
205 let list = list_builder.build(self.inner.internal_channel.clone());
206
207 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
208
209 self.inner.internal_channel_send_if_possible(
210 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
211 );
212
213 Ok(old_list)
214 }
215
216 pub async fn add_cached_list(
223 &self,
224 mut list_builder: SlidingSyncListBuilder,
225 ) -> Result<Option<SlidingSyncList>> {
226 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
227
228 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
229
230 self.add_list(list_builder).await
231 }
232
    /// Handle the response of a sliding sync request.
    ///
    /// The response goes through a [`SlidingSyncResponseProcessor`] (thread
    /// subscriptions, end-to-end encryption and rooms, depending on what is
    /// enabled). Then the sticky parameters are committed if the response
    /// carries a transaction ID, the known lists are updated, and finally
    /// `position.pos` is replaced by the `pos` of the response.
    ///
    /// Returns an [`UpdateSummary`] naming the lists and the rooms that have
    /// been updated.
    ///
    /// # Panics
    ///
    /// Panics if the `count` of a list in the response cannot be converted to
    /// a `u32`.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        // Keep the new `pos` aside: it is written into `position` only at the
        // very end, once every fallible step has succeeded.
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // The state store lock is held for the rest of this inner
                // block, i.e. while the `handle_*` methods run. It is
                // released before `process_and_take_response` is called.
                let _state_store_lock = {
                    let _timer = timer!("acquiring the `state_store_lock`");

                    self.inner.client.base_client().state_store_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                if self.is_thread_subscriptions_enabled() {
                    // The extension payload is moved out of the response; the
                    // `pos` passed here is still the previous one.
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        // Commit the sticky parameters (global ones, then per-list ones) for
        // the transaction ID echoed in the response, if any.
        if let Some(ref txn_id) = sliding_sync_response.txn_id {
            let txn_id = txn_id.as_str().into();
            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
            let mut lists = self.inner.lists.write().await;
            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
        }

        let update_summary = {
            // Updated rooms are the rooms of the sliding sync response,
            // followed by the joined rooms of the processed sync response.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate over the known lists rather than over the lists of
                // the response: a list that is absent from the response is
                // still updated, with `None`.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Lists of the response that are unknown here are only
                // reported.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }
384
    /// Build the next sliding sync request.
    ///
    /// Returns the request, the [`RequestConfig`] to send it with, and the
    /// owned guard over the position markers. The guard is handed to the
    /// caller still locked.
    ///
    /// `txn_id` is passed to every list and to the sticky parameters manager;
    /// if it ends up holding a transaction ID, that ID is set on the request.
    #[instrument(skip_all)]
    async fn generate_sync_request(
        &self,
        txn_id: &mut LazyTransactionId,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        let mut requests_lists = BTreeMap::new();

        let require_timeout = {
            let lists = self.inner.lists.read().await;

            // Starts at `true`, so that a timeout is required when there is
            // no list at all; otherwise every list must require it.
            let mut require_timeout = true;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
                require_timeout = require_timeout && list.requires_timeout();
            }

            require_timeout
        };

        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled =
            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);

        // The stored state is only read when it is going to be used: either
        // for its `pos` (`share_pos`) or for its to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        let pos = if self.inner.share_pos {
            // With `share_pos`, the restored `pos` wins over the in-memory
            // one, which is overwritten when they differ.
            if let Some(fields) = &restored_fields {
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        Span::current().record("pos", &pos);

        // Without a `pos` and with end-to-end encryption enabled, all the
        // tracked users are marked as dirty.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // `Some(poll_timeout)` only when a timeout is required.
        let timeout = require_timeout.then(|| self.inner.poll_timeout);

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);

        // Set after the sticky parameters have been applied: the to-device
        // `since` token comes from the restored state.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        if let Some(txn_id) = txn_id.get() {
            request.txn_id = Some(txn_id.to_string());
        }

        Ok((
            request,
            // The HTTP timeout covers the poll timeout plus the network
            // timeout.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
513
    /// Send a sliding sync request, then handle its response.
    ///
    /// When end-to-end encryption is enabled, the outgoing E2EE requests are
    /// sent from a spawned task while the sync request is awaited. The
    /// response is handled and the state is cached to storage inside a
    /// spawned task that owns `position_guard`; this function awaits that
    /// task.
    ///
    /// # Panics
    ///
    /// Panics if awaiting the task handling the response fails (the join
    /// result is unwrapped).
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Computed before `request` is moved into `send`.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                let client = self.inner.client.clone();
                // Errors of the E2EE uploads are only logged. The task is
                // aborted if its handle is dropped, e.g. when the sync
                // request below fails.
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                .abort_on_drop();

                // Wait on the sync request first, so that its failure is
                // reported early.
                let response = request.await?;

                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        let this = self.clone();

        let future = async move {
            debug!("Start handling response");

            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position lock explicitly, once the state has been
            // cached.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.unwrap()
    }
629
630 #[cfg(feature = "e2e-encryption")]
632 fn is_e2ee_enabled(&self) -> bool {
633 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
634 }
635
636 fn is_thread_subscriptions_enabled(&self) -> bool {
639 self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
640 == Some(true)
641 }
642
643 #[cfg(not(feature = "e2e-encryption"))]
644 fn is_e2ee_enabled(&self) -> bool {
645 false
646 }
647
648 async fn must_process_rooms_response(&self) -> bool {
650 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
653 || !self.inner.lists.read().await.is_empty()
654 }
655
656 #[doc(hidden)]
660 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
661 pub async fn sync_once(&self) -> Result<UpdateSummary> {
662 let (request, request_config, position_guard) =
663 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
664
665 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
667
668 self.inner.client.inner.sync_beat.notify(usize::MAX);
670
671 Ok(summaries)
672 }
673
    /// Create the sync stream.
    ///
    /// Each iteration of the stream races the internal channel against
    /// [`Self::sync_once`]:
    ///
    /// * a `SyncLoopStop` message, or an error while receiving from the
    ///   internal channel, ends the stream,
    /// * a `SyncLoopSkipOverCurrentIteration` message restarts the loop,
    /// * a successful sync yields its [`UpdateSummary`],
    /// * a failed sync yields the error and ends the stream; if the error
    ///   kind is `UnknownPos`, the session is expired first.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is polled for the first time.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // `biased`: the branches are polled in declaration order,
                    // so internal messages are looked at first.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // Start a new iteration; the `sync_once` future
                            // of this iteration is dropped.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // An error terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
738
739 pub fn stop_sync(&self) -> Result<()> {
748 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
749 }
750
751 #[doc(hidden)]
763 pub async fn expire_session(&self) {
764 info!("Session expired; resetting `pos` and sticky parameters");
765
766 {
767 let lists = self.inner.lists.read().await;
768 for list in lists.values() {
769 list.set_maximum_number_of_rooms(None);
771
772 list.invalidate_sticky_data();
774 }
775 }
776
777 {
779 let mut position = self.inner.position.lock().await;
780
781 position.pos = None;
783
784 if let Err(err) = self.cache_to_storage(&position).await {
788 warn!("Failed to invalidate cached sliding sync state: {err}");
789 }
790 }
791
792 {
793 let mut sticky = self.inner.sticky.write().unwrap();
794
795 sticky.data_mut().room_subscriptions.clear();
798 }
799 }
800}
801
802impl SlidingSyncInner {
803 #[instrument]
805 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
806 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
807 }
808
809 #[instrument]
812 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
813 let _ = self.internal_channel.send(message);
815 }
816}
817
/// Messages sent to the sync loop over the internal channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// The sync loop must stop.
    SyncLoopStop,

    /// The sync loop must skip over its current iteration and start a new
    /// one.
    SyncLoopSkipOverCurrentIteration,
}
827
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the in-memory `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Get a copy of the extensions held by the sticky parameters.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
846
/// The position markers of a sliding sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` sent with the next request. It is `None` until a response
    /// has been handled, and again after the session has expired.
    pos: Option<String>,
}
853
/// Summary of what a handled sliding sync response has updated.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that reported an update.
    pub lists: Vec<String>,
    /// The rooms of the sliding sync response, followed by the joined rooms
    /// of the processed sync response.
    pub rooms: Vec<OwnedRoomId>,
}
863
/// The state of a room subscription.
///
/// The derives make the state cheap to copy and directly comparable.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
enum RoomSubscriptionState {
    /// The subscription still has to be sent: pending subscriptions are the
    /// only ones applied to a request.
    #[default]
    Pending,

    /// The subscription has been committed; it is not applied to requests
    /// anymore.
    Applied,
}
879
/// The sticky parameters of a sliding sync instance: what is applied to a
/// request through the [`StickyData`] implementation.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// The room subscriptions, keyed by room ID. Each one is stored with its
    /// [`RoomSubscriptionState`] and its settings.
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The extensions copied into each request these parameters are applied
    /// to.
    extensions: http::request::Extensions,
}
893
894impl SlidingSyncStickyParameters {
895 pub fn new(
897 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
898 extensions: http::request::Extensions,
899 ) -> Self {
900 Self {
901 room_subscriptions: room_subscriptions
902 .into_iter()
903 .map(|(room_id, room_subscription)| {
904 (room_id, (RoomSubscriptionState::Pending, room_subscription))
905 })
906 .collect(),
907 extensions,
908 }
909 }
910}
911
912impl StickyData for SlidingSyncStickyParameters {
913 type Request = http::Request;
914
915 fn apply(&self, request: &mut Self::Request) {
916 request.room_subscriptions = self
917 .room_subscriptions
918 .iter()
919 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
920 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
921 .collect();
922 request.extensions = self.extensions.clone();
923 }
924
925 fn on_commit(&mut self) {
926 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
928 if matches!(state, RoomSubscriptionState::Pending) {
929 *state = RoomSubscriptionState::Applied;
930 }
931 }
932 }
933}
934
935#[cfg(all(test, not(target_family = "wasm")))]
936#[allow(clippy::dbg_macro)]
937mod tests {
938 use std::{
939 collections::BTreeMap,
940 future::ready,
941 ops::Not,
942 sync::{Arc, Mutex},
943 time::Duration,
944 };
945
946 use assert_matches::assert_matches;
947 use event_listener::Listener;
948 use futures_util::{StreamExt, future::join_all, pin_mut};
949 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
950 use matrix_sdk_common::executor::spawn;
951 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
952 use ruma::{
953 OwnedRoomId, TransactionId,
954 api::client::error::ErrorKind,
955 assign,
956 events::{direct::DirectEvent, room::member::MembershipState},
957 owned_room_id, room_id,
958 serde::Raw,
959 uint,
960 };
961 use serde::Deserialize;
962 use serde_json::json;
963 use wiremock::{
964 Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
965 };
966
967 use super::{
968 SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
969 SlidingSyncStickyParameters, http,
970 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
971 };
972 use crate::{
973 Client, Result,
974 sliding_sync::cache::restore_sliding_sync_state,
975 test_utils::{logged_in_client, mocks::MatrixMockServer},
976 };
977
978 #[derive(Copy, Clone)]
979 struct SlidingSyncMatcher;
980
981 impl Match for SlidingSyncMatcher {
982 fn matches(&self, request: &Request) -> bool {
983 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
984 && request.method == Method::POST
985 }
986 }
987
988 async fn new_sliding_sync(
989 lists: Vec<SlidingSyncListBuilder>,
990 ) -> Result<(MockServer, SlidingSync)> {
991 let server = MockServer::start().await;
992 let client = logged_in_client(Some(server.uri())).await;
993
994 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
995
996 for list in lists {
997 sliding_sync_builder = sliding_sync_builder.add_list(list);
998 }
999
1000 let sliding_sync = sliding_sync_builder.build().await?;
1001
1002 Ok((server, sliding_sync))
1003 }
1004
    /// Subscribing to a room marks its members as missing, but only the
    /// first time the room is subscribed to.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Run one sync whose response contains the three rooms, so that the
        // client knows them.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members have not been requested yet.
        assert!(room0.are_members_synced().not());

        // Request the members of room 0.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced.
        assert!(room0.are_members_synced());

        // First subscription to rooms 0 and 1.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // A new subscription marks the members as missing.
        assert!(room0.are_members_synced().not());

        // Only rooms 0 and 1 have a subscription.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Request the members of room 0 again.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced again.
        assert!(room0.are_members_synced());

        // Second subscription to room 0: it already exists, nothing happens.
        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1126
    /// Expiring the session clears every room subscription; subscribing
    /// afterwards only brings back the rooms subscribed to again.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // The session expires.
        sliding_sync.expire_session().await;

        // No room subscription is left.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // Only that room has a subscription.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1187
1188 #[async_test]
1189 async fn test_add_list() -> Result<()> {
1190 let (_server, sliding_sync) = new_sliding_sync(vec![
1191 SlidingSyncList::builder("foo")
1192 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1193 ])
1194 .await?;
1195
1196 let _stream = sliding_sync.sync();
1197 pin_mut!(_stream);
1198
1199 sliding_sync
1200 .add_list(
1201 SlidingSyncList::builder("bar")
1202 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1203 )
1204 .await?;
1205
1206 let lists = sliding_sync.inner.lists.read().await;
1207
1208 assert!(lists.contains_key("foo"));
1209 assert!(lists.contains_key("bar"));
1210
1211 Ok(())
1214 }
1215
    /// Walk through the invalidation flow of the sticky parameters: applying
    /// them to a request does not validate them, only committing a known
    /// transaction ID does.
    #[test]
    fn test_sticky_parameters_api_invalidated_flow() {
        let r0 = room_id!("!r0.matrix.org");
        let r1 = room_id!("!r1:matrix.org");

        let mut room_subscriptions = BTreeMap::new();
        room_subscriptions.insert(r0.to_owned(), Default::default());

        // Freshly created sticky parameters are invalidated.
        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            room_subscriptions,
            Default::default(),
        ));
        assert!(sticky.is_invalidated());

        let txn_id: &TransactionId = "tid123".into();

        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());

        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

        // The pending room subscription has been applied to the request.
        assert!(request.txn_id.is_some());
        assert_eq!(request.room_subscriptions.len(), 1);
        assert!(request.room_subscriptions.contains_key(r0));

        let tid = request.txn_id.unwrap();

        // Committing the transaction ID of the request validates the sticky
        // parameters.
        sticky.maybe_commit(tid.as_str().into());
        assert!(!sticky.is_invalidated());

        // Getting a mutable access to the data invalidates them again.
        sticky
            .data_mut()
            .room_subscriptions
            .insert(r1.to_owned(), (Default::default(), Default::default()));
        assert!(sticky.is_invalidated());

        // Committing an unknown transaction ID changes nothing.
        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
        assert!(sticky.is_invalidated());

        // A first request gets the new pending room subscription only.
        let txn_id1: &TransactionId = "tid456".into();
        let mut request1 = http::Request::default();
        request1.txn_id = Some(txn_id1.to_string());
        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));

        assert!(sticky.is_invalidated());
        assert_eq!(request1.room_subscriptions.len(), 1);
        assert!(request1.room_subscriptions.contains_key(r1));

        // A second request, made before any commit, gets it too.
        let txn_id2: &TransactionId = "tid789".into();
        let mut request2 = http::Request::default();
        request2.txn_id = Some(txn_id2.to_string());

        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
        assert!(sticky.is_invalidated());
        assert_eq!(request2.room_subscriptions.len(), 1);
        assert!(request2.room_subscriptions.contains_key(r1));

        // Committing the transaction ID of the first request is not enough.
        sticky.maybe_commit(txn_id1);
        assert!(sticky.is_invalidated());

        // Committing the transaction ID of the latest request is.
        sticky.maybe_commit(txn_id2);
        assert!(!sticky.is_invalidated());
    }
1292
    /// A room subscription is sent with every request until it has been
    /// committed; once committed, it is not sent anymore.
    #[test]
    fn test_room_subscriptions_are_sticky() {
        let r0 = room_id!("!r0.matrix.org");
        let r1 = room_id!("!r1:matrix.org");

        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            BTreeMap::new(),
            Default::default(),
        ));

        // First request: `r0` is subscribed to, sent, then committed.
        {
            sticky
                .data_mut()
                .room_subscriptions
                .insert(r0.to_owned(), (Default::default(), Default::default()));

            let txn_id: &TransactionId = "tid0".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r0));

            let tid = request.txn_id.unwrap();

            sticky.maybe_commit(tid.as_str().into());
        }

        // Second request: `r1` is subscribed to and sent, but this request
        // is never committed.
        {
            sticky
                .data_mut()
                .room_subscriptions
                .insert(r1.to_owned(), (Default::default(), Default::default()));

            let txn_id: &TransactionId = "tid1".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            // Only `r1` is present: `r0` has already been committed.
            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r1));

        }

        // Third request: `r1` is sent again, and committed this time.
        {
            let txn_id: &TransactionId = "tid2".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert_eq!(request.room_subscriptions.len(), 1);
            assert!(request.room_subscriptions.contains_key(r1));

            let tid = request.txn_id.unwrap();

            sticky.maybe_commit(tid.as_str().into());
        }

        // Fourth request: nothing is pending anymore.
        {
            let txn_id: &TransactionId = "tid3".into();
            let mut request = http::Request::default();
            request.txn_id = Some(txn_id.to_string());

            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));

            assert!(request.txn_id.is_some());
            assert!(request.room_subscriptions.is_empty());
        }
    }
1387
1388 #[test]
1389 fn test_extensions_are_sticky() {
1390 let mut extensions = http::request::Extensions::default();
1391 extensions.account_data.enabled = Some(true);
1392
1393 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1395 Default::default(),
1396 extensions,
1397 ));
1398
1399 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1400
1401 let extensions = &sticky.data().extensions;
1404 assert_eq!(extensions.e2ee.enabled, None);
1405 assert_eq!(extensions.to_device.enabled, None);
1406 assert_eq!(extensions.to_device.since, None);
1407
1408 assert_eq!(extensions.account_data.enabled, Some(true));
1410
1411 let txn_id: &TransactionId = "tid123".into();
1412 let mut request = http::Request::default();
1413 request.txn_id = Some(txn_id.to_string());
1414 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1415 assert!(sticky.is_invalidated());
1416 assert_eq!(request.extensions.to_device.enabled, None);
1417 assert_eq!(request.extensions.to_device.since, None);
1418 assert_eq!(request.extensions.e2ee.enabled, None);
1419 assert_eq!(request.extensions.account_data.enabled, Some(true));
1420 }
1421
1422 #[async_test]
1423 async fn test_sticky_extensions_plus_since() -> Result<()> {
1424 let server = MockServer::start().await;
1425 let client = logged_in_client(Some(server.uri())).await;
1426
1427 let sync = client
1428 .sliding_sync("test-slidingsync")?
1429 .add_list(SlidingSyncList::builder("new_list"))
1430 .build()
1431 .await?;
1432
1433 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1435 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1436 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1437
1438 let sync = client
1440 .sliding_sync("test-slidingsync")?
1441 .add_list(SlidingSyncList::builder("new_list"))
1442 .with_to_device_extension(
1443 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1444 )
1445 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1446 .build()
1447 .await?;
1448
1449 let txn_id = TransactionId::new();
1452 let (request, _, _) = sync
1453 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1454 .await?;
1455
1456 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1457 assert_eq!(request.extensions.to_device.enabled, Some(true));
1458 assert!(request.extensions.to_device.since.is_none());
1459
1460 {
1461 let mut sticky = sync.inner.sticky.write().unwrap();
1463 assert!(sticky.is_invalidated());
1464 sticky.maybe_commit(
1465 "hopefully the rng won't generate this very specific transaction id".into(),
1466 );
1467 assert!(sticky.is_invalidated());
1468 }
1469
1470 let txn_id2 = TransactionId::new();
1472 let (request, _, _) = sync
1473 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1474 .await?;
1475
1476 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1477 assert_eq!(request.extensions.to_device.enabled, Some(true));
1478 assert!(request.extensions.to_device.since.is_none());
1479
1480 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1481
1482 {
1483 let mut sticky = sync.inner.sticky.write().unwrap();
1485 assert!(sticky.is_invalidated());
1486 sticky.maybe_commit(txn_id2.as_str().into());
1487 assert!(!sticky.is_invalidated());
1488 }
1489
1490 let txn_id = TransactionId::new();
1492 let (request, _, _) = sync
1493 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1494 .await?;
1495 assert!(request.extensions.e2ee.enabled.is_none());
1496 assert!(request.extensions.to_device.enabled.is_none());
1497 assert!(request.extensions.to_device.since.is_none());
1498
1499 let _since_token = "since";
1503
1504 #[cfg(feature = "e2e-encryption")]
1505 {
1506 use matrix_sdk_base::crypto::store::types::Changes;
1507 if let Some(olm_machine) = &*client.olm_machine().await {
1508 olm_machine
1509 .store()
1510 .save_changes(Changes {
1511 next_batch_token: Some(_since_token.to_owned()),
1512 ..Default::default()
1513 })
1514 .await?;
1515 }
1516 }
1517
1518 let txn_id = TransactionId::new();
1519 let (request, _, _) = sync
1520 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1521 .await?;
1522
1523 assert!(request.extensions.e2ee.enabled.is_none());
1524 assert!(request.extensions.to_device.enabled.is_none());
1525
1526 #[cfg(feature = "e2e-encryption")]
1527 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1528
1529 Ok(())
1530 }
1531
// Generating a sync request without a `pos` while the e2ee extension is
// enabled makes the OlmMachine emit a keys query for every tracked user;
// generating one with a `pos` set does not.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
    use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
    use matrix_sdk_test::ruma_response_from_json;
    use ruma::user_id;

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let alice = user_id!("@alice:localhost");
    let bob = user_id!("@bob:localhost");
    let me = user_id!("@example:localhost");

    // Setup: track alice and bob, then answer every outgoing request until
    // the OlmMachine has nothing left to send.
    {
        let machine_guard = client.olm_machine().await;
        let olm_machine = machine_guard.as_ref().unwrap();

        olm_machine.update_tracked_users([alice, bob]).await?;

        // A keys upload followed by a keys query are pending.
        let pending = olm_machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 2);
        assert_matches!(pending[0].request(), AnyOutgoingRequest::KeysUpload(_));
        assert_matches!(pending[1].request(), AnyOutgoingRequest::KeysQuery(_));

        // Answer the keys upload.
        olm_machine
            .mark_request_as_sent(
                pending[0].request_id(),
                AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                    "one_time_key_counts": {}
                }))),
            )
            .await?;

        // Answer the keys query for alice and bob.
        olm_machine
            .mark_request_as_sent(
                pending[1].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        alice: {},
                        bob: {},
                    }
                }))),
            )
            .await?;

        // One more keys query is pending; it is answered for our own user.
        let pending = olm_machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 1);
        assert_matches!(pending[0].request(), AnyOutgoingRequest::KeysQuery(_));

        olm_machine
            .mark_request_as_sent(
                pending[0].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        me: {},
                    }
                }))),
            )
            .await?;

        // Nothing is left to send.
        let pending = olm_machine.outgoing_requests().await?;

        assert!(pending.is_empty());
    }

    let sliding_sync = client
        .sliding_sync("test-slidingsync")?
        .add_list(SlidingSyncList::builder("new_list"))
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    // Generate a request while no `pos` has been set.
    let txn_id = TransactionId::new();
    let (_request, _, _) = sliding_sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
        .await?;

    // A keys query covering all three users is now pending.
    {
        let machine_guard = client.olm_machine().await;
        let olm_machine = machine_guard.as_ref().unwrap();

        let pending = olm_machine.outgoing_requests().await?;

        assert_eq!(pending.len(), 1);
        assert_matches!(
            pending[0].request(),
            AnyOutgoingRequest::KeysQuery(request) => {
                assert!(request.device_keys.contains_key(alice));
                assert!(request.device_keys.contains_key(bob));
                assert!(request.device_keys.contains_key(me));
            }
        );

        // Answer it so that the queue is empty again.
        olm_machine
            .mark_request_as_sent(
                pending[0].request_id(),
                AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                    "device_keys": {
                        alice: {},
                        bob: {},
                        me: {},
                    }
                }))),
            )
            .await?;
    }

    // Same thing again, this time with a `pos` set.
    sliding_sync.set_pos("chocolat".to_owned()).await;

    let txn_id = TransactionId::new();
    let (_request, _, _) = sliding_sync
        .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
        .await?;

    // No new keys query has been produced.
    {
        let machine_guard = client.olm_machine().await;
        let olm_machine = machine_guard.as_ref().unwrap();

        let pending = olm_machine.outgoing_requests().await?;

        assert!(pending.is_empty());
    }

    Ok(())
}
1678
1679 #[async_test]
1680 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1681 let server = MockServer::start().await;
1682 let client = logged_in_client(Some(server.uri())).await;
1683
1684 let sliding_sync = client
1685 .sliding_sync("test-slidingsync")?
1686 .with_to_device_extension(
1687 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1688 )
1689 .build()
1690 .await?;
1691
1692 let (request, _, _) =
1694 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1695 assert!(request.extensions.to_device.enabled.is_some());
1696
1697 let sync = sliding_sync.sync();
1698 pin_mut!(sync);
1699
1700 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1702
1703 #[derive(Deserialize)]
1704 struct PartialRequest {
1705 txn_id: Option<String>,
1706 }
1707
1708 {
1709 let _mock_guard = Mock::given(SlidingSyncMatcher)
1710 .respond_with(|request: &Request| {
1711 let request: PartialRequest = request.body_json().unwrap();
1713
1714 ResponseTemplate::new(200).set_body_json(json!({
1715 "txn_id": request.txn_id,
1716 "pos": "0",
1717 }))
1718 })
1719 .mount_as_scoped(&server)
1720 .await;
1721
1722 let next = sync.next().await;
1723 assert_matches!(next, Some(Ok(_update_summary)));
1724
1725 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1727 }
1728
1729 let (request, _, _) =
1731 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1732 assert!(request.extensions.to_device.enabled.is_none());
1733
1734 {
1736 let _mock_guard = Mock::given(SlidingSyncMatcher)
1737 .respond_with(|request: &Request| {
1738 let request: PartialRequest = request.body_json().unwrap();
1740
1741 ResponseTemplate::new(200).set_body_json(json!({
1742 "txn_id": request.txn_id,
1743 "pos": "1",
1744 }))
1745 })
1746 .mount_as_scoped(&server)
1747 .await;
1748
1749 let next = sync.next().await;
1750 assert_matches!(next, Some(Ok(_update_summary)));
1751
1752 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1754 }
1755
1756 {
1759 let _mock_guard = Mock::given(SlidingSyncMatcher)
1760 .respond_with(|request: &Request| {
1761 let request: PartialRequest = request.body_json().unwrap();
1763
1764 ResponseTemplate::new(200).set_body_json(json!({
1765 "txn_id": request.txn_id,
1766 "pos": "0", }))
1768 })
1769 .up_to_n_times(1) .mount_as_scoped(&server)
1771 .await;
1772
1773 let next = sync.next().await;
1774 assert_matches!(next, Some(Ok(_update_summary)));
1775
1776 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1778 }
1779
1780 {
1785 let _mock_guard = Mock::given(SlidingSyncMatcher)
1786 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1787 "error": "foo",
1788 "errcode": "M_UNKNOWN_POS",
1789 })))
1790 .mount_as_scoped(&server)
1791 .await;
1792
1793 let next = sync.next().await;
1794
1795 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1797
1798 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1800
1801 let (request, _, _) =
1803 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1804
1805 assert!(request.extensions.to_device.enabled.is_some());
1806
1807 assert!(sync.next().await.is_none());
1809 }
1810
1811 Ok(())
1812 }
1813
// Without `share_pos()`, the `pos` written to storage is neither read back by
// a running instance nor restored by a freshly built one.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    // The only field of the request body the mock responder looks at.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mock server hands out "0", "1", "2", ... as successive `pos` values.
    let next_server_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let request: PartialRequest = request.body_json().unwrap();

            // Read the current value and bump the counter under the lock.
            let pos = {
                let mut counter = next_server_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": request.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Before any sync there is no `pos`, neither in memory nor in the request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    let stream = sliding_sync.sync();
    pin_mut!(stream);

    // One successful round-trip.
    let next = stream.next().await;
    assert_matches!(next, Some(Ok(_update_summary)));

    // The in-memory `pos` has been updated...
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // ...and the same value can be restored from storage.
    let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");

    assert_eq!(restored_fields.pos.as_deref(), Some("0"));

    // Another instance with the same identifier writes a different `pos` to
    // storage.
    {
        let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut position_guard = other_sync.inner.position.lock().await;
        position_guard.pos = Some("yolo".to_owned());

        other_sync.cache_to_storage(&position_guard).await?;
    }

    // The first instance still uses its own in-memory `pos`.
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("0"));
    }

    // A freshly built instance starts without any `pos`.
    {
        let fresh_sync = client.sliding_sync("forgetful-sync")?.build().await?;
        assert!(fresh_sync.inner.position.lock().await.pos.is_none());
    }

    Ok(())
}
1904
// With `share_pos()`, the `pos` written to storage is picked up by a running
// instance and by freshly built ones, and `expire_session` clears it for both.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    // The only field of the request body the mock responder looks at.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mock server hands out "0", "1", "2", ... as successive `pos` values.
    let next_server_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let request: PartialRequest = request.body_json().unwrap();

            // Read the current value and bump the counter under the lock.
            let pos = {
                let mut counter = next_server_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": request.txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Before any sync there is no `pos`, neither in the request nor in memory.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;

        assert!(request.pos.is_none());
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    let stream = sliding_sync.sync();
    pin_mut!(stream);

    // One successful round-trip.
    let next = stream.next().await;
    assert_matches!(next, Some(Ok(_update_summary)));

    // The in-memory `pos` has been updated...
    assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

    // ...and the same value can be restored from storage.
    let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");

    assert_eq!(restored_fields.pos.as_deref(), Some("0"));

    // Another instance with the same identifier writes a different `pos` to
    // storage.
    {
        let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

        let mut position_guard = other_sync.inner.position.lock().await;
        position_guard.pos = Some("42".to_owned());

        other_sync.cache_to_storage(&position_guard).await?;
    }

    // Generating a request makes the first instance pick up the stored `pos`.
    {
        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A freshly built sharing instance starts from the stored `pos`.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(fresh_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (request, _, _) =
            fresh_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
    }

    // Expire the session.
    sliding_sync.expire_session().await;

    // The running instance no longer has a `pos`...
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    // ...and neither does a freshly built one.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert!(fresh_sync.inner.position.lock().await.pos.is_none());

        let (request, _, _) =
            fresh_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
        assert!(request.pos.is_none());
    }

    Ok(())
}
2018
2019 #[async_test]
2020 async fn test_stop_sync_loop() -> Result<()> {
2021 let (_server, sliding_sync) = new_sliding_sync(vec![
2022 SlidingSyncList::builder("foo")
2023 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2024 ])
2025 .await?;
2026
2027 let stream = sliding_sync.sync();
2029 pin_mut!(stream);
2030
2031 assert!(stream.next().await.is_some());
2033
2034 sliding_sync.stop_sync()?;
2036
2037 assert!(stream.next().await.is_none());
2039
2040 let stream = sliding_sync.sync();
2042 pin_mut!(stream);
2043
2044 assert!(stream.next().await.is_some());
2046
2047 Ok(())
2048 }
2049
2050 #[async_test]
2051 async fn test_process_read_receipts() -> Result<()> {
2052 let room = owned_room_id!("!pony:example.org");
2053
2054 let server = MockServer::start().await;
2055 let client = logged_in_client(Some(server.uri())).await;
2056 client.event_cache().subscribe().unwrap();
2057
2058 let sliding_sync = client
2059 .sliding_sync("test")?
2060 .with_receipt_extension(
2061 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2062 )
2063 .add_list(
2064 SlidingSyncList::builder("all")
2065 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2066 )
2067 .build()
2068 .await?;
2069
2070 {
2072 let server_response = assign!(http::Response::new("0".to_owned()), {
2073 rooms: BTreeMap::from([(
2074 room.clone(),
2075 http::response::Room::default(),
2076 )])
2077 });
2078
2079 let _summary = {
2080 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2081 sliding_sync
2082 .handle_response(
2083 server_response.clone(),
2084 &mut pos_guard,
2085 RequestedRequiredStates::default(),
2086 )
2087 .await?
2088 };
2089 }
2090
2091 let server_response = assign!(http::Response::new("1".to_owned()), {
2092 extensions: assign!(http::response::Extensions::default(), {
2093 receipts: assign!(http::response::Receipts::default(), {
2094 rooms: BTreeMap::from([
2095 (
2096 room.clone(),
2097 Raw::from_json_string(
2098 json!({
2099 "room_id": room,
2100 "type": "m.receipt",
2101 "content": {
2102 "$event:bar.org": {
2103 "m.read": {
2104 client.user_id().unwrap(): {
2105 "ts": 1436451550,
2106 }
2107 }
2108 }
2109 }
2110 })
2111 .to_string(),
2112 ).unwrap()
2113 )
2114 ])
2115 })
2116 })
2117 });
2118
2119 let summary = {
2120 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2121 sliding_sync
2122 .handle_response(
2123 server_response.clone(),
2124 &mut pos_guard,
2125 RequestedRequiredStates::default(),
2126 )
2127 .await?
2128 };
2129
2130 assert!(summary.rooms.contains(&room));
2131
2132 Ok(())
2133 }
2134
2135 #[async_test]
2136 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2137 let room_id = owned_room_id!("!unicorn:example.org");
2138
2139 let server = MockServer::start().await;
2140 let client = logged_in_client(Some(server.uri())).await;
2141
2142 let sliding_sync = client
2145 .sliding_sync("test")?
2146 .with_account_data_extension(
2147 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2148 )
2149 .add_list(
2150 SlidingSyncList::builder("all")
2151 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2152 )
2153 .build()
2154 .await?;
2155
2156 {
2158 let server_response = assign!(http::Response::new("0".to_owned()), {
2159 rooms: BTreeMap::from([(
2160 room_id.clone(),
2161 http::response::Room::default(),
2162 )])
2163 });
2164
2165 let _summary = {
2166 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2167 sliding_sync
2168 .handle_response(
2169 server_response.clone(),
2170 &mut pos_guard,
2171 RequestedRequiredStates::default(),
2172 )
2173 .await?
2174 };
2175 }
2176
2177 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2181
2182 let update_summary = {
2183 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2184 sliding_sync
2185 .handle_response(
2186 server_response.clone(),
2187 &mut pos_guard,
2188 RequestedRequiredStates::default(),
2189 )
2190 .await?
2191 };
2192
2193 assert!(update_summary.rooms.contains(&room_id));
2196
2197 let room = client.get_room(&room_id).unwrap();
2198
2199 assert!(room.is_marked_unread());
2202
2203 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2206
2207 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2208 sliding_sync
2209 .handle_response(
2210 server_response.clone(),
2211 &mut pos_guard,
2212 RequestedRequiredStates::default(),
2213 )
2214 .await?;
2215
2216 let room = client.get_room(&room_id).unwrap();
2217
2218 assert!(!room.is_marked_unread());
2219
2220 Ok(())
2221 }
2222
2223 fn make_mark_unread_response(
2224 response_number: &str,
2225 room_id: OwnedRoomId,
2226 unread: bool,
2227 add_rooms_section: bool,
2228 ) -> http::Response {
2229 let rooms = if add_rooms_section {
2230 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2231 } else {
2232 BTreeMap::new()
2233 };
2234
2235 let extensions = assign!(http::response::Extensions::default(), {
2236 account_data: assign!(http::response::AccountData::default(), {
2237 rooms: BTreeMap::from([
2238 (
2239 room_id,
2240 vec![
2241 Raw::from_json_string(
2242 json!({
2243 "content": {
2244 "unread": unread
2245 },
2246 "type": "m.marked_unread"
2247 })
2248 .to_string(),
2249 ).unwrap()
2250 ]
2251 )
2252 ])
2253 })
2254 });
2255
2256 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2257 }
2258
2259 #[async_test]
2260 async fn test_process_rooms_account_data() -> Result<()> {
2261 let room = owned_room_id!("!pony:example.org");
2262
2263 let server = MockServer::start().await;
2264 let client = logged_in_client(Some(server.uri())).await;
2265
2266 let sliding_sync = client
2267 .sliding_sync("test")?
2268 .with_account_data_extension(
2269 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2270 )
2271 .add_list(
2272 SlidingSyncList::builder("all")
2273 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2274 )
2275 .build()
2276 .await?;
2277
2278 {
2280 let server_response = assign!(http::Response::new("0".to_owned()), {
2281 rooms: BTreeMap::from([(
2282 room.clone(),
2283 http::response::Room::default(),
2284 )])
2285 });
2286
2287 let _summary = {
2288 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2289 sliding_sync
2290 .handle_response(
2291 server_response.clone(),
2292 &mut pos_guard,
2293 RequestedRequiredStates::default(),
2294 )
2295 .await?
2296 };
2297 }
2298
2299 let server_response = assign!(http::Response::new("1".to_owned()), {
2300 extensions: assign!(http::response::Extensions::default(), {
2301 account_data: assign!(http::response::AccountData::default(), {
2302 rooms: BTreeMap::from([
2303 (
2304 room.clone(),
2305 vec![
2306 Raw::from_json_string(
2307 json!({
2308 "content": {
2309 "tags": {
2310 "u.work": {
2311 "order": 0.9
2312 }
2313 }
2314 },
2315 "type": "m.tag"
2316 })
2317 .to_string(),
2318 ).unwrap()
2319 ]
2320 )
2321 ])
2322 })
2323 })
2324 });
2325 let summary = {
2326 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2327 sliding_sync
2328 .handle_response(
2329 server_response.clone(),
2330 &mut pos_guard,
2331 RequestedRequiredStates::default(),
2332 )
2333 .await?
2334 };
2335
2336 assert!(summary.rooms.contains(&room));
2337
2338 Ok(())
2339 }
2340
// Feeds the same response (one room plus e2ee and to-device extension data)
// to three differently configured sliding syncs and checks which parts of it
// end up in the client: encryption data, the room, or both.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    // The response shared by the three scenarios below.
    let server_response = assign!(http::Response::new("0".to_owned()), {
        rooms: BTreeMap::from([(
            room.clone(),
            assign!(http::response::Room::default(), {
                name: Some("Croissants lovers".to_owned()),
                timeline: Vec::new(),
            }),
        )]),

        extensions: assign!(http::response::Extensions::default(), {
            e2ee: assign!(http::response::E2EE::default(), {
                device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
            }),
            to_device: Some(assign!(http::response::ToDevice::default(), {
                next_batch: "to-device-token".to_owned(),
            })),
        })
    });

    // Scenario 1: e2ee and to-device extensions enabled, no list.
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position_guard,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The encryption data has been processed...
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    // ...but the room is unknown to the client.
    assert!(client.get_room(&room).is_none());

    // Scenario 2: a new client, with a list but without the e2ee and
    // to-device extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position_guard,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The encryption data has not been processed...
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 0);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            None
        );
    }

    // ...but the room is known.
    assert!(client.get_room(&room).is_some());

    // Scenario 3: a new client, with a list and both extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                // Last use of the shared response: move it instead of cloning.
                server_response,
                &mut position_guard,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // Both the encryption data and the room have been processed.
    let uploaded_key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(uploaded_key_count, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    assert!(client.get_room(&room).is_some());

    Ok(())
}
2488
2489 #[async_test]
2490 async fn test_lock_multiple_requests() -> Result<()> {
2491 let server = MockServer::start().await;
2492 let client = logged_in_client(Some(server.uri())).await;
2493
2494 let pos = Arc::new(Mutex::new(0));
2495 let _mock_guard = Mock::given(SlidingSyncMatcher)
2496 .respond_with(move |_: &Request| {
2497 let mut pos = pos.lock().unwrap();
2498 *pos += 1;
2499 ResponseTemplate::new(200).set_body_json(json!({
2500 "pos": pos.to_string(),
2501 "lists": {},
2502 "rooms": {}
2503 }))
2504 })
2505 .mount_as_scoped(&server)
2506 .await;
2507
2508 let sliding_sync = client
2509 .sliding_sync("test")?
2510 .with_to_device_extension(
2511 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2512 )
2513 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2514 .build()
2515 .await?;
2516
2517 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2520
2521 for result in requests.await {
2522 result?;
2523 }
2524
2525 Ok(())
2526 }
2527
2528 #[async_test]
2529 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2530 let server = MockServer::start().await;
2531 let client = logged_in_client(Some(server.uri())).await;
2532
2533 let pos = Arc::new(Mutex::new(0));
2534 let _mock_guard = Mock::given(SlidingSyncMatcher)
2535 .respond_with(move |_: &Request| {
2536 let mut pos = pos.lock().unwrap();
2537 *pos += 1;
2538 ResponseTemplate::new(200)
2540 .set_body_json(json!({
2541 "pos": pos.to_string(),
2542 "lists": {},
2543 "rooms": {}
2544 }))
2545 .set_delay(Duration::from_secs(2))
2546 })
2547 .mount_as_scoped(&server)
2548 .await;
2549
2550 let sliding_sync =
2551 client
2552 .sliding_sync("test")?
2553 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2554 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2555 ))
2556 .add_list(
2557 SlidingSyncList::builder("another-list")
2558 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2559 )
2560 .build()
2561 .await?;
2562
2563 let stream = sliding_sync.sync();
2564 pin_mut!(stream);
2565
2566 let cloned_sync = sliding_sync.clone();
2567 spawn(async move {
2568 tokio::time::sleep(Duration::from_millis(100)).await;
2569
2570 cloned_sync
2571 .on_list("another-list", |list| {
2572 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2573 ready(())
2574 })
2575 .await;
2576 });
2577
2578 assert_matches!(stream.next().await, Some(Ok(_)));
2579
2580 sliding_sync.stop_sync().unwrap();
2581
2582 assert_matches!(stream.next().await, None);
2583
2584 let mut num_requests = 0;
2585
2586 for request in server.received_requests().await.unwrap() {
2587 if !SlidingSyncMatcher.matches(&request) {
2588 continue;
2589 }
2590
2591 let another_list_ranges = if num_requests == 0 {
2592 json!([[0, 10]])
2594 } else {
2595 json!([[10, 20]])
2597 };
2598
2599 num_requests += 1;
2600 assert!(num_requests <= 2, "more than one request hit the server");
2601
2602 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2603
2604 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2605 &json_value,
2606 &json!({
2607 "conn_id": "test",
2608 "lists": {
2609 "room-list": {
2610 "ranges": [[0, 9]],
2611 "required_state": [
2612 ["m.room.encryption", ""],
2613 ["m.room.tombstone", ""]
2614 ],
2615 },
2616 "another-list": {
2617 "ranges": another_list_ranges,
2618 "required_state": [
2619 ["m.room.encryption", ""],
2620 ["m.room.tombstone", ""]
2621 ],
2622 },
2623 }
2624 }),
2625 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2626 ) {
2627 dbg!(json_value);
2628 panic!("json differ: {err}");
2629 }
2630 }
2631
2632 assert_eq!(num_requests, 2);
2633
2634 Ok(())
2635 }
2636
2637 #[async_test]
2638 async fn test_timeout_zero_list() -> Result<()> {
2639 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2640
2641 let (request, _, _) =
2642 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2643
2644 assert!(request.timeout.is_some());
2647
2648 Ok(())
2649 }
2650
2651 #[async_test]
2652 async fn test_timeout_one_list() -> Result<()> {
2653 let (_server, sliding_sync) = new_sliding_sync(vec![
2654 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2655 ])
2656 .await?;
2657
2658 let (request, _, _) =
2659 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2660
2661 assert!(request.timeout.is_none());
2663
2664 {
2666 let server_response = assign!(http::Response::new("0".to_owned()), {
2667 lists: BTreeMap::from([(
2668 "foo".to_owned(),
2669 assign!(http::response::List::default(), {
2670 count: uint!(7),
2671 })
2672 )])
2673 });
2674
2675 let _summary = {
2676 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2677 sliding_sync
2678 .handle_response(
2679 server_response.clone(),
2680 &mut pos_guard,
2681 RequestedRequiredStates::default(),
2682 )
2683 .await?
2684 };
2685 }
2686
2687 let (request, _, _) =
2688 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2689
2690 assert!(request.timeout.is_some());
2692
2693 Ok(())
2694 }
2695
2696 #[async_test]
2697 async fn test_timeout_three_lists() -> Result<()> {
2698 let (_server, sliding_sync) = new_sliding_sync(vec![
2699 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2700 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2701 SlidingSyncList::builder("baz")
2702 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2703 ])
2704 .await?;
2705
2706 let (request, _, _) =
2707 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2708
2709 assert!(request.timeout.is_none());
2711
2712 {
2714 let server_response = assign!(http::Response::new("0".to_owned()), {
2715 lists: BTreeMap::from([(
2716 "foo".to_owned(),
2717 assign!(http::response::List::default(), {
2718 count: uint!(7),
2719 })
2720 )])
2721 });
2722
2723 let _summary = {
2724 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2725 sliding_sync
2726 .handle_response(
2727 server_response.clone(),
2728 &mut pos_guard,
2729 RequestedRequiredStates::default(),
2730 )
2731 .await?
2732 };
2733 }
2734
2735 let (request, _, _) =
2736 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2737
2738 assert!(request.timeout.is_none());
2740
2741 {
2743 let server_response = assign!(http::Response::new("1".to_owned()), {
2744 lists: BTreeMap::from([(
2745 "bar".to_owned(),
2746 assign!(http::response::List::default(), {
2747 count: uint!(7),
2748 })
2749 )])
2750 });
2751
2752 let _summary = {
2753 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2754 sliding_sync
2755 .handle_response(
2756 server_response.clone(),
2757 &mut pos_guard,
2758 RequestedRequiredStates::default(),
2759 )
2760 .await?
2761 };
2762 }
2763
2764 let (request, _, _) =
2765 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2766
2767 assert!(request.timeout.is_some());
2769
2770 Ok(())
2771 }
2772
2773 #[async_test]
2774 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2775 let server = MockServer::start().await;
2776 let client = logged_in_client(Some(server.uri())).await;
2777
2778 let _mock_guard = Mock::given(SlidingSyncMatcher)
2779 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2780 "pos": "0",
2781 "lists": {},
2782 "rooms": {}
2783 })))
2784 .mount_as_scoped(&server)
2785 .await;
2786
2787 let sliding_sync = client
2788 .sliding_sync("test")?
2789 .with_to_device_extension(
2790 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2791 )
2792 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2793 .build()
2794 .await?;
2795
2796 let sliding_sync = Arc::new(sliding_sync);
2797
2798 let sync_beat_listener = client.inner.sync_beat.listen();
2800 sliding_sync.sync_once().await?;
2801
2802 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2804 Ok(())
2805 }
2806
2807 #[async_test]
2808 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2809 let server = MockServer::start().await;
2810 let client = logged_in_client(Some(server.uri())).await;
2811
2812 let _mock_guard = Mock::given(SlidingSyncMatcher)
2813 .respond_with(ResponseTemplate::new(404))
2814 .mount_as_scoped(&server)
2815 .await;
2816
2817 let sliding_sync = client
2818 .sliding_sync("test")?
2819 .with_to_device_extension(
2820 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2821 )
2822 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2823 .build()
2824 .await?;
2825
2826 let sliding_sync = Arc::new(sliding_sync);
2827
2828 let sync_beat_listener = client.inner.sync_beat.listen();
2830 let sync_result = sliding_sync.sync_once().await;
2831 assert!(sync_result.is_err());
2832
2833 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2835
2836 Ok(())
2837 }
2838
2839 #[async_test]
2840 async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2841 let server = MatrixMockServer::new().await;
2842 let client = server.client_builder().build().await;
2843 let room_id = room_id!("!mu5hr00m:example.org");
2844
2845 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2846 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2847 "pos": "0",
2848 "lists": {},
2849 "extensions": {
2850 "account_data": {
2851 "global": [
2852 {
2853 "type": "m.direct",
2854 "content": {
2855 "@de4dlockh0lmes:example.org": [
2856 "!mu5hr00m:example.org"
2857 ]
2858 }
2859 }
2860 ]
2861 }
2862 },
2863 "rooms": {
2864 room_id: {
2865 "name": "Mario Bros Fanbase Room",
2866 "initial": true,
2867 },
2868 }
2869 })))
2870 .mount_as_scoped(server.server())
2871 .await;
2872
2873 let f = EventFactory::new().room(room_id);
2874
2875 Mock::given(method("GET"))
2876 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2877 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2878 "chunk": [
2879 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2880 ]
2881 })))
2882 .mount(server.server())
2883 .await;
2884
2885 let (tx, rx) = tokio::sync::oneshot::channel();
2886
2887 let tx = Arc::new(Mutex::new(Some(tx)));
2888 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2889 let members =
2891 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2892 assert_eq!(members.len(), 1);
2893 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2894 });
2895
2896 let sliding_sync = client
2897 .sliding_sync("test")?
2898 .add_list(SlidingSyncList::builder("thelist"))
2899 .with_account_data_extension(
2900 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2901 )
2902 .build()
2903 .await?;
2904
2905 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2906 .await
2907 .expect("Sync did not complete in time")
2908 .expect("Sync failed");
2909
2910 tokio::time::timeout(Duration::from_secs(5), rx)
2912 .await
2913 .expect("Event handler did not complete in time")
2914 .expect("Event handler failed");
2915
2916 Ok(())
2917 }
2918}