1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23
24use std::{
25 collections::{BTreeMap, btree_map::Entry},
26 fmt::Debug,
27 future::Future,
28 sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
29 time::Duration,
30};
31
32use async_stream::stream;
33pub use client::{Version, VersionBuilder};
34use futures_core::stream::Stream;
35use matrix_sdk_base::RequestedRequiredStates;
36#[cfg(feature = "e2e-encryption")]
37use matrix_sdk_common::executor::JoinHandleExt as _;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40 OwnedRoomId, RoomId,
41 api::{client::sync::sync_events::v5 as http, error::ErrorKind},
42 assign,
43};
44use tokio::{
45 select,
46 sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
47};
48use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
49
50pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
51use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
52use crate::{Client, Result, config::RequestConfig};
53
/// Handle to a sliding sync instance.
///
/// All the state lives in a `SlidingSyncInner` shared behind an `Arc`, so a
/// clone of this type refers to the same underlying instance.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// State shared by every clone of this handle.
    inner: Arc<SlidingSyncInner>,
}
62
/// State shared by every clone of a `SlidingSync`.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance; sent as `conn_id` in every sync request.
    id: String,

    /// Client used to send the requests and to process the responses.
    client: Client,

    /// Long-polling timeout used for a request when no list asks for a
    /// different one.
    poll_timeout: Duration,

    /// Extra time, added on top of `poll_timeout`, that the HTTP request is
    /// allowed to take before it times out.
    network_timeout: Duration,

    /// Key under which the state of this instance is read from and written
    /// to storage.
    storage_key: String,

    /// Whether `pos` is reloaded from storage before each request instead of
    /// relying only on the in-memory value.
    share_pos: bool,

    /// Position marker (`pos`) of the sync.
    ///
    /// The lock is acquired when a request is generated and is released once
    /// the matching response has been handled and cached.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// Lists of this instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Rooms explicitly subscribed to, with the settings of each
    /// subscription.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// Extensions configuration, copied into every sync request.
    extensions: http::request::Extensions,

    /// Channel used to send internal messages to the sync loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
    /// Wrap an already built `SlidingSyncInner` into a shareable handle.
    pub(super) fn new(inner: SlidingSyncInner) -> Self {
        Self { inner: Arc::new(inner) }
    }
125
    /// Persist the state of this instance, including the given `position`,
    /// by delegating to `cache::store_sliding_sync_state`.
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }
129
    /// Create a new `SlidingSyncBuilder` for the instance identified by `id`
    /// and bound to `client`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `SlidingSyncBuilder::new` reports.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
134
135 pub fn subscribe_to_rooms(
142 &self,
143 room_ids: &[&RoomId],
144 settings: Option<http::request::RoomSubscription>,
145 cancel_in_flight_request: bool,
146 ) {
147 if subscribe_to_rooms(
148 self.inner.room_subscriptions.write().unwrap(),
149 &self.inner.client,
150 room_ids,
151 settings,
152 cancel_in_flight_request,
153 ) {
154 self.inner.internal_channel_send_if_possible(
155 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
156 );
157 }
158 }
159
160 pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
162 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
163 let mut skip_over_current_sync_loop_iteration = false;
164
165 for room_id in room_ids {
166 if room_subscriptions.remove(*room_id).is_some() {
167 skip_over_current_sync_loop_iteration = true;
168 }
169 }
170
171 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172 self.inner.internal_channel_send_if_possible(
173 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174 );
175 }
176 }
177
178 pub fn clear_and_subscribe_to_rooms(
183 &self,
184 room_ids: &[&RoomId],
185 settings: Option<http::request::RoomSubscription>,
186 cancel_in_flight_request: bool,
187 ) {
188 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
189 room_subscriptions.clear();
190
191 if subscribe_to_rooms(
192 room_subscriptions,
193 &self.inner.client,
194 room_ids,
195 settings,
196 cancel_in_flight_request,
197 ) {
198 self.inner.internal_channel_send_if_possible(
199 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
200 );
201 }
202 }
203
204 pub async fn on_list<Function, FunctionOutput, R>(
206 &self,
207 list_name: &str,
208 function: Function,
209 ) -> Option<R>
210 where
211 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
212 FunctionOutput: Future<Output = R>,
213 {
214 let lists = self.inner.lists.read().await;
215
216 match lists.get(list_name) {
217 Some(list) => Some(function(list).await),
218 None => None,
219 }
220 }
221
222 pub async fn add_list(
228 &self,
229 list_builder: SlidingSyncListBuilder,
230 ) -> Result<Option<SlidingSyncList>> {
231 let list = list_builder.build(self.inner.internal_channel.clone());
232
233 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
234
235 self.inner.internal_channel_send_if_possible(
236 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
237 );
238
239 Ok(old_list)
240 }
241
    /// Reload the list described by `list_builder` from the cache, then add
    /// it to this instance with [`Self::add_list`].
    ///
    /// Returns the list previously registered under the same name, if any.
    ///
    /// # Errors
    ///
    /// Returns an error if reloading the list from the cache fails.
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        // Bound to a name so that it lives until the end of the function;
        // presumably `timer!` reports the elapsed time on drop — confirm.
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }
258
259 #[instrument(skip_all)]
261 async fn handle_response(
262 &self,
263 mut sliding_sync_response: http::Response,
264 position: &mut SlidingSyncPositionMarkers,
265 requested_required_states: RequestedRequiredStates,
266 ) -> Result<UpdateSummary, crate::Error> {
267 let pos = Some(sliding_sync_response.pos.clone());
268
269 let must_process_rooms_response = self.must_process_rooms_response().await;
270
271 trace!(yes = must_process_rooms_response, "Must process rooms response?");
272
273 let sync_response = {
281 let _timer = timer!("response processor");
282
283 let response_processor = {
284 let state_store_guard = {
287 let _timer = timer!("acquiring the `state_store_lock`");
288
289 self.inner.client.base_client().state_store_lock().lock().await
290 };
291
292 let mut response_processor =
293 SlidingSyncResponseProcessor::new(self.inner.client.clone());
294
295 if self.is_thread_subscriptions_enabled() {
301 response_processor
302 .handle_thread_subscriptions(
303 position.pos.as_deref(),
304 std::mem::take(
305 &mut sliding_sync_response.extensions.thread_subscriptions,
306 ),
307 )
308 .await?;
309 }
310
311 #[cfg(feature = "e2e-encryption")]
312 if self.is_e2ee_enabled() {
313 response_processor
314 .handle_encryption(&sliding_sync_response.extensions, &state_store_guard)
315 .await?
316 }
317
318 if must_process_rooms_response {
321 response_processor
322 .handle_room_response(
323 &sliding_sync_response,
324 &requested_required_states,
325 &state_store_guard,
326 )
327 .await?;
328 }
329
330 response_processor
331 };
332
333 response_processor.process_and_take_response().await?
335 };
336
337 debug!("Sliding Sync response has been handled by the client");
338 trace!(?sync_response);
339
340 let update_summary = {
341 let updated_rooms = {
343 let mut updated_rooms = Vec::with_capacity(
344 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
345 );
346
347 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
348
349 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
357
358 updated_rooms
359 };
360
361 let updated_lists = {
363 debug!(
364 lists = ?sliding_sync_response.lists,
365 "Update lists"
366 );
367
368 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
369 let mut lists = self.inner.lists.write().await;
370
371 for (name, list) in lists.iter_mut() {
374 if let Some(updates) = sliding_sync_response.lists.get(name) {
375 let maximum_number_of_rooms: u32 =
376 updates.count.try_into().expect("failed to convert `count` to `u32`");
377
378 if list.update(Some(maximum_number_of_rooms))? {
379 updated_lists.push(name.clone());
380 }
381 } else if list.update(None)? {
382 updated_lists.push(name.clone());
383 }
384 }
385
386 for name in sliding_sync_response.lists.keys() {
388 if !lists.contains_key(name) {
389 error!("Response for list `{name}` - unknown to us; skipping");
390 }
391 }
392
393 updated_lists
394 };
395
396 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
397 };
398
399 debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
403
404 position.pos = pos;
405
406 Ok(update_summary)
407 }
408
    /// Build the next sync request.
    ///
    /// Returns the request itself, the `RequestConfig` to send it with, and
    /// the owned guard of the `position` lock, which stays locked until the
    /// caller drops that guard.
    ///
    /// # Errors
    ///
    /// Returns an error if a list fails to produce its request, if the
    /// stored state cannot be restored, or — with the `e2e-encryption`
    /// feature — if tracked users cannot be marked as dirty.
    ///
    /// # Panics
    ///
    /// Panics if the room subscriptions lock is poisoned.
    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Requests of every list, keyed by list name.
        let mut requests_lists = BTreeMap::new();

        let timeout = {
            let lists = self.inner.lists.read().await;

            // Start from the default, and combine it with what each list
            // requires (see `PollTimeout::min`).
            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        // The lock is owned, so that the guard can be returned to the
        // caller.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        // The stored state is only needed for a shared `pos` or for the
        // to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // With `share_pos`, the stored `pos` takes precedence over the
        // in-memory one, which is updated to match.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // No `pos` means the request starts a new sync session; with E2EE
        // enabled, all tracked users are then marked as dirty.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Translate the `PollTimeout` into the optional duration of the
        // request; `PollTimeout::Some` is expressed in seconds.
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Snapshot of the current room subscriptions.
        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        request.extensions = self.inner.extensions.clone();

        if to_device_enabled {
            // The to-device `since` token comes from the stored state, not
            // from the configured extensions.
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            request,
            // NOTE(review): the HTTP timeout is derived from `poll_timeout`,
            // not from the `timeout` computed above — confirm this is
            // intended when a list requires a longer `PollTimeout::Some`.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
536
    /// Send a sync request and handle its response.
    ///
    /// `position_guard` is the guard returned by `generate_sync_request`; it
    /// is dropped once the response has been handled and the state has been
    /// cached.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, if the response cannot be
    /// handled or cached, or if one of the spawned tasks cannot be joined.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Must be computed before `request` is consumed by `send`.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                let client = self.inner.client.clone();
                // Outgoing E2EE requests are sent in a separate task while
                // the sync request is in flight. Their failure is logged,
                // not propagated. The task is aborted if its handle is
                // dropped, e.g. when the sync request fails below.
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                .abort_on_drop();

                let response = request.await?;

                // Wait for the E2EE task to finish before handling the
                // response.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // The future below is spawned, so it needs its own handle.
        let this = self.clone();

        // NOTE(review): response handling runs in a spawned task, presumably
        // so that it completes even if this future is dropped — confirm.
        let future = async move {
            debug!("Start handling response");

            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the `position` lock explicitly.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.map_err(|error| Error::JoinError {
            task_description: "handle_response".to_owned(),
            error,
        })?
    }
655
656 #[cfg(feature = "e2e-encryption")]
658 fn is_e2ee_enabled(&self) -> bool {
659 self.inner.extensions.e2ee.enabled == Some(true)
660 }
661
662 fn is_thread_subscriptions_enabled(&self) -> bool {
665 self.inner.extensions.thread_subscriptions.enabled == Some(true)
666 }
667
    /// Without the `e2e-encryption` feature, E2EE is never enabled.
    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }
672
673 async fn must_process_rooms_response(&self) -> bool {
675 !self.inner.room_subscriptions.read().unwrap().is_empty()
678 || !self.inner.lists.read().await.is_empty()
679 }
680
681 #[doc(hidden)]
685 #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
686 pub async fn sync_once(&self) -> Result<UpdateSummary> {
687 let (request, request_config, position_guard) = self.generate_sync_request().await?;
688
689 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
691
692 self.inner.client.inner.sync_beat.notify(usize::MAX);
694
695 Ok(summaries)
696 }
697
    /// Create a stream that runs the sync loop.
    ///
    /// Each successful iteration yields an `UpdateSummary`. The stream ends
    /// when a `SyncLoopStop` internal message is received, when the internal
    /// channel reports an error, or right after the first sync error has
    /// been yielded.
    // NOTE(review): the `allow` below is presumably needed because of the
    // code generated by `#[instrument]` — confirm.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is first polled.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Branches are polled in order: internal messages take
                    // precedence over the sync itself.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // A receive error stops the loop too.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // The in-flight `sync_once` future is dropped by
                            // `select!`; start over with a new iteration.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            Err(error) => {
                                // The server doesn't know our `pos`: the
                                // session has expired and must be reset.
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Any error terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
762
763 pub fn stop_sync(&self) -> Result<()> {
772 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
773 }
774
775 #[doc(hidden)]
786 pub async fn expire_session(&self) {
787 info!("Session expired; resetting `pos`");
788
789 {
790 let lists = self.inner.lists.read().await;
791
792 for list in lists.values() {
793 list.set_maximum_number_of_rooms(None);
795 }
796 }
797
798 {
800 let mut position = self.inner.position.lock().await;
801
802 position.pos = None;
804
805 if let Err(err) = self.cache_to_storage(&position).await {
809 warn!("Failed to invalidate cached sliding sync state: {err}");
810 }
811 }
812
813 {
814 self.inner.room_subscriptions.write().unwrap().clear();
817 }
818 }
819}
820
821fn subscribe_to_rooms(
824 mut room_subscriptions: StdRwLockWriteGuard<
825 '_,
826 BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
827 >,
828 client: &Client,
829 room_ids: &[&RoomId],
830 settings: Option<http::request::RoomSubscription>,
831 cancel_in_flight_request: bool,
832) -> bool {
833 let settings = settings.unwrap_or_default();
834 let mut skip_over_current_sync_loop_iteration = false;
835
836 for room_id in room_ids {
837 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
838 if let Some(room) = client.get_room(room_id) {
839 room.mark_members_missing();
840 }
841
842 entry.insert(settings.clone());
843
844 skip_over_current_sync_loop_iteration = true;
845 }
846 }
847
848 cancel_in_flight_request && skip_over_current_sync_loop_iteration
849}
850
851impl SlidingSyncInner {
852 #[instrument]
854 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
855 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
856 }
857
858 #[instrument]
861 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
862 let _ = self.internal_channel.send(message);
864 }
865}
866
/// Messages sent to the sync loop over the internal channel.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// The sync loop must stop.
    SyncLoopStop,

    /// The sync loop must abandon its current iteration and start a new
    /// one.
    SyncLoopSkipOverCurrentIteration,
}
876
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the in-memory `pos` with `new_pos`. Testing helper.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }
}
885
/// Position markers of a sliding sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` sent with the next request: `None` until a response has
    /// been handled or after the session has expired, otherwise the `pos`
    /// of the last handled response (or the one restored from storage).
    pos: Option<String>,
}
892
/// Summary of what a handled sync response has updated.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// Names of the lists that reported an update.
    pub lists: Vec<String>,
    /// IDs of the rooms named by the response. A room ID may appear more
    /// than once.
    pub rooms: Vec<OwnedRoomId>,
}
902
/// Long-polling timeout required for a sync request.
#[derive(Debug)]
pub enum PollTimeout {
    /// The request must not long-poll at all.
    None,

    /// The request must use this timeout, in seconds.
    Some(u32),

    /// The request must use the default timeout of the sliding sync
    /// instance.
    Default,
}

impl PollTimeout {
    /// Combine two `PollTimeout`s, keeping the most restrictive one.
    ///
    /// `None` beats everything, two `Some` keep the smaller value, and
    /// `Default` gives way to anything else.
    fn min(self, left: Self) -> Self {
        match (self, left) {
            // `None` on either side always wins.
            (Self::None, _) | (_, Self::None) => Self::None,

            // Two explicit timeouts: keep the shorter one.
            (Self::Some(first), Self::Some(second)) => Self::Some(first.min(second)),

            // An explicit timeout beats the default.
            (Self::Some(value), Self::Default) | (Self::Default, Self::Some(value)) => {
                Self::Some(value)
            }

            (Self::Default, Self::Default) => Self::Default,
        }
    }
}
950
951#[cfg(all(test, not(target_family = "wasm")))]
952#[allow(clippy::dbg_macro)]
953mod tests {
954 use std::{
955 collections::BTreeMap,
956 future::ready,
957 ops::Not,
958 sync::{Arc, Mutex},
959 time::Duration,
960 };
961
962 use assert_matches::assert_matches;
963 use event_listener::Listener;
964 use futures_util::{StreamExt, future::join_all, pin_mut};
965 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
966 use matrix_sdk_common::executor::spawn;
967 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
968 use ruma::{
969 OwnedRoomId, assign,
970 events::{direct::DirectEvent, room::member::MembershipState},
971 owned_room_id, room_id,
972 serde::Raw,
973 uint,
974 };
975 use serde::Deserialize;
976 use serde_json::json;
977 use wiremock::{
978 Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
979 };
980
981 use super::{
982 SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
983 cache::restore_sliding_sync_state, http,
984 };
985 use crate::{
986 Client, Result,
987 test_utils::{logged_in_client, mocks::MatrixMockServer},
988 };
989
990 #[derive(Copy, Clone)]
991 struct SlidingSyncMatcher;
992
993 impl Match for SlidingSyncMatcher {
994 fn matches(&self, request: &Request) -> bool {
995 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
996 && request.method == Method::POST
997 }
998 }
999
1000 async fn new_sliding_sync(
1001 lists: Vec<SlidingSyncListBuilder>,
1002 ) -> Result<(MockServer, SlidingSync)> {
1003 let server = MockServer::start().await;
1004 let client = logged_in_client(Some(server.uri())).await;
1005
1006 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1007
1008 for list in lists {
1009 sliding_sync_builder = sliding_sync_builder.add_list(list);
1010 }
1011
1012 let sliding_sync = sliding_sync_builder.build().await?;
1013
1014 Ok((server, sliding_sync))
1015 }
1016
    /// Subscribing to a room that has no subscription yet marks its members
    /// as missing, while subscribing again to an already subscribed room
    /// changes nothing.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Run one sync, so that the client knows the three rooms.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced yet.
        assert!(room0.are_members_synced().not());

        // Sync the members of room 0.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced.
        assert!(room0.are_members_synced());

        // Subscribe to rooms 0 and 1, neither of which has a subscription.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // The new subscription has marked the members as missing again.
        assert!(room0.are_members_synced().not());

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Sync the members of room 0 once more.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced again.
        assert!(room0.are_members_synced());

        // Subscribe to room 0 again: the subscription already exists.
        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Nothing has changed: members are still synced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1137
    /// Exercises `subscribe_to_rooms`, `unsubscribe_to_rooms` and
    /// `clear_and_subscribe_to_rooms` in a row, checking the content of the
    /// room subscriptions after each step.
    #[async_test]
    async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");
        let room_id_3 = room_id!("!r3:bar.org");

        // Initially, there is no subscription.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Unsubscribe from one of them.
        sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 1);
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Subscribe to both again: one is new, the other already exists.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Replace all the subscriptions by two other rooms.
        sliding_sync.clear_and_subscribe_to_rooms(
            &[room_id_2, room_id_3],
            Default::default(),
            false,
        );

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_2));
            assert!(room_subscriptions.contains_key(room_id_3));
        }

        Ok(())
    }
1207
    /// `expire_session` clears every room subscription; subscriptions added
    /// afterwards start from an empty set.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to a third room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Expire the session.
        sliding_sync.expire_session().await;

        // All the subscriptions are gone.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // Only this room is subscribed.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1264
    /// A list added with `add_list` after the instance has been built sits
    /// next to the lists given to the builder.
    #[async_test]
    async fn test_add_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        // The stream is created but never polled.
        let _stream = sliding_sync.sync();
        pin_mut!(_stream);

        sliding_sync
            .add_list(
                SlidingSyncList::builder("bar")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
            )
            .await?;

        let lists = sliding_sync.inner.lists.read().await;

        // Both the initial list and the added one are present.
        assert!(lists.contains_key("foo"));
        assert!(lists.contains_key("bar"));

        Ok(())
    }
1292
    /// The `since` token of the to-device extension is absent from a
    /// generated request until a `next_batch_token` has been saved in the
    /// crypto store, after which the request carries that token.
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_extensions_to_device_since_is_set() {
        use matrix_sdk_base::crypto::store::types::Changes;

        let client = logged_in_client(None).await;
        let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
            .unwrap()
            .with_to_device_extension(assign!(
                http::request::ToDevice::default(),
                {
                    enabled: Some(true),
                }
            ))
            .build()
            .await
            .unwrap();

        // The configured extension is enabled and has no `since` token.
        {
            let to_device = &sliding_sync.inner.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Same for a generated request, as nothing has been stored yet.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        let since_token = "depuis".to_owned();

        // Save a `next_batch_token` in the crypto store.
        {
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(since_token.clone()),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            } else {
                panic!("Where is the Olm machine?");
            }
        }

        // The next generated request carries the saved token.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert_eq!(to_device.since, Some(since_token));
        }
    }
1357
    /// With E2EE enabled, generating a request without a `pos` marks all
    /// tracked users as dirty, which shows up as a new `/keys/query`
    /// outgoing request. Generating a request with a `pos` doesn't.
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
        use matrix_sdk_test::ruma_response_from_json;
        use ruma::user_id;

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let alice = user_id!("@alice:localhost");
        let bob = user_id!("@bob:localhost");
        let me = user_id!("@example:localhost");

        // Track some users, and answer every outgoing request, so that no
        // request is pending anymore.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            olm_machine.update_tracked_users([alice, bob]).await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // A keys upload and a keys query are pending.
            assert_eq!(outgoing_requests.len(), 2);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

            // Answer the keys upload.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                        "one_time_key_counts": {}
                    }))),
                )
                .await?;

            // Answer the keys query.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[1].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                        }
                    }))),
                )
                .await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // Another keys query is pending; it is answered with our own
            // user.
            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            me: {},
                        }
                    }))),
                )
                .await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // Nothing is pending anymore.
            assert!(outgoing_requests.is_empty());
        }

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Generate a request; there is no `pos` yet.
        let (_request, _, _) = sync.generate_sync_request().await?;

        // All tracked users are queried again.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(
                outgoing_requests[0].request(),
                AnyOutgoingRequest::KeysQuery(request) => {
                    assert!(request.device_keys.contains_key(alice));
                    assert!(request.device_keys.contains_key(bob));
                    assert!(request.device_keys.contains_key(me));
                }
            );

            // Answer the query, so that nothing is pending anymore.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                            me: {},
                        }
                    }))),
                )
                .await?;
        }

        // Set a `pos`.
        sync.set_pos("chocolat".to_owned()).await;

        // Generate another request, this time with a `pos`.
        let (_request, _, _) = sync.generate_sync_request().await?;

        // No new outgoing request has been created.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        Ok(())
    }
1498
/// A sliding sync instance built without `share_pos()` keeps using its own
/// in-memory `pos`: it ignores a `pos` written to storage by another
/// instance, and a freshly built instance starts without any `pos`.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    /// The only part of the request body the mock echoes back.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // Every request is answered with the current counter value as `pos`
    // ("0", "1", …) and with the `txn_id` of the request.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let PartialRequest { txn_id } = request.body_json().unwrap();

            let pos = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Nothing has been synced yet: no `pos` in memory, none in the request.
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), None);

        let (first_request, ..) = sliding_sync.generate_sync_request().await?;
        assert_eq!(first_request.pos.as_deref(), None);
    }

    // Run one iteration of the sync loop.
    let stream = sliding_sync.sync();
    pin_mut!(stream);
    assert_matches!(stream.next().await, Some(Ok(_)));

    // The `pos` sent by the server is now known in memory…
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // … and it is also what gets restored from storage.
    let restored = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");
    assert_eq!(restored.pos.as_deref(), Some("0"));

    // A second instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some("yolo".to_owned());

        other_sync.cache_to_storage(&other_position).await?;
    }

    // The first instance is unaffected: it keeps using its in-memory `pos`.
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let (next_request, ..) = sliding_sync.generate_sync_request().await?;
        assert_eq!(next_request.pos.as_deref(), Some("0"));
    }

    // A brand new instance doesn't pick up any `pos`.
    {
        let fresh_sync = client.sliding_sync("forgetful-sync")?.build().await?;
        assert_eq!(fresh_sync.inner.position.lock().await.pos.as_deref(), None);
    }

    Ok(())
}
1587
/// A sliding sync instance built with `share_pos()` picks up the `pos`
/// found in storage, both when it is built and when it generates a request,
/// and forgets it everywhere once the session has expired.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    /// The only part of the request body the mock echoes back.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // Every request is answered with the current counter value as `pos`
    // ("0", "1", …) and with the `txn_id` of the request.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let PartialRequest { txn_id } = request.body_json().unwrap();

            let pos = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": txn_id,
                "pos": pos.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Nothing has been synced yet: no `pos` in the request, none in memory.
    {
        let (first_request, ..) = sliding_sync.generate_sync_request().await?;

        assert_eq!(first_request.pos.as_deref(), None);
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), None);
    }

    // Run one iteration of the sync loop.
    let stream = sliding_sync.sync();
    pin_mut!(stream);
    assert_matches!(stream.next().await, Some(Ok(_)));

    // The `pos` sent by the server is now known in memory…
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // … and in storage.
    let restored = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");
    assert_eq!(restored.pos.as_deref(), Some("0"));

    // Another instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some("42".to_owned());

        other_sync.cache_to_storage(&other_position).await?;
    }

    // Generating a request makes the first instance adopt the stored `pos`.
    // The request is generated first, the in-memory value is checked after.
    {
        let (next_request, ..) = sliding_sync.generate_sync_request().await?;

        assert_eq!(next_request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A new instance sharing the `pos` starts from the stored value.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(fresh_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (fresh_request, ..) = fresh_sync.generate_sync_request().await?;
        assert_eq!(fresh_request.pos.as_deref(), Some("42"));
    }

    sliding_sync.expire_session().await;

    // After expiration, the `pos` is gone for the current instance…
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), None);

        let (expired_request, ..) = sliding_sync.generate_sync_request().await?;
        assert_eq!(expired_request.pos.as_deref(), None);
    }

    // … and for an instance built afterwards.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(fresh_sync.inner.position.lock().await.pos.as_deref(), None);

        let (fresh_request, ..) = fresh_sync.generate_sync_request().await?;
        assert_eq!(fresh_request.pos.as_deref(), None);
    }

    Ok(())
}
1696
1697 #[async_test]
1698 async fn test_stop_sync_loop() -> Result<()> {
1699 let (_server, sliding_sync) = new_sliding_sync(vec![
1700 SlidingSyncList::builder("foo")
1701 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1702 ])
1703 .await?;
1704
1705 let stream = sliding_sync.sync();
1707 pin_mut!(stream);
1708
1709 assert!(stream.next().await.is_some());
1711
1712 sliding_sync.stop_sync()?;
1714
1715 assert!(stream.next().await.is_none());
1717
1718 let stream = sliding_sync.sync();
1720 pin_mut!(stream);
1721
1722 assert!(stream.next().await.is_some());
1724
1725 Ok(())
1726 }
1727
1728 #[async_test]
1729 async fn test_process_read_receipts() -> Result<()> {
1730 let room = owned_room_id!("!pony:example.org");
1731
1732 let server = MockServer::start().await;
1733 let client = logged_in_client(Some(server.uri())).await;
1734 client.event_cache().subscribe().unwrap();
1735
1736 let sliding_sync = client
1737 .sliding_sync("test")?
1738 .with_receipt_extension(
1739 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
1740 )
1741 .add_list(
1742 SlidingSyncList::builder("all")
1743 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1744 )
1745 .build()
1746 .await?;
1747
1748 {
1750 let server_response = assign!(http::Response::new("0".to_owned()), {
1751 rooms: BTreeMap::from([(
1752 room.clone(),
1753 http::response::Room::default(),
1754 )])
1755 });
1756
1757 let _summary = {
1758 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1759 sliding_sync
1760 .handle_response(
1761 server_response.clone(),
1762 &mut pos_guard,
1763 RequestedRequiredStates::default(),
1764 )
1765 .await?
1766 };
1767 }
1768
1769 let server_response = assign!(http::Response::new("1".to_owned()), {
1770 extensions: assign!(http::response::Extensions::default(), {
1771 receipts: assign!(http::response::Receipts::default(), {
1772 rooms: BTreeMap::from([
1773 (
1774 room.clone(),
1775 Raw::from_json_string(
1776 json!({
1777 "room_id": room,
1778 "type": "m.receipt",
1779 "content": {
1780 "$event:bar.org": {
1781 "m.read": {
1782 client.user_id().unwrap(): {
1783 "ts": 1436451550,
1784 }
1785 }
1786 }
1787 }
1788 })
1789 .to_string(),
1790 ).unwrap()
1791 )
1792 ])
1793 })
1794 })
1795 });
1796
1797 let summary = {
1798 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1799 sliding_sync
1800 .handle_response(
1801 server_response.clone(),
1802 &mut pos_guard,
1803 RequestedRequiredStates::default(),
1804 )
1805 .await?
1806 };
1807
1808 assert!(summary.rooms.contains(&room));
1809
1810 Ok(())
1811 }
1812
1813 #[async_test]
1814 async fn test_process_marked_unread_room_account_data() -> Result<()> {
1815 let room_id = owned_room_id!("!unicorn:example.org");
1816
1817 let server = MockServer::start().await;
1818 let client = logged_in_client(Some(server.uri())).await;
1819
1820 let sliding_sync = client
1823 .sliding_sync("test")?
1824 .with_account_data_extension(
1825 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1826 )
1827 .add_list(
1828 SlidingSyncList::builder("all")
1829 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1830 )
1831 .build()
1832 .await?;
1833
1834 {
1836 let server_response = assign!(http::Response::new("0".to_owned()), {
1837 rooms: BTreeMap::from([(
1838 room_id.clone(),
1839 http::response::Room::default(),
1840 )])
1841 });
1842
1843 let _summary = {
1844 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1845 sliding_sync
1846 .handle_response(
1847 server_response.clone(),
1848 &mut pos_guard,
1849 RequestedRequiredStates::default(),
1850 )
1851 .await?
1852 };
1853 }
1854
1855 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
1859
1860 let update_summary = {
1861 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1862 sliding_sync
1863 .handle_response(
1864 server_response.clone(),
1865 &mut pos_guard,
1866 RequestedRequiredStates::default(),
1867 )
1868 .await?
1869 };
1870
1871 assert!(update_summary.rooms.contains(&room_id));
1874
1875 let room = client.get_room(&room_id).unwrap();
1876
1877 assert!(room.is_marked_unread());
1880
1881 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
1884
1885 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1886 sliding_sync
1887 .handle_response(
1888 server_response.clone(),
1889 &mut pos_guard,
1890 RequestedRequiredStates::default(),
1891 )
1892 .await?;
1893
1894 let room = client.get_room(&room_id).unwrap();
1895
1896 assert!(!room.is_marked_unread());
1897
1898 Ok(())
1899 }
1900
1901 fn make_mark_unread_response(
1902 response_number: &str,
1903 room_id: OwnedRoomId,
1904 unread: bool,
1905 add_rooms_section: bool,
1906 ) -> http::Response {
1907 let rooms = if add_rooms_section {
1908 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
1909 } else {
1910 BTreeMap::new()
1911 };
1912
1913 let extensions = assign!(http::response::Extensions::default(), {
1914 account_data: assign!(http::response::AccountData::default(), {
1915 rooms: BTreeMap::from([
1916 (
1917 room_id,
1918 vec![
1919 Raw::from_json_string(
1920 json!({
1921 "content": {
1922 "unread": unread
1923 },
1924 "type": "m.marked_unread"
1925 })
1926 .to_string(),
1927 ).unwrap()
1928 ]
1929 )
1930 ])
1931 })
1932 });
1933
1934 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
1935 }
1936
1937 #[async_test]
1938 async fn test_process_rooms_account_data() -> Result<()> {
1939 let room = owned_room_id!("!pony:example.org");
1940
1941 let server = MockServer::start().await;
1942 let client = logged_in_client(Some(server.uri())).await;
1943
1944 let sliding_sync = client
1945 .sliding_sync("test")?
1946 .with_account_data_extension(
1947 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1948 )
1949 .add_list(
1950 SlidingSyncList::builder("all")
1951 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1952 )
1953 .build()
1954 .await?;
1955
1956 {
1958 let server_response = assign!(http::Response::new("0".to_owned()), {
1959 rooms: BTreeMap::from([(
1960 room.clone(),
1961 http::response::Room::default(),
1962 )])
1963 });
1964
1965 let _summary = {
1966 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1967 sliding_sync
1968 .handle_response(
1969 server_response.clone(),
1970 &mut pos_guard,
1971 RequestedRequiredStates::default(),
1972 )
1973 .await?
1974 };
1975 }
1976
1977 let server_response = assign!(http::Response::new("1".to_owned()), {
1978 extensions: assign!(http::response::Extensions::default(), {
1979 account_data: assign!(http::response::AccountData::default(), {
1980 rooms: BTreeMap::from([
1981 (
1982 room.clone(),
1983 vec![
1984 Raw::from_json_string(
1985 json!({
1986 "content": {
1987 "tags": {
1988 "u.work": {
1989 "order": 0.9
1990 }
1991 }
1992 },
1993 "type": "m.tag"
1994 })
1995 .to_string(),
1996 ).unwrap()
1997 ]
1998 )
1999 ])
2000 })
2001 })
2002 });
2003 let summary = {
2004 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2005 sliding_sync
2006 .handle_response(
2007 server_response.clone(),
2008 &mut pos_guard,
2009 RequestedRequiredStates::default(),
2010 )
2011 .await?
2012 };
2013
2014 assert!(summary.rooms.contains(&room));
2015
2016 Ok(())
2017 }
2018
/// Feeds the same response (one room, a one-time key count and a to-device
/// token) to three differently configured sliding sync instances, and checks
/// which parts of it each configuration processes.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;

    let croissant_room = assign!(http::response::Room::default(), {
        name: Some("Croissants lovers".to_owned()),
        timeline: Vec::new(),
    });
    let e2ee_data = assign!(http::response::E2EE::default(), {
        device_one_time_keys_count: BTreeMap::from([
            (OneTimeKeyAlgorithm::SignedCurve25519, uint!(42)),
        ]),
    });
    let to_device_data = assign!(http::response::ToDevice::default(), {
        next_batch: "to-device-token".to_owned(),
    });

    let server_response = assign!(http::Response::new("0".to_owned()), {
        rooms: BTreeMap::from([(room.clone(), croissant_room)]),
        extensions: assign!(http::response::Extensions::default(), {
            e2ee: e2ee_data,
            to_device: Some(to_device_data),
        }),
    });

    // Case 1: e2ee and to-device extensions enabled, no list.
    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true) }))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The encryption-related data has been processed…
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);
    {
        let machine_guard = client.olm_machine_for_testing().await;
        let olm_machine = (*machine_guard).as_ref().unwrap();
        assert_eq!(
            olm_machine.store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }
    // … but the room has not been created.
    assert!(client.get_room(&room).is_none());

    // Case 2: a new client, with a list but no extension.
    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The encryption-related data has been left untouched…
    assert_eq!(client.encryption().uploaded_key_count().await?, 0);
    {
        let machine_guard = client.olm_machine_for_testing().await;
        let olm_machine = (*machine_guard).as_ref().unwrap();
        assert_eq!(olm_machine.store().next_batch_token().await?.as_deref(), None);
    }
    // … and the room exists.
    assert!(client.get_room(&room).is_some());

    // Case 3: a new client, with a list and both extensions.
    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true) }))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // Everything has been processed.
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);
    {
        let machine_guard = client.olm_machine_for_testing().await;
        let olm_machine = (*machine_guard).as_ref().unwrap();
        assert_eq!(
            olm_machine.store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }
    assert!(client.get_room(&room).is_some());

    Ok(())
}
2166
2167 #[async_test]
2168 async fn test_lock_multiple_requests() -> Result<()> {
2169 let server = MockServer::start().await;
2170 let client = logged_in_client(Some(server.uri())).await;
2171
2172 let pos = Arc::new(Mutex::new(0));
2173 let _mock_guard = Mock::given(SlidingSyncMatcher)
2174 .respond_with(move |_: &Request| {
2175 let mut pos = pos.lock().unwrap();
2176 *pos += 1;
2177 ResponseTemplate::new(200).set_body_json(json!({
2178 "pos": pos.to_string(),
2179 "lists": {},
2180 "rooms": {}
2181 }))
2182 })
2183 .mount_as_scoped(&server)
2184 .await;
2185
2186 let sliding_sync = client
2187 .sliding_sync("test")?
2188 .with_to_device_extension(
2189 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2190 )
2191 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2192 .build()
2193 .await?;
2194
2195 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2198
2199 for result in requests.await {
2200 result?;
2201 }
2202
2203 Ok(())
2204 }
2205
2206 #[async_test]
2207 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2208 let server = MockServer::start().await;
2209 let client = logged_in_client(Some(server.uri())).await;
2210
2211 let pos = Arc::new(Mutex::new(0));
2212 let _mock_guard = Mock::given(SlidingSyncMatcher)
2213 .respond_with(move |_: &Request| {
2214 let mut pos = pos.lock().unwrap();
2215 *pos += 1;
2216 ResponseTemplate::new(200)
2218 .set_body_json(json!({
2219 "pos": pos.to_string(),
2220 "lists": {},
2221 "rooms": {}
2222 }))
2223 .set_delay(Duration::from_secs(2))
2224 })
2225 .mount_as_scoped(&server)
2226 .await;
2227
2228 let sliding_sync =
2229 client
2230 .sliding_sync("test")?
2231 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2232 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2233 ))
2234 .add_list(
2235 SlidingSyncList::builder("another-list")
2236 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2237 )
2238 .build()
2239 .await?;
2240
2241 let stream = sliding_sync.sync();
2242 pin_mut!(stream);
2243
2244 let cloned_sync = sliding_sync.clone();
2245 spawn(async move {
2246 tokio::time::sleep(Duration::from_millis(100)).await;
2247
2248 cloned_sync
2249 .on_list("another-list", |list| {
2250 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2251 ready(())
2252 })
2253 .await;
2254 });
2255
2256 assert_matches!(stream.next().await, Some(Ok(_)));
2257
2258 sliding_sync.stop_sync().unwrap();
2259
2260 assert_matches!(stream.next().await, None);
2261
2262 let mut num_requests = 0;
2263
2264 for request in server.received_requests().await.unwrap() {
2265 if !SlidingSyncMatcher.matches(&request) {
2266 continue;
2267 }
2268
2269 let another_list_ranges = if num_requests == 0 {
2270 json!([[0, 10]])
2272 } else {
2273 json!([[10, 20]])
2275 };
2276
2277 num_requests += 1;
2278 assert!(num_requests <= 2, "more than one request hit the server");
2279
2280 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2281
2282 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2283 &json_value,
2284 &json!({
2285 "conn_id": "test",
2286 "lists": {
2287 "room-list": {
2288 "ranges": [[0, 9]],
2289 "required_state": [
2290 ["m.room.encryption", ""],
2291 ["m.room.tombstone", ""]
2292 ],
2293 },
2294 "another-list": {
2295 "ranges": another_list_ranges,
2296 "required_state": [
2297 ["m.room.encryption", ""],
2298 ["m.room.tombstone", ""]
2299 ],
2300 },
2301 }
2302 }),
2303 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2304 ) {
2305 dbg!(json_value);
2306 panic!("json differ: {err}");
2307 }
2308 }
2309
2310 assert_eq!(num_requests, 2);
2311
2312 Ok(())
2313 }
2314
2315 #[async_test]
2316 async fn test_timeout_zero_list() -> Result<()> {
2317 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2318
2319 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2320
2321 assert!(request.timeout.is_some());
2324
2325 Ok(())
2326 }
2327
2328 #[async_test]
2329 async fn test_timeout_one_list() -> Result<()> {
2330 let (_server, sliding_sync) = new_sliding_sync(vec![
2331 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2332 ])
2333 .await?;
2334
2335 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2336
2337 assert!(request.timeout.is_none());
2339
2340 {
2342 let server_response = assign!(http::Response::new("0".to_owned()), {
2343 lists: BTreeMap::from([(
2344 "foo".to_owned(),
2345 assign!(http::response::List::default(), {
2346 count: uint!(7),
2347 })
2348 )])
2349 });
2350
2351 let _summary = {
2352 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2353 sliding_sync
2354 .handle_response(
2355 server_response.clone(),
2356 &mut pos_guard,
2357 RequestedRequiredStates::default(),
2358 )
2359 .await?
2360 };
2361 }
2362
2363 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2364
2365 assert!(request.timeout.is_some());
2367
2368 Ok(())
2369 }
2370
2371 #[async_test]
2372 async fn test_timeout_three_lists() -> Result<()> {
2373 let (_server, sliding_sync) = new_sliding_sync(vec![
2374 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2375 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2376 SlidingSyncList::builder("baz")
2377 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2378 ])
2379 .await?;
2380
2381 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2382
2383 assert!(request.timeout.is_none());
2385
2386 {
2388 let server_response = assign!(http::Response::new("0".to_owned()), {
2389 lists: BTreeMap::from([(
2390 "foo".to_owned(),
2391 assign!(http::response::List::default(), {
2392 count: uint!(7),
2393 })
2394 )])
2395 });
2396
2397 let _summary = {
2398 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2399 sliding_sync
2400 .handle_response(
2401 server_response.clone(),
2402 &mut pos_guard,
2403 RequestedRequiredStates::default(),
2404 )
2405 .await?
2406 };
2407 }
2408
2409 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2410
2411 assert!(request.timeout.is_none());
2413
2414 {
2416 let server_response = assign!(http::Response::new("1".to_owned()), {
2417 lists: BTreeMap::from([(
2418 "bar".to_owned(),
2419 assign!(http::response::List::default(), {
2420 count: uint!(7),
2421 })
2422 )])
2423 });
2424
2425 let _summary = {
2426 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2427 sliding_sync
2428 .handle_response(
2429 server_response.clone(),
2430 &mut pos_guard,
2431 RequestedRequiredStates::default(),
2432 )
2433 .await?
2434 };
2435 }
2436
2437 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2438
2439 assert!(request.timeout.is_some());
2441
2442 Ok(())
2443 }
2444
2445 #[async_test]
2446 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2447 let server = MockServer::start().await;
2448 let client = logged_in_client(Some(server.uri())).await;
2449
2450 let _mock_guard = Mock::given(SlidingSyncMatcher)
2451 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2452 "pos": "0",
2453 "lists": {},
2454 "rooms": {}
2455 })))
2456 .mount_as_scoped(&server)
2457 .await;
2458
2459 let sliding_sync = client
2460 .sliding_sync("test")?
2461 .with_to_device_extension(
2462 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2463 )
2464 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2465 .build()
2466 .await?;
2467
2468 let sliding_sync = Arc::new(sliding_sync);
2469
2470 let sync_beat_listener = client.inner.sync_beat.listen();
2472 sliding_sync.sync_once().await?;
2473
2474 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2476 Ok(())
2477 }
2478
2479 #[async_test]
2480 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2481 let server = MockServer::start().await;
2482 let client = logged_in_client(Some(server.uri())).await;
2483
2484 let _mock_guard = Mock::given(SlidingSyncMatcher)
2485 .respond_with(ResponseTemplate::new(404))
2486 .mount_as_scoped(&server)
2487 .await;
2488
2489 let sliding_sync = client
2490 .sliding_sync("test")?
2491 .with_to_device_extension(
2492 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2493 )
2494 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2495 .build()
2496 .await?;
2497
2498 let sliding_sync = Arc::new(sliding_sync);
2499
2500 let sync_beat_listener = client.inner.sync_beat.listen();
2502 let sync_result = sliding_sync.sync_once().await;
2503 assert!(sync_result.is_err());
2504
2505 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2507
2508 Ok(())
2509 }
2510
2511 #[async_test]
2512 async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2513 let server = MatrixMockServer::new().await;
2514 let client = server.client_builder().build().await;
2515 let room_id = room_id!("!mu5hr00m:example.org");
2516
2517 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2518 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2519 "pos": "0",
2520 "lists": {},
2521 "extensions": {
2522 "account_data": {
2523 "global": [
2524 {
2525 "type": "m.direct",
2526 "content": {
2527 "@de4dlockh0lmes:example.org": [
2528 "!mu5hr00m:example.org"
2529 ]
2530 }
2531 }
2532 ]
2533 }
2534 },
2535 "rooms": {
2536 room_id: {
2537 "name": "Mario Bros Fanbase Room",
2538 "initial": true,
2539 },
2540 }
2541 })))
2542 .mount_as_scoped(server.server())
2543 .await;
2544
2545 let f = EventFactory::new().room(room_id);
2546
2547 Mock::given(method("GET"))
2548 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2549 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2550 "chunk": [
2551 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2552 ]
2553 })))
2554 .mount(server.server())
2555 .await;
2556
2557 let (tx, rx) = tokio::sync::oneshot::channel();
2558
2559 let tx = Arc::new(Mutex::new(Some(tx)));
2560 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2561 let members =
2563 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2564 assert_eq!(members.len(), 1);
2565 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2566 });
2567
2568 let sliding_sync = client
2569 .sliding_sync("test")?
2570 .add_list(SlidingSyncList::builder("thelist"))
2571 .with_account_data_extension(
2572 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2573 )
2574 .build()
2575 .await?;
2576
2577 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2578 .await
2579 .expect("Sync did not complete in time")
2580 .expect("Sync failed");
2581
2582 tokio::time::timeout(Duration::from_secs(5), rx)
2584 .await
2585 .expect("Event handler did not complete in time")
2586 .expect("Event handler failed");
2587
2588 Ok(())
2589 }
2590}