1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26 collections::{BTreeMap, btree_map::Entry},
27 fmt::Debug,
28 future::Future,
29 sync::{Arc, RwLock as StdRwLock},
30 time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41 OwnedRoomId, RoomId,
42 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
43 assign,
44};
45use tokio::{
46 select,
47 sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
48};
49use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53 cache::restore_sliding_sync_state,
54 client::SlidingSyncResponseProcessor,
55 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{Client, Result, config::RequestConfig};
58
59#[derive(Clone, Debug)]
63pub struct SlidingSync {
64 inner: Arc<SlidingSyncInner>,
66}
67
68#[derive(Debug)]
69pub(super) struct SlidingSyncInner {
70 id: String,
74
75 client: Client,
77
78 poll_timeout: Duration,
80
81 network_timeout: Duration,
84
85 storage_key: String,
87
88 share_pos: bool,
95
96 position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
109
110 lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
112
113 sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
115
116 internal_channel: Sender<SlidingSyncInternalMessage>,
119}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
126 pub async fn has_pos(&self) -> bool {
129 self.inner.position.lock().await.pos.is_some()
130 }
131
132 async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
133 cache::store_sliding_sync_state(self, position).await
134 }
135
136 pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
138 SlidingSyncBuilder::new(id, client)
139 }
140
141 pub fn subscribe_to_rooms(
148 &self,
149 room_ids: &[&RoomId],
150 settings: Option<http::request::RoomSubscription>,
151 cancel_in_flight_request: bool,
152 ) {
153 let settings = settings.unwrap_or_default();
154 let mut sticky = self.inner.sticky.write().unwrap();
155 let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
156
157 let mut skip_over_current_sync_loop_iteration = false;
158
159 for room_id in room_ids {
160 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
167 if let Some(room) = self.inner.client.get_room(room_id) {
168 room.mark_members_missing();
169 }
170
171 entry.insert((RoomSubscriptionState::default(), settings.clone()));
172
173 skip_over_current_sync_loop_iteration = true;
174 }
175 }
176
177 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
178 self.inner.internal_channel_send_if_possible(
179 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
180 );
181 }
182 }
183
184 pub async fn on_list<Function, FunctionOutput, R>(
186 &self,
187 list_name: &str,
188 function: Function,
189 ) -> Option<R>
190 where
191 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
192 FunctionOutput: Future<Output = R>,
193 {
194 let lists = self.inner.lists.read().await;
195
196 match lists.get(list_name) {
197 Some(list) => Some(function(list).await),
198 None => None,
199 }
200 }
201
202 pub async fn add_list(
208 &self,
209 list_builder: SlidingSyncListBuilder,
210 ) -> Result<Option<SlidingSyncList>> {
211 let list = list_builder.build(self.inner.internal_channel.clone());
212
213 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
214
215 self.inner.internal_channel_send_if_possible(
216 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
217 );
218
219 Ok(old_list)
220 }
221
222 pub async fn add_cached_list(
229 &self,
230 mut list_builder: SlidingSyncListBuilder,
231 ) -> Result<Option<SlidingSyncList>> {
232 let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
233
234 list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
235
236 self.add_list(list_builder).await
237 }
238
239 #[instrument(skip_all)]
241 async fn handle_response(
242 &self,
243 mut sliding_sync_response: http::Response,
244 position: &mut SlidingSyncPositionMarkers,
245 requested_required_states: RequestedRequiredStates,
246 ) -> Result<UpdateSummary, crate::Error> {
247 let pos = Some(sliding_sync_response.pos.clone());
248
249 let must_process_rooms_response = self.must_process_rooms_response().await;
250
251 trace!(yes = must_process_rooms_response, "Must process rooms response?");
252
253 let sync_response = {
261 let _timer = timer!("response processor");
262
263 let response_processor = {
264 let _sync_lock = {
267 let _timer = timer!("acquiring the `sync_lock`");
268
269 self.inner.client.base_client().sync_lock().lock().await
270 };
271
272 let mut response_processor =
273 SlidingSyncResponseProcessor::new(self.inner.client.clone());
274
275 if self.is_thread_subscriptions_enabled() {
281 response_processor
282 .handle_thread_subscriptions(
283 position.pos.as_deref(),
284 std::mem::take(
285 &mut sliding_sync_response.extensions.thread_subscriptions,
286 ),
287 )
288 .await?;
289 }
290
291 #[cfg(feature = "e2e-encryption")]
292 if self.is_e2ee_enabled() {
293 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
294 }
295
296 if must_process_rooms_response {
299 response_processor
300 .handle_room_response(&sliding_sync_response, &requested_required_states)
301 .await?;
302 }
303
304 response_processor
305 };
306
307 response_processor.process_and_take_response().await?
309 };
310
311 debug!("Sliding Sync response has been handled by the client");
312 trace!(?sync_response);
313
314 if let Some(ref txn_id) = sliding_sync_response.txn_id {
316 let txn_id = txn_id.as_str().into();
317 self.inner.sticky.write().unwrap().maybe_commit(txn_id);
318 let mut lists = self.inner.lists.write().await;
319 lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
320 }
321
322 let update_summary = {
323 let updated_rooms = {
325 let mut updated_rooms = Vec::with_capacity(
326 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
327 );
328
329 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
330
331 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
339
340 updated_rooms
341 };
342
343 let updated_lists = {
345 debug!(
346 lists = ?sliding_sync_response.lists,
347 "Update lists"
348 );
349
350 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
351 let mut lists = self.inner.lists.write().await;
352
353 for (name, list) in lists.iter_mut() {
356 if let Some(updates) = sliding_sync_response.lists.get(name) {
357 let maximum_number_of_rooms: u32 =
358 updates.count.try_into().expect("failed to convert `count` to `u32`");
359
360 if list.update(Some(maximum_number_of_rooms))? {
361 updated_lists.push(name.clone());
362 }
363 } else if list.update(None)? {
364 updated_lists.push(name.clone());
365 }
366 }
367
368 for name in sliding_sync_response.lists.keys() {
370 if !lists.contains_key(name) {
371 error!("Response for list `{name}` - unknown to us; skipping");
372 }
373 }
374
375 updated_lists
376 };
377
378 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
379 };
380
381 debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
385
386 position.pos = pos;
387
388 Ok(update_summary)
389 }
390
391 #[instrument(skip_all)]
392 async fn generate_sync_request(
393 &self,
394 txn_id: &mut LazyTransactionId,
395 ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
396 let mut requests_lists = BTreeMap::new();
398
399 let require_timeout = {
400 let lists = self.inner.lists.read().await;
401
402 let mut require_timeout = true;
404
405 for (name, list) in lists.iter() {
406 requests_lists.insert(name.clone(), list.next_request(txn_id)?);
407 require_timeout = require_timeout && list.requires_timeout();
408 }
409
410 require_timeout
411 };
412
413 let mut position_guard = {
421 debug!("Waiting to acquire the `position` lock");
422
423 let _timer = timer!("acquiring the `position` lock");
424
425 self.inner.position.clone().lock_owned().await
426 };
427
428 debug!(pos = ?position_guard.pos, "Got a position");
429
430 let to_device_enabled =
431 self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
432
433 let restored_fields = if self.inner.share_pos || to_device_enabled {
434 restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
435 } else {
436 None
437 };
438
439 let pos = if self.inner.share_pos {
442 if let Some(fields) = &restored_fields {
443 if fields.pos != position_guard.pos {
445 info!(
446 "Pos from previous request ('{:?}') was different from \
447 pos in database ('{:?}').",
448 position_guard.pos, fields.pos
449 );
450 position_guard.pos = fields.pos.clone();
451 }
452 fields.pos.clone()
453 } else {
454 position_guard.pos.clone()
455 }
456 } else {
457 position_guard.pos.clone()
458 };
459
460 Span::current().record("pos", &pos);
461
462 #[cfg(feature = "e2e-encryption")]
471 if pos.is_none() && self.is_e2ee_enabled() {
472 info!("Marking all tracked users as dirty");
473
474 let olm_machine = self.inner.client.olm_machine().await;
475 let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
476 olm_machine.mark_all_tracked_users_as_dirty().await?;
477 }
478
479 let timeout = require_timeout.then(|| self.inner.poll_timeout);
484
485 let mut request = assign!(http::Request::new(), {
486 conn_id: Some(self.inner.id.clone()),
487 pos,
488 timeout,
489 lists: requests_lists,
490 });
491
492 self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
494
495 if to_device_enabled {
499 request.extensions.to_device.since =
500 restored_fields.and_then(|fields| fields.to_device_token);
501 }
502
503 if let Some(txn_id) = txn_id.get() {
505 request.txn_id = Some(txn_id.to_string());
506 }
507
508 Ok((
509 request,
511 RequestConfig::default()
514 .timeout(self.inner.poll_timeout + self.inner.network_timeout)
515 .retry_limit(3),
516 position_guard,
517 ))
518 }
519
520 async fn send_sync_request(
524 &self,
525 request: http::Request,
526 request_config: RequestConfig,
527 mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
528 ) -> Result<UpdateSummary> {
529 debug!("Sending request");
530
531 let requested_required_states = RequestedRequiredStates::from(&request);
533 let request = self.inner.client.send(request).with_request_config(request_config);
534
535 #[cfg(feature = "e2e-encryption")]
542 let response = {
543 if self.is_e2ee_enabled() {
544 let client = self.inner.client.clone();
561 let e2ee_uploads = spawn(
562 async move {
563 if let Err(error) = client.send_outgoing_requests().await {
564 error!(?error, "Error while sending outgoing E2EE requests");
565 }
566 }
567 .instrument(Span::current()),
568 )
569 .abort_on_drop();
572
573 let response = request.await?;
575
576 e2ee_uploads.await.map_err(|error| Error::JoinError {
581 task_description: "e2ee_uploads".to_owned(),
582 error,
583 })?;
584
585 response
586 } else {
587 request.await?
588 }
589 };
590
591 #[cfg(not(feature = "e2e-encryption"))]
593 let response = request.await?;
594
595 debug!("Received response");
596
597 let this = self.clone();
607
608 let future = async move {
611 debug!("Start handling response");
612
613 let updates = this
619 .handle_response(response, &mut position_guard, requested_required_states)
620 .await?;
621
622 this.cache_to_storage(&position_guard).await?;
623
624 drop(position_guard);
627
628 debug!("Done handling response");
629
630 Ok(updates)
631 };
632
633 spawn(future.instrument(Span::current())).await.unwrap()
634 }
635
636 #[cfg(feature = "e2e-encryption")]
638 fn is_e2ee_enabled(&self) -> bool {
639 self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
640 }
641
642 fn is_thread_subscriptions_enabled(&self) -> bool {
645 self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
646 == Some(true)
647 }
648
649 #[cfg(not(feature = "e2e-encryption"))]
650 fn is_e2ee_enabled(&self) -> bool {
651 false
652 }
653
654 async fn must_process_rooms_response(&self) -> bool {
656 !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
659 || !self.inner.lists.read().await.is_empty()
660 }
661
662 #[doc(hidden)]
666 #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
667 pub async fn sync_once(&self) -> Result<UpdateSummary> {
668 let (request, request_config, position_guard) =
669 self.generate_sync_request(&mut LazyTransactionId::new()).await?;
670
671 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
673
674 self.inner.client.inner.sync_beat.notify(usize::MAX);
676
677 Ok(summaries)
678 }
679
680 #[allow(unknown_lints, clippy::let_with_type_underscore)] #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
690 pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
691 debug!("Starting sync stream");
692
693 let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
694
695 stream! {
696 loop {
697 debug!("Sync stream is running");
698
699 select! {
700 biased;
701
702 internal_message = internal_channel_receiver.recv() => {
703 use SlidingSyncInternalMessage::*;
704
705 debug!(?internal_message, "Sync stream has received an internal message");
706
707 match internal_message {
708 Err(_) | Ok(SyncLoopStop) => {
709 break;
710 }
711
712 Ok(SyncLoopSkipOverCurrentIteration) => {
713 continue;
714 }
715 }
716 }
717
718 update_summary = self.sync_once() => {
719 match update_summary {
720 Ok(updates) => {
721 yield Ok(updates);
722 }
723
724 Err(error) => {
726 if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
727 self.expire_session().await;
729 }
730
731 yield Err(error);
732
733 break;
735 }
736 }
737 }
738 }
739 }
740
741 debug!("Sync stream has exited.");
742 }
743 }
744
745 pub fn stop_sync(&self) -> Result<()> {
754 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
755 }
756
757 #[doc(hidden)]
769 pub async fn expire_session(&self) {
770 info!("Session expired; resetting `pos` and sticky parameters");
771
772 {
773 let lists = self.inner.lists.read().await;
774 for list in lists.values() {
775 list.set_maximum_number_of_rooms(None);
777
778 list.invalidate_sticky_data();
780 }
781 }
782
783 {
785 let mut position = self.inner.position.lock().await;
786
787 position.pos = None;
789
790 if let Err(err) = self.cache_to_storage(&position).await {
794 warn!("Failed to invalidate cached sliding sync state: {err}");
795 }
796 }
797
798 {
799 let mut sticky = self.inner.sticky.write().unwrap();
800
801 sticky.data_mut().room_subscriptions.clear();
804 }
805 }
806}
807
808impl SlidingSyncInner {
809 #[instrument]
811 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
812 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
813 }
814
815 #[instrument]
818 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
819 let _ = self.internal_channel.send(message);
821 }
822}
823
824#[derive(Copy, Clone, Debug, PartialEq)]
825enum SlidingSyncInternalMessage {
826 SyncLoopStop,
828
829 SyncLoopSkipOverCurrentIteration,
832}
833
834#[cfg(any(test, feature = "testing"))]
835impl SlidingSync {
836 pub async fn set_pos(&self, new_pos: String) {
838 let mut position_lock = self.inner.position.lock().await;
839 position_lock.pos = Some(new_pos);
840 }
841
842 pub fn extensions_config(&self) -> http::request::Extensions {
848 let sticky = self.inner.sticky.read().unwrap();
849 sticky.data().extensions.clone()
850 }
851}
852
853#[derive(Clone, Debug)]
854pub(super) struct SlidingSyncPositionMarkers {
855 pos: Option<String>,
858}
859
860#[derive(Debug, Clone)]
863pub struct UpdateSummary {
864 pub lists: Vec<String>,
866 pub rooms: Vec<OwnedRoomId>,
868}
869
870#[derive(Debug, Default)]
874enum RoomSubscriptionState {
875 #[default]
879 Pending,
880
881 Applied,
884}
885
886#[derive(Debug)]
889pub(super) struct SlidingSyncStickyParameters {
890 room_subscriptions:
893 BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
894
895 extensions: http::request::Extensions,
898}
899
900impl SlidingSyncStickyParameters {
901 pub fn new(
903 room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
904 extensions: http::request::Extensions,
905 ) -> Self {
906 Self {
907 room_subscriptions: room_subscriptions
908 .into_iter()
909 .map(|(room_id, room_subscription)| {
910 (room_id, (RoomSubscriptionState::Pending, room_subscription))
911 })
912 .collect(),
913 extensions,
914 }
915 }
916}
917
918impl StickyData for SlidingSyncStickyParameters {
919 type Request = http::Request;
920
921 fn apply(&self, request: &mut Self::Request) {
922 request.room_subscriptions = self
923 .room_subscriptions
924 .iter()
925 .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
926 .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
927 .collect();
928 request.extensions = self.extensions.clone();
929 }
930
931 fn on_commit(&mut self) {
932 for (state, _room_subscription) in self.room_subscriptions.values_mut() {
934 if matches!(state, RoomSubscriptionState::Pending) {
935 *state = RoomSubscriptionState::Applied;
936 }
937 }
938 }
939}
940
941#[cfg(all(test, not(target_family = "wasm")))]
942#[allow(clippy::dbg_macro)]
943mod tests {
944 use std::{
945 collections::BTreeMap,
946 future::ready,
947 ops::Not,
948 sync::{Arc, Mutex},
949 time::Duration,
950 };
951
952 use assert_matches::assert_matches;
953 use event_listener::Listener;
954 use futures_util::{StreamExt, future::join_all, pin_mut};
955 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
956 use matrix_sdk_common::executor::spawn;
957 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
958 use ruma::{
959 OwnedRoomId, TransactionId,
960 api::client::error::ErrorKind,
961 assign,
962 events::{direct::DirectEvent, room::member::MembershipState},
963 owned_room_id, room_id,
964 serde::Raw,
965 uint,
966 };
967 use serde::Deserialize;
968 use serde_json::json;
969 use wiremock::{
970 Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
971 };
972
973 use super::{
974 SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
975 SlidingSyncStickyParameters, http,
976 sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
977 };
978 use crate::{
979 Client, Result,
980 sliding_sync::cache::restore_sliding_sync_state,
981 test_utils::{logged_in_client, mocks::MatrixMockServer},
982 };
983
984 #[derive(Copy, Clone)]
985 struct SlidingSyncMatcher;
986
987 impl Match for SlidingSyncMatcher {
988 fn matches(&self, request: &Request) -> bool {
989 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
990 && request.method == Method::POST
991 }
992 }
993
994 async fn new_sliding_sync(
995 lists: Vec<SlidingSyncListBuilder>,
996 ) -> Result<(MockServer, SlidingSync)> {
997 let server = MockServer::start().await;
998 let client = logged_in_client(Some(server.uri())).await;
999
1000 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1001
1002 for list in lists {
1003 sliding_sync_builder = sliding_sync_builder.add_list(list);
1004 }
1005
1006 let sliding_sync = sliding_sync_builder.build().await?;
1007
1008 Ok((server, sliding_sync))
1009 }
1010
1011 #[async_test]
1012 async fn test_subscribe_to_rooms() -> Result<()> {
1013 let (server, sliding_sync) = new_sliding_sync(vec![
1014 SlidingSyncList::builder("foo")
1015 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1016 ])
1017 .await?;
1018
1019 let stream = sliding_sync.sync();
1020 pin_mut!(stream);
1021
1022 let room_id_0 = room_id!("!r0:bar.org");
1023 let room_id_1 = room_id!("!r1:bar.org");
1024 let room_id_2 = room_id!("!r2:bar.org");
1025
1026 {
1027 let _mock_guard = Mock::given(SlidingSyncMatcher)
1028 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1029 "pos": "1",
1030 "lists": {},
1031 "rooms": {
1032 room_id_0: {
1033 "name": "Room #0",
1034 "initial": true,
1035 },
1036 room_id_1: {
1037 "name": "Room #1",
1038 "initial": true,
1039 },
1040 room_id_2: {
1041 "name": "Room #2",
1042 "initial": true,
1043 },
1044 }
1045 })))
1046 .mount_as_scoped(&server)
1047 .await;
1048
1049 let _ = stream.next().await.unwrap()?;
1050 }
1051
1052 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1053
1054 assert!(room0.are_members_synced().not());
1058
1059 {
1060 struct MemberMatcher(OwnedRoomId);
1061
1062 impl Match for MemberMatcher {
1063 fn matches(&self, request: &Request) -> bool {
1064 request.url.path()
1065 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1066 && request.method == Method::GET
1067 }
1068 }
1069
1070 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1071 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1072 "chunk": [],
1073 })))
1074 .mount_as_scoped(&server)
1075 .await;
1076
1077 assert_matches!(room0.request_members().await, Ok(()));
1078 }
1079
1080 assert!(room0.are_members_synced());
1082
1083 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1084
1085 assert!(room0.are_members_synced().not());
1088
1089 {
1090 let sticky = sliding_sync.inner.sticky.read().unwrap();
1091 let room_subscriptions = &sticky.data().room_subscriptions;
1092
1093 assert!(room_subscriptions.contains_key(room_id_0));
1094 assert!(room_subscriptions.contains_key(room_id_1));
1095 assert!(!room_subscriptions.contains_key(room_id_2));
1096 }
1097
1098 {
1101 struct MemberMatcher(OwnedRoomId);
1102
1103 impl Match for MemberMatcher {
1104 fn matches(&self, request: &Request) -> bool {
1105 request.url.path()
1106 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1107 && request.method == Method::GET
1108 }
1109 }
1110
1111 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1112 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1113 "chunk": [],
1114 })))
1115 .mount_as_scoped(&server)
1116 .await;
1117
1118 assert_matches!(room0.request_members().await, Ok(()));
1119 }
1120
1121 assert!(room0.are_members_synced());
1123
1124 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1125
1126 assert!(room0.are_members_synced());
1129
1130 Ok(())
1131 }
1132
1133 #[async_test]
1134 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1135 let (_server, sliding_sync) = new_sliding_sync(vec![
1136 SlidingSyncList::builder("foo")
1137 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1138 ])
1139 .await?;
1140
1141 let room_id_0 = room_id!("!r0:bar.org");
1142 let room_id_1 = room_id!("!r1:bar.org");
1143 let room_id_2 = room_id!("!r2:bar.org");
1144
1145 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1147
1148 {
1149 let sticky = sliding_sync.inner.sticky.read().unwrap();
1150 let room_subscriptions = &sticky.data().room_subscriptions;
1151
1152 assert!(room_subscriptions.contains_key(room_id_0));
1153 assert!(room_subscriptions.contains_key(room_id_1));
1154 assert!(room_subscriptions.contains_key(room_id_2).not());
1155 }
1156
1157 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1159
1160 {
1161 let sticky = sliding_sync.inner.sticky.read().unwrap();
1162 let room_subscriptions = &sticky.data().room_subscriptions;
1163
1164 assert!(room_subscriptions.contains_key(room_id_0));
1165 assert!(room_subscriptions.contains_key(room_id_1));
1166 assert!(room_subscriptions.contains_key(room_id_2));
1167 }
1168
1169 sliding_sync.expire_session().await;
1171
1172 {
1173 let sticky = sliding_sync.inner.sticky.read().unwrap();
1174 let room_subscriptions = &sticky.data().room_subscriptions;
1175
1176 assert!(room_subscriptions.is_empty());
1177 }
1178
1179 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1181
1182 {
1183 let sticky = sliding_sync.inner.sticky.read().unwrap();
1184 let room_subscriptions = &sticky.data().room_subscriptions;
1185
1186 assert!(room_subscriptions.contains_key(room_id_0).not());
1187 assert!(room_subscriptions.contains_key(room_id_1).not());
1188 assert!(room_subscriptions.contains_key(room_id_2));
1189 }
1190
1191 Ok(())
1192 }
1193
1194 #[async_test]
1195 async fn test_add_list() -> Result<()> {
1196 let (_server, sliding_sync) = new_sliding_sync(vec![
1197 SlidingSyncList::builder("foo")
1198 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1199 ])
1200 .await?;
1201
1202 let _stream = sliding_sync.sync();
1203 pin_mut!(_stream);
1204
1205 sliding_sync
1206 .add_list(
1207 SlidingSyncList::builder("bar")
1208 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1209 )
1210 .await?;
1211
1212 let lists = sliding_sync.inner.lists.read().await;
1213
1214 assert!(lists.contains_key("foo"));
1215 assert!(lists.contains_key("bar"));
1216
1217 Ok(())
1220 }
1221
1222 #[test]
1223 fn test_sticky_parameters_api_invalidated_flow() {
1224 let r0 = room_id!("!r0.matrix.org");
1225 let r1 = room_id!("!r1:matrix.org");
1226
1227 let mut room_subscriptions = BTreeMap::new();
1228 room_subscriptions.insert(r0.to_owned(), Default::default());
1229
1230 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1232 room_subscriptions,
1233 Default::default(),
1234 ));
1235 assert!(sticky.is_invalidated());
1236
1237 let txn_id: &TransactionId = "tid123".into();
1239
1240 let mut request = http::Request::default();
1241 request.txn_id = Some(txn_id.to_string());
1242
1243 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1244
1245 assert!(request.txn_id.is_some());
1246 assert_eq!(request.room_subscriptions.len(), 1);
1247 assert!(request.room_subscriptions.contains_key(r0));
1248
1249 let tid = request.txn_id.unwrap();
1250
1251 sticky.maybe_commit(tid.as_str().into());
1252 assert!(!sticky.is_invalidated());
1253
1254 sticky
1256 .data_mut()
1257 .room_subscriptions
1258 .insert(r1.to_owned(), (Default::default(), Default::default()));
1259 assert!(sticky.is_invalidated());
1260
1261 sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1263 assert!(sticky.is_invalidated());
1264
1265 let txn_id1: &TransactionId = "tid456".into();
1267 let mut request1 = http::Request::default();
1268 request1.txn_id = Some(txn_id1.to_string());
1269 sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1270
1271 assert!(sticky.is_invalidated());
1272 assert_eq!(request1.room_subscriptions.len(), 1);
1276 assert!(request1.room_subscriptions.contains_key(r1));
1277
1278 let txn_id2: &TransactionId = "tid789".into();
1279 let mut request2 = http::Request::default();
1280 request2.txn_id = Some(txn_id2.to_string());
1281
1282 sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1283 assert!(sticky.is_invalidated());
1284 assert_eq!(request2.room_subscriptions.len(), 1);
1287 assert!(request2.room_subscriptions.contains_key(r1));
1288
1289 sticky.maybe_commit(txn_id1);
1292 assert!(sticky.is_invalidated());
1293
1294 sticky.maybe_commit(txn_id2);
1296 assert!(!sticky.is_invalidated());
1297 }
1298
1299 #[test]
1300 fn test_room_subscriptions_are_sticky() {
1301 let r0 = room_id!("!r0.matrix.org");
1302 let r1 = room_id!("!r1:matrix.org");
1303
1304 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1305 BTreeMap::new(),
1306 Default::default(),
1307 ));
1308
1309 {
1311 sticky
1313 .data_mut()
1314 .room_subscriptions
1315 .insert(r0.to_owned(), (Default::default(), Default::default()));
1316
1317 let txn_id: &TransactionId = "tid0".into();
1319 let mut request = http::Request::default();
1320 request.txn_id = Some(txn_id.to_string());
1321
1322 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1323
1324 assert!(request.txn_id.is_some());
1325 assert_eq!(request.room_subscriptions.len(), 1);
1326 assert!(request.room_subscriptions.contains_key(r0));
1327
1328 let tid = request.txn_id.unwrap();
1330
1331 sticky.maybe_commit(tid.as_str().into());
1332 }
1333
1334 {
1336 sticky
1338 .data_mut()
1339 .room_subscriptions
1340 .insert(r1.to_owned(), (Default::default(), Default::default()));
1341
1342 let txn_id: &TransactionId = "tid1".into();
1344 let mut request = http::Request::default();
1345 request.txn_id = Some(txn_id.to_string());
1346
1347 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1348
1349 assert!(request.txn_id.is_some());
1350 assert_eq!(request.room_subscriptions.len(), 1);
1351 assert!(request.room_subscriptions.contains_key(r1));
1353
1354 }
1358
1359 {
1361 let txn_id: &TransactionId = "tid2".into();
1363 let mut request = http::Request::default();
1364 request.txn_id = Some(txn_id.to_string());
1365
1366 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1367
1368 assert!(request.txn_id.is_some());
1369 assert_eq!(request.room_subscriptions.len(), 1);
1370 assert!(request.room_subscriptions.contains_key(r1));
1372
1373 let tid = request.txn_id.unwrap();
1375
1376 sticky.maybe_commit(tid.as_str().into());
1377 }
1378
1379 {
1381 let txn_id: &TransactionId = "tid3".into();
1383 let mut request = http::Request::default();
1384 request.txn_id = Some(txn_id.to_string());
1385
1386 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1387
1388 assert!(request.txn_id.is_some());
1389 assert!(request.room_subscriptions.is_empty());
1391 }
1392 }
1393
1394 #[test]
1395 fn test_extensions_are_sticky() {
1396 let mut extensions = http::request::Extensions::default();
1397 extensions.account_data.enabled = Some(true);
1398
1399 let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1401 Default::default(),
1402 extensions,
1403 ));
1404
1405 assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1406
1407 let extensions = &sticky.data().extensions;
1410 assert_eq!(extensions.e2ee.enabled, None);
1411 assert_eq!(extensions.to_device.enabled, None);
1412 assert_eq!(extensions.to_device.since, None);
1413
1414 assert_eq!(extensions.account_data.enabled, Some(true));
1416
1417 let txn_id: &TransactionId = "tid123".into();
1418 let mut request = http::Request::default();
1419 request.txn_id = Some(txn_id.to_string());
1420 sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1421 assert!(sticky.is_invalidated());
1422 assert_eq!(request.extensions.to_device.enabled, None);
1423 assert_eq!(request.extensions.to_device.since, None);
1424 assert_eq!(request.extensions.e2ee.enabled, None);
1425 assert_eq!(request.extensions.account_data.enabled, Some(true));
1426 }
1427
1428 #[async_test]
1429 async fn test_sticky_extensions_plus_since() -> Result<()> {
1430 let server = MockServer::start().await;
1431 let client = logged_in_client(Some(server.uri())).await;
1432
1433 let sync = client
1434 .sliding_sync("test-slidingsync")?
1435 .add_list(SlidingSyncList::builder("new_list"))
1436 .build()
1437 .await?;
1438
1439 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1441 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1442 assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1443
1444 let sync = client
1446 .sliding_sync("test-slidingsync")?
1447 .add_list(SlidingSyncList::builder("new_list"))
1448 .with_to_device_extension(
1449 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1450 )
1451 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1452 .build()
1453 .await?;
1454
1455 let txn_id = TransactionId::new();
1458 let (request, _, _) = sync
1459 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1460 .await?;
1461
1462 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1463 assert_eq!(request.extensions.to_device.enabled, Some(true));
1464 assert!(request.extensions.to_device.since.is_none());
1465
1466 {
1467 let mut sticky = sync.inner.sticky.write().unwrap();
1469 assert!(sticky.is_invalidated());
1470 sticky.maybe_commit(
1471 "hopefully the rng won't generate this very specific transaction id".into(),
1472 );
1473 assert!(sticky.is_invalidated());
1474 }
1475
1476 let txn_id2 = TransactionId::new();
1478 let (request, _, _) = sync
1479 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1480 .await?;
1481
1482 assert_eq!(request.extensions.e2ee.enabled, Some(true));
1483 assert_eq!(request.extensions.to_device.enabled, Some(true));
1484 assert!(request.extensions.to_device.since.is_none());
1485
1486 assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1487
1488 {
1489 let mut sticky = sync.inner.sticky.write().unwrap();
1491 assert!(sticky.is_invalidated());
1492 sticky.maybe_commit(txn_id2.as_str().into());
1493 assert!(!sticky.is_invalidated());
1494 }
1495
1496 let txn_id = TransactionId::new();
1498 let (request, _, _) = sync
1499 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1500 .await?;
1501 assert!(request.extensions.e2ee.enabled.is_none());
1502 assert!(request.extensions.to_device.enabled.is_none());
1503 assert!(request.extensions.to_device.since.is_none());
1504
1505 let _since_token = "since";
1509
1510 #[cfg(feature = "e2e-encryption")]
1511 {
1512 use matrix_sdk_base::crypto::store::types::Changes;
1513 if let Some(olm_machine) = &*client.olm_machine().await {
1514 olm_machine
1515 .store()
1516 .save_changes(Changes {
1517 next_batch_token: Some(_since_token.to_owned()),
1518 ..Default::default()
1519 })
1520 .await?;
1521 }
1522 }
1523
1524 let txn_id = TransactionId::new();
1525 let (request, _, _) = sync
1526 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1527 .await?;
1528
1529 assert!(request.extensions.e2ee.enabled.is_none());
1530 assert!(request.extensions.to_device.enabled.is_none());
1531
1532 #[cfg(feature = "e2e-encryption")]
1533 assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1534
1535 Ok(())
1536 }
1537
1538 #[async_test]
1544 #[cfg(feature = "e2e-encryption")]
1545 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1546 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1547 use matrix_sdk_test::ruma_response_from_json;
1548 use ruma::user_id;
1549
1550 let server = MockServer::start().await;
1551 let client = logged_in_client(Some(server.uri())).await;
1552
1553 let alice = user_id!("@alice:localhost");
1554 let bob = user_id!("@bob:localhost");
1555 let me = user_id!("@example:localhost");
1556
1557 {
1560 let olm_machine = client.olm_machine().await;
1561 let olm_machine = olm_machine.as_ref().unwrap();
1562
1563 olm_machine.update_tracked_users([alice, bob]).await?;
1564
1565 let outgoing_requests = olm_machine.outgoing_requests().await?;
1567
1568 assert_eq!(outgoing_requests.len(), 2);
1569 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1570 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1571
1572 olm_machine
1574 .mark_request_as_sent(
1575 outgoing_requests[0].request_id(),
1576 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1577 "one_time_key_counts": {}
1578 }))),
1579 )
1580 .await?;
1581
1582 olm_machine
1583 .mark_request_as_sent(
1584 outgoing_requests[1].request_id(),
1585 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1586 "device_keys": {
1587 alice: {},
1588 bob: {},
1589 }
1590 }))),
1591 )
1592 .await?;
1593
1594 let outgoing_requests = olm_machine.outgoing_requests().await?;
1596
1597 assert_eq!(outgoing_requests.len(), 1);
1598 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1599
1600 olm_machine
1601 .mark_request_as_sent(
1602 outgoing_requests[0].request_id(),
1603 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1604 "device_keys": {
1605 me: {},
1606 }
1607 }))),
1608 )
1609 .await?;
1610
1611 let outgoing_requests = olm_machine.outgoing_requests().await?;
1613
1614 assert!(outgoing_requests.is_empty());
1615 }
1616
1617 let sync = client
1618 .sliding_sync("test-slidingsync")?
1619 .add_list(SlidingSyncList::builder("new_list"))
1620 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1621 .build()
1622 .await?;
1623
1624 let txn_id = TransactionId::new();
1626 let (_request, _, _) = sync
1627 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1628 .await?;
1629
1630 {
1632 let olm_machine = client.olm_machine().await;
1633 let olm_machine = olm_machine.as_ref().unwrap();
1634
1635 let outgoing_requests = olm_machine.outgoing_requests().await?;
1637
1638 assert_eq!(outgoing_requests.len(), 1);
1639 assert_matches!(
1640 outgoing_requests[0].request(),
1641 AnyOutgoingRequest::KeysQuery(request) => {
1642 assert!(request.device_keys.contains_key(alice));
1643 assert!(request.device_keys.contains_key(bob));
1644 assert!(request.device_keys.contains_key(me));
1645 }
1646 );
1647
1648 olm_machine
1650 .mark_request_as_sent(
1651 outgoing_requests[0].request_id(),
1652 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1653 "device_keys": {
1654 alice: {},
1655 bob: {},
1656 me: {},
1657 }
1658 }))),
1659 )
1660 .await?;
1661 }
1662
1663 sync.set_pos("chocolat".to_owned()).await;
1665
1666 let txn_id = TransactionId::new();
1667 let (_request, _, _) = sync
1668 .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1669 .await?;
1670
1671 {
1673 let olm_machine = client.olm_machine().await;
1674 let olm_machine = olm_machine.as_ref().unwrap();
1675
1676 let outgoing_requests = olm_machine.outgoing_requests().await?;
1678
1679 assert!(outgoing_requests.is_empty());
1680 }
1681
1682 Ok(())
1683 }
1684
1685 #[async_test]
1686 async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1687 let server = MockServer::start().await;
1688 let client = logged_in_client(Some(server.uri())).await;
1689
1690 let sliding_sync = client
1691 .sliding_sync("test-slidingsync")?
1692 .with_to_device_extension(
1693 assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1694 )
1695 .build()
1696 .await?;
1697
1698 let (request, _, _) =
1700 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1701 assert!(request.extensions.to_device.enabled.is_some());
1702
1703 let sync = sliding_sync.sync();
1704 pin_mut!(sync);
1705
1706 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1708
1709 #[derive(Deserialize)]
1710 struct PartialRequest {
1711 txn_id: Option<String>,
1712 }
1713
1714 {
1715 let _mock_guard = Mock::given(SlidingSyncMatcher)
1716 .respond_with(|request: &Request| {
1717 let request: PartialRequest = request.body_json().unwrap();
1719
1720 ResponseTemplate::new(200).set_body_json(json!({
1721 "txn_id": request.txn_id,
1722 "pos": "0",
1723 }))
1724 })
1725 .mount_as_scoped(&server)
1726 .await;
1727
1728 let next = sync.next().await;
1729 assert_matches!(next, Some(Ok(_update_summary)));
1730
1731 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1733 }
1734
1735 let (request, _, _) =
1737 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1738 assert!(request.extensions.to_device.enabled.is_none());
1739
1740 {
1742 let _mock_guard = Mock::given(SlidingSyncMatcher)
1743 .respond_with(|request: &Request| {
1744 let request: PartialRequest = request.body_json().unwrap();
1746
1747 ResponseTemplate::new(200).set_body_json(json!({
1748 "txn_id": request.txn_id,
1749 "pos": "1",
1750 }))
1751 })
1752 .mount_as_scoped(&server)
1753 .await;
1754
1755 let next = sync.next().await;
1756 assert_matches!(next, Some(Ok(_update_summary)));
1757
1758 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1760 }
1761
1762 {
1765 let _mock_guard = Mock::given(SlidingSyncMatcher)
1766 .respond_with(|request: &Request| {
1767 let request: PartialRequest = request.body_json().unwrap();
1769
1770 ResponseTemplate::new(200).set_body_json(json!({
1771 "txn_id": request.txn_id,
1772 "pos": "0", }))
1774 })
1775 .up_to_n_times(1) .mount_as_scoped(&server)
1777 .await;
1778
1779 let next = sync.next().await;
1780 assert_matches!(next, Some(Ok(_update_summary)));
1781
1782 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1784 }
1785
1786 {
1791 let _mock_guard = Mock::given(SlidingSyncMatcher)
1792 .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1793 "error": "foo",
1794 "errcode": "M_UNKNOWN_POS",
1795 })))
1796 .mount_as_scoped(&server)
1797 .await;
1798
1799 let next = sync.next().await;
1800
1801 assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1803
1804 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1806
1807 let (request, _, _) =
1809 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1810
1811 assert!(request.extensions.to_device.enabled.is_some());
1812
1813 assert!(sync.next().await.is_none());
1815 }
1816
1817 Ok(())
1818 }
1819
1820 #[cfg(feature = "e2e-encryption")]
1821 #[async_test]
1822 async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1823 let server = MockServer::start().await;
1824
1825 #[derive(Deserialize)]
1826 struct PartialRequest {
1827 txn_id: Option<String>,
1828 }
1829
1830 let server_pos = Arc::new(Mutex::new(0));
1831 let _mock_guard = Mock::given(SlidingSyncMatcher)
1832 .respond_with(move |request: &Request| {
1833 let request: PartialRequest = request.body_json().unwrap();
1835 let pos = {
1836 let mut pos = server_pos.lock().unwrap();
1837 let prev = *pos;
1838 *pos += 1;
1839 prev
1840 };
1841
1842 ResponseTemplate::new(200).set_body_json(json!({
1843 "txn_id": request.txn_id,
1844 "pos": pos.to_string(),
1845 }))
1846 })
1847 .mount_as_scoped(&server)
1848 .await;
1849
1850 let client = logged_in_client(Some(server.uri())).await;
1851
1852 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1853
1854 {
1856 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1857
1858 let (request, _, _) =
1859 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1860 assert!(request.pos.is_none());
1861 }
1862
1863 let sync = sliding_sync.sync();
1864 pin_mut!(sync);
1865
1866 let next = sync.next().await;
1869 assert_matches!(next, Some(Ok(_update_summary)));
1870
1871 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1872
1873 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1874 .await?
1875 .expect("must have restored fields");
1876
1877 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1880
1881 {
1885 let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1886
1887 let mut position_guard = other_sync.inner.position.lock().await;
1888 position_guard.pos = Some("yolo".to_owned());
1889
1890 other_sync.cache_to_storage(&position_guard).await?;
1891 }
1892
1893 {
1895 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1896 let (request, _, _) =
1897 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1898 assert_eq!(request.pos.as_deref(), Some("0"));
1899 }
1900
1901 {
1904 let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1905 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1906 }
1907
1908 Ok(())
1909 }
1910
1911 #[cfg(feature = "e2e-encryption")]
1912 #[async_test]
1913 async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1914 let server = MockServer::start().await;
1915
1916 #[derive(Deserialize)]
1917 struct PartialRequest {
1918 txn_id: Option<String>,
1919 }
1920
1921 let server_pos = Arc::new(Mutex::new(0));
1922 let _mock_guard = Mock::given(SlidingSyncMatcher)
1923 .respond_with(move |request: &Request| {
1924 let request: PartialRequest = request.body_json().unwrap();
1926 let pos = {
1927 let mut pos = server_pos.lock().unwrap();
1928 let prev = *pos;
1929 *pos += 1;
1930 prev
1931 };
1932
1933 ResponseTemplate::new(200).set_body_json(json!({
1934 "txn_id": request.txn_id,
1935 "pos": pos.to_string(),
1936 }))
1937 })
1938 .mount_as_scoped(&server)
1939 .await;
1940
1941 let client = logged_in_client(Some(server.uri())).await;
1942
1943 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1944
1945 {
1947 let (request, _, _) =
1948 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1949
1950 assert!(request.pos.is_none());
1951 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1952 }
1953
1954 let sync = sliding_sync.sync();
1955 pin_mut!(sync);
1956
1957 let next = sync.next().await;
1960 assert_matches!(next, Some(Ok(_update_summary)));
1961
1962 assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1963
1964 let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1965 .await?
1966 .expect("must have restored fields");
1967
1968 assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1971
1972 {
1974 let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1975
1976 let mut position_guard = other_sync.inner.position.lock().await;
1977 position_guard.pos = Some("42".to_owned());
1978
1979 other_sync.cache_to_storage(&position_guard).await?;
1980 }
1981
1982 {
1984 let (request, _, _) =
1985 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1986 assert_eq!(request.pos.as_deref(), Some("42"));
1987 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1988 }
1989
1990 {
1992 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1993 assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1994
1995 let (request, _, _) =
1996 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1997 assert_eq!(request.pos.as_deref(), Some("42"));
1998 }
1999
2000 sliding_sync.expire_session().await;
2003
2004 {
2005 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2006
2007 let (request, _, _) =
2008 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2009 assert!(request.pos.is_none());
2010 }
2011
2012 {
2014 let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2015 assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2016
2017 let (request, _, _) =
2018 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2019 assert!(request.pos.is_none());
2020 }
2021
2022 Ok(())
2023 }
2024
2025 #[async_test]
2026 async fn test_stop_sync_loop() -> Result<()> {
2027 let (_server, sliding_sync) = new_sliding_sync(vec![
2028 SlidingSyncList::builder("foo")
2029 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2030 ])
2031 .await?;
2032
2033 let stream = sliding_sync.sync();
2035 pin_mut!(stream);
2036
2037 assert!(stream.next().await.is_some());
2039
2040 sliding_sync.stop_sync()?;
2042
2043 assert!(stream.next().await.is_none());
2045
2046 let stream = sliding_sync.sync();
2048 pin_mut!(stream);
2049
2050 assert!(stream.next().await.is_some());
2052
2053 Ok(())
2054 }
2055
2056 #[async_test]
2057 async fn test_process_read_receipts() -> Result<()> {
2058 let room = owned_room_id!("!pony:example.org");
2059
2060 let server = MockServer::start().await;
2061 let client = logged_in_client(Some(server.uri())).await;
2062 client.event_cache().subscribe().unwrap();
2063
2064 let sliding_sync = client
2065 .sliding_sync("test")?
2066 .with_receipt_extension(
2067 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2068 )
2069 .add_list(
2070 SlidingSyncList::builder("all")
2071 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2072 )
2073 .build()
2074 .await?;
2075
2076 {
2078 let server_response = assign!(http::Response::new("0".to_owned()), {
2079 rooms: BTreeMap::from([(
2080 room.clone(),
2081 http::response::Room::default(),
2082 )])
2083 });
2084
2085 let _summary = {
2086 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2087 sliding_sync
2088 .handle_response(
2089 server_response.clone(),
2090 &mut pos_guard,
2091 RequestedRequiredStates::default(),
2092 )
2093 .await?
2094 };
2095 }
2096
2097 let server_response = assign!(http::Response::new("1".to_owned()), {
2098 extensions: assign!(http::response::Extensions::default(), {
2099 receipts: assign!(http::response::Receipts::default(), {
2100 rooms: BTreeMap::from([
2101 (
2102 room.clone(),
2103 Raw::from_json_string(
2104 json!({
2105 "room_id": room,
2106 "type": "m.receipt",
2107 "content": {
2108 "$event:bar.org": {
2109 "m.read": {
2110 client.user_id().unwrap(): {
2111 "ts": 1436451550,
2112 }
2113 }
2114 }
2115 }
2116 })
2117 .to_string(),
2118 ).unwrap()
2119 )
2120 ])
2121 })
2122 })
2123 });
2124
2125 let summary = {
2126 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2127 sliding_sync
2128 .handle_response(
2129 server_response.clone(),
2130 &mut pos_guard,
2131 RequestedRequiredStates::default(),
2132 )
2133 .await?
2134 };
2135
2136 assert!(summary.rooms.contains(&room));
2137
2138 Ok(())
2139 }
2140
2141 #[async_test]
2142 async fn test_process_marked_unread_room_account_data() -> Result<()> {
2143 let room_id = owned_room_id!("!unicorn:example.org");
2144
2145 let server = MockServer::start().await;
2146 let client = logged_in_client(Some(server.uri())).await;
2147
2148 let sliding_sync = client
2151 .sliding_sync("test")?
2152 .with_account_data_extension(
2153 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2154 )
2155 .add_list(
2156 SlidingSyncList::builder("all")
2157 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2158 )
2159 .build()
2160 .await?;
2161
2162 {
2164 let server_response = assign!(http::Response::new("0".to_owned()), {
2165 rooms: BTreeMap::from([(
2166 room_id.clone(),
2167 http::response::Room::default(),
2168 )])
2169 });
2170
2171 let _summary = {
2172 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2173 sliding_sync
2174 .handle_response(
2175 server_response.clone(),
2176 &mut pos_guard,
2177 RequestedRequiredStates::default(),
2178 )
2179 .await?
2180 };
2181 }
2182
2183 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2187
2188 let update_summary = {
2189 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2190 sliding_sync
2191 .handle_response(
2192 server_response.clone(),
2193 &mut pos_guard,
2194 RequestedRequiredStates::default(),
2195 )
2196 .await?
2197 };
2198
2199 assert!(update_summary.rooms.contains(&room_id));
2202
2203 let room = client.get_room(&room_id).unwrap();
2204
2205 assert!(room.is_marked_unread());
2208
2209 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2212
2213 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2214 sliding_sync
2215 .handle_response(
2216 server_response.clone(),
2217 &mut pos_guard,
2218 RequestedRequiredStates::default(),
2219 )
2220 .await?;
2221
2222 let room = client.get_room(&room_id).unwrap();
2223
2224 assert!(!room.is_marked_unread());
2225
2226 Ok(())
2227 }
2228
2229 fn make_mark_unread_response(
2230 response_number: &str,
2231 room_id: OwnedRoomId,
2232 unread: bool,
2233 add_rooms_section: bool,
2234 ) -> http::Response {
2235 let rooms = if add_rooms_section {
2236 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2237 } else {
2238 BTreeMap::new()
2239 };
2240
2241 let extensions = assign!(http::response::Extensions::default(), {
2242 account_data: assign!(http::response::AccountData::default(), {
2243 rooms: BTreeMap::from([
2244 (
2245 room_id,
2246 vec![
2247 Raw::from_json_string(
2248 json!({
2249 "content": {
2250 "unread": unread
2251 },
2252 "type": "m.marked_unread"
2253 })
2254 .to_string(),
2255 ).unwrap()
2256 ]
2257 )
2258 ])
2259 })
2260 });
2261
2262 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2263 }
2264
2265 #[async_test]
2266 async fn test_process_rooms_account_data() -> Result<()> {
2267 let room = owned_room_id!("!pony:example.org");
2268
2269 let server = MockServer::start().await;
2270 let client = logged_in_client(Some(server.uri())).await;
2271
2272 let sliding_sync = client
2273 .sliding_sync("test")?
2274 .with_account_data_extension(
2275 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2276 )
2277 .add_list(
2278 SlidingSyncList::builder("all")
2279 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2280 )
2281 .build()
2282 .await?;
2283
2284 {
2286 let server_response = assign!(http::Response::new("0".to_owned()), {
2287 rooms: BTreeMap::from([(
2288 room.clone(),
2289 http::response::Room::default(),
2290 )])
2291 });
2292
2293 let _summary = {
2294 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2295 sliding_sync
2296 .handle_response(
2297 server_response.clone(),
2298 &mut pos_guard,
2299 RequestedRequiredStates::default(),
2300 )
2301 .await?
2302 };
2303 }
2304
2305 let server_response = assign!(http::Response::new("1".to_owned()), {
2306 extensions: assign!(http::response::Extensions::default(), {
2307 account_data: assign!(http::response::AccountData::default(), {
2308 rooms: BTreeMap::from([
2309 (
2310 room.clone(),
2311 vec![
2312 Raw::from_json_string(
2313 json!({
2314 "content": {
2315 "tags": {
2316 "u.work": {
2317 "order": 0.9
2318 }
2319 }
2320 },
2321 "type": "m.tag"
2322 })
2323 .to_string(),
2324 ).unwrap()
2325 ]
2326 )
2327 ])
2328 })
2329 })
2330 });
2331 let summary = {
2332 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2333 sliding_sync
2334 .handle_response(
2335 server_response.clone(),
2336 &mut pos_guard,
2337 RequestedRequiredStates::default(),
2338 )
2339 .await?
2340 };
2341
2342 assert!(summary.rooms.contains(&room));
2343
2344 Ok(())
2345 }
2346
2347 #[async_test]
2348 #[cfg(feature = "e2e-encryption")]
2349 async fn test_process_only_encryption_events() -> Result<()> {
2350 use ruma::OneTimeKeyAlgorithm;
2351
2352 let room = owned_room_id!("!croissant:example.org");
2353
2354 let server = MockServer::start().await;
2355 let client = logged_in_client(Some(server.uri())).await;
2356
2357 let server_response = assign!(http::Response::new("0".to_owned()), {
2358 rooms: BTreeMap::from([(
2359 room.clone(),
2360 assign!(http::response::Room::default(), {
2361 name: Some("Croissants lovers".to_owned()),
2362 timeline: Vec::new(),
2363 }),
2364 )]),
2365
2366 extensions: assign!(http::response::Extensions::default(), {
2367 e2ee: assign!(http::response::E2EE::default(), {
2368 device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2369 }),
2370 to_device: Some(assign!(http::response::ToDevice::default(), {
2371 next_batch: "to-device-token".to_owned(),
2372 })),
2373 })
2374 });
2375
2376 let sliding_sync = client
2380 .sliding_sync("test")?
2381 .with_to_device_extension(
2382 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2383 )
2384 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2385 .build()
2386 .await?;
2387
2388 {
2389 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2390
2391 sliding_sync
2392 .handle_response(
2393 server_response.clone(),
2394 &mut position_guard,
2395 RequestedRequiredStates::default(),
2396 )
2397 .await?;
2398 }
2399
2400 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2402 assert_eq!(uploaded_key_count, 42);
2403
2404 {
2405 let olm_machine = &*client.olm_machine_for_testing().await;
2406 assert_eq!(
2407 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2408 Some("to-device-token")
2409 );
2410 }
2411
2412 assert!(client.get_room(&room).is_none());
2414
2415 let client = logged_in_client(Some(server.uri())).await;
2418
2419 let sliding_sync = client
2420 .sliding_sync("test")?
2421 .add_list(SlidingSyncList::builder("thelist"))
2422 .build()
2423 .await?;
2424
2425 {
2426 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2427
2428 sliding_sync
2429 .handle_response(
2430 server_response.clone(),
2431 &mut position_guard,
2432 RequestedRequiredStates::default(),
2433 )
2434 .await?;
2435 }
2436
2437 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2439 assert_eq!(uploaded_key_count, 0);
2440
2441 {
2442 let olm_machine = &*client.olm_machine_for_testing().await;
2443 assert_eq!(
2444 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2445 None
2446 );
2447 }
2448
2449 assert!(client.get_room(&room).is_some());
2451
2452 let client = logged_in_client(Some(server.uri())).await;
2454
2455 let sliding_sync = client
2456 .sliding_sync("test")?
2457 .add_list(SlidingSyncList::builder("thelist"))
2458 .with_to_device_extension(
2459 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2460 )
2461 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2462 .build()
2463 .await?;
2464
2465 {
2466 let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2467
2468 sliding_sync
2469 .handle_response(
2470 server_response.clone(),
2471 &mut position_guard,
2472 RequestedRequiredStates::default(),
2473 )
2474 .await?;
2475 }
2476
2477 let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2479 assert_eq!(uploaded_key_count, 42);
2480
2481 {
2482 let olm_machine = &*client.olm_machine_for_testing().await;
2483 assert_eq!(
2484 olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2485 Some("to-device-token")
2486 );
2487 }
2488
2489 assert!(client.get_room(&room).is_some());
2491
2492 Ok(())
2493 }
2494
2495 #[async_test]
2496 async fn test_lock_multiple_requests() -> Result<()> {
2497 let server = MockServer::start().await;
2498 let client = logged_in_client(Some(server.uri())).await;
2499
2500 let pos = Arc::new(Mutex::new(0));
2501 let _mock_guard = Mock::given(SlidingSyncMatcher)
2502 .respond_with(move |_: &Request| {
2503 let mut pos = pos.lock().unwrap();
2504 *pos += 1;
2505 ResponseTemplate::new(200).set_body_json(json!({
2506 "pos": pos.to_string(),
2507 "lists": {},
2508 "rooms": {}
2509 }))
2510 })
2511 .mount_as_scoped(&server)
2512 .await;
2513
2514 let sliding_sync = client
2515 .sliding_sync("test")?
2516 .with_to_device_extension(
2517 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2518 )
2519 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2520 .build()
2521 .await?;
2522
2523 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2526
2527 for result in requests.await {
2528 result?;
2529 }
2530
2531 Ok(())
2532 }
2533
2534 #[async_test]
2535 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2536 let server = MockServer::start().await;
2537 let client = logged_in_client(Some(server.uri())).await;
2538
2539 let pos = Arc::new(Mutex::new(0));
2540 let _mock_guard = Mock::given(SlidingSyncMatcher)
2541 .respond_with(move |_: &Request| {
2542 let mut pos = pos.lock().unwrap();
2543 *pos += 1;
2544 ResponseTemplate::new(200)
2546 .set_body_json(json!({
2547 "pos": pos.to_string(),
2548 "lists": {},
2549 "rooms": {}
2550 }))
2551 .set_delay(Duration::from_secs(2))
2552 })
2553 .mount_as_scoped(&server)
2554 .await;
2555
2556 let sliding_sync =
2557 client
2558 .sliding_sync("test")?
2559 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2560 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2561 ))
2562 .add_list(
2563 SlidingSyncList::builder("another-list")
2564 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2565 )
2566 .build()
2567 .await?;
2568
2569 let stream = sliding_sync.sync();
2570 pin_mut!(stream);
2571
2572 let cloned_sync = sliding_sync.clone();
2573 spawn(async move {
2574 tokio::time::sleep(Duration::from_millis(100)).await;
2575
2576 cloned_sync
2577 .on_list("another-list", |list| {
2578 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2579 ready(())
2580 })
2581 .await;
2582 });
2583
2584 assert_matches!(stream.next().await, Some(Ok(_)));
2585
2586 sliding_sync.stop_sync().unwrap();
2587
2588 assert_matches!(stream.next().await, None);
2589
2590 let mut num_requests = 0;
2591
2592 for request in server.received_requests().await.unwrap() {
2593 if !SlidingSyncMatcher.matches(&request) {
2594 continue;
2595 }
2596
2597 let another_list_ranges = if num_requests == 0 {
2598 json!([[0, 10]])
2600 } else {
2601 json!([[10, 20]])
2603 };
2604
2605 num_requests += 1;
2606 assert!(num_requests <= 2, "more than one request hit the server");
2607
2608 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2609
2610 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2611 &json_value,
2612 &json!({
2613 "conn_id": "test",
2614 "lists": {
2615 "room-list": {
2616 "ranges": [[0, 9]],
2617 "required_state": [
2618 ["m.room.encryption", ""],
2619 ["m.room.tombstone", ""]
2620 ],
2621 },
2622 "another-list": {
2623 "ranges": another_list_ranges,
2624 "required_state": [
2625 ["m.room.encryption", ""],
2626 ["m.room.tombstone", ""]
2627 ],
2628 },
2629 }
2630 }),
2631 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2632 ) {
2633 dbg!(json_value);
2634 panic!("json differ: {err}");
2635 }
2636 }
2637
2638 assert_eq!(num_requests, 2);
2639
2640 Ok(())
2641 }
2642
2643 #[async_test]
2644 async fn test_timeout_zero_list() -> Result<()> {
2645 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2646
2647 let (request, _, _) =
2648 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2649
2650 assert!(request.timeout.is_some());
2653
2654 Ok(())
2655 }
2656
2657 #[async_test]
2658 async fn test_timeout_one_list() -> Result<()> {
2659 let (_server, sliding_sync) = new_sliding_sync(vec![
2660 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2661 ])
2662 .await?;
2663
2664 let (request, _, _) =
2665 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667 assert!(request.timeout.is_none());
2669
2670 {
2672 let server_response = assign!(http::Response::new("0".to_owned()), {
2673 lists: BTreeMap::from([(
2674 "foo".to_owned(),
2675 assign!(http::response::List::default(), {
2676 count: uint!(7),
2677 })
2678 )])
2679 });
2680
2681 let _summary = {
2682 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2683 sliding_sync
2684 .handle_response(
2685 server_response.clone(),
2686 &mut pos_guard,
2687 RequestedRequiredStates::default(),
2688 )
2689 .await?
2690 };
2691 }
2692
2693 let (request, _, _) =
2694 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2695
2696 assert!(request.timeout.is_some());
2698
2699 Ok(())
2700 }
2701
2702 #[async_test]
2703 async fn test_timeout_three_lists() -> Result<()> {
2704 let (_server, sliding_sync) = new_sliding_sync(vec![
2705 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2706 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2707 SlidingSyncList::builder("baz")
2708 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2709 ])
2710 .await?;
2711
2712 let (request, _, _) =
2713 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2714
2715 assert!(request.timeout.is_none());
2717
2718 {
2720 let server_response = assign!(http::Response::new("0".to_owned()), {
2721 lists: BTreeMap::from([(
2722 "foo".to_owned(),
2723 assign!(http::response::List::default(), {
2724 count: uint!(7),
2725 })
2726 )])
2727 });
2728
2729 let _summary = {
2730 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2731 sliding_sync
2732 .handle_response(
2733 server_response.clone(),
2734 &mut pos_guard,
2735 RequestedRequiredStates::default(),
2736 )
2737 .await?
2738 };
2739 }
2740
2741 let (request, _, _) =
2742 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744 assert!(request.timeout.is_none());
2746
2747 {
2749 let server_response = assign!(http::Response::new("1".to_owned()), {
2750 lists: BTreeMap::from([(
2751 "bar".to_owned(),
2752 assign!(http::response::List::default(), {
2753 count: uint!(7),
2754 })
2755 )])
2756 });
2757
2758 let _summary = {
2759 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2760 sliding_sync
2761 .handle_response(
2762 server_response.clone(),
2763 &mut pos_guard,
2764 RequestedRequiredStates::default(),
2765 )
2766 .await?
2767 };
2768 }
2769
2770 let (request, _, _) =
2771 sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2772
2773 assert!(request.timeout.is_some());
2775
2776 Ok(())
2777 }
2778
2779 #[async_test]
2780 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2781 let server = MockServer::start().await;
2782 let client = logged_in_client(Some(server.uri())).await;
2783
2784 let _mock_guard = Mock::given(SlidingSyncMatcher)
2785 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2786 "pos": "0",
2787 "lists": {},
2788 "rooms": {}
2789 })))
2790 .mount_as_scoped(&server)
2791 .await;
2792
2793 let sliding_sync = client
2794 .sliding_sync("test")?
2795 .with_to_device_extension(
2796 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2797 )
2798 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2799 .build()
2800 .await?;
2801
2802 let sliding_sync = Arc::new(sliding_sync);
2803
2804 let sync_beat_listener = client.inner.sync_beat.listen();
2806 sliding_sync.sync_once().await?;
2807
2808 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2810 Ok(())
2811 }
2812
2813 #[async_test]
2814 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2815 let server = MockServer::start().await;
2816 let client = logged_in_client(Some(server.uri())).await;
2817
2818 let _mock_guard = Mock::given(SlidingSyncMatcher)
2819 .respond_with(ResponseTemplate::new(404))
2820 .mount_as_scoped(&server)
2821 .await;
2822
2823 let sliding_sync = client
2824 .sliding_sync("test")?
2825 .with_to_device_extension(
2826 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2827 )
2828 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2829 .build()
2830 .await?;
2831
2832 let sliding_sync = Arc::new(sliding_sync);
2833
2834 let sync_beat_listener = client.inner.sync_beat.listen();
2836 let sync_result = sliding_sync.sync_once().await;
2837 assert!(sync_result.is_err());
2838
2839 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2841
2842 Ok(())
2843 }
2844
2845 #[async_test]
2846 async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2847 let server = MatrixMockServer::new().await;
2848 let client = server.client_builder().build().await;
2849 let room_id = room_id!("!mu5hr00m:example.org");
2850
2851 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2852 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2853 "pos": "0",
2854 "lists": {},
2855 "extensions": {
2856 "account_data": {
2857 "global": [
2858 {
2859 "type": "m.direct",
2860 "content": {
2861 "@de4dlockh0lmes:example.org": [
2862 "!mu5hr00m:example.org"
2863 ]
2864 }
2865 }
2866 ]
2867 }
2868 },
2869 "rooms": {
2870 room_id: {
2871 "name": "Mario Bros Fanbase Room",
2872 "initial": true,
2873 },
2874 }
2875 })))
2876 .mount_as_scoped(server.server())
2877 .await;
2878
2879 let f = EventFactory::new().room(room_id);
2880
2881 Mock::given(method("GET"))
2882 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2883 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2884 "chunk": [
2885 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2886 ]
2887 })))
2888 .mount(server.server())
2889 .await;
2890
2891 let (tx, rx) = tokio::sync::oneshot::channel();
2892
2893 let tx = Arc::new(Mutex::new(Some(tx)));
2894 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2895 let members =
2897 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2898 assert_eq!(members.len(), 1);
2899 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2900 });
2901
2902 let sliding_sync = client
2903 .sliding_sync("test")?
2904 .add_list(SlidingSyncList::builder("thelist"))
2905 .with_account_data_extension(
2906 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2907 )
2908 .build()
2909 .await?;
2910
2911 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2912 .await
2913 .expect("Sync did not complete in time")
2914 .expect("Sync failed");
2915
2916 tokio::time::timeout(Duration::from_secs(5), rx)
2918 .await
2919 .expect("Event handler did not complete in time")
2920 .expect("Event handler failed");
2921
2922 Ok(())
2923 }
2924}