1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23
24use std::{
25 collections::{BTreeMap, btree_map::Entry},
26 fmt::Debug,
27 future::Future,
28 sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
29 time::Duration,
30};
31
32use async_stream::stream;
33pub use client::{Version, VersionBuilder};
34use futures_core::stream::Stream;
35use matrix_sdk_base::RequestedRequiredStates;
36#[cfg(feature = "e2e-encryption")]
37use matrix_sdk_common::executor::JoinHandleExt as _;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40 OwnedRoomId, RoomId,
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign,
43};
44use tokio::{
45 select,
46 sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
47};
48use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
49
50pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
51use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
52use crate::{Client, Result, config::RequestConfig};
53
/// Handle to a sliding sync instance.
///
/// The handle only holds a reference-counted pointer to
/// [`SlidingSyncInner`], so cloning it is cheap and every clone operates on
/// the same underlying state.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The shared state of this sliding sync instance.
    inner: Arc<SlidingSyncInner>,
}
62
/// The state shared by all clones of a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance. It is sent as the `conn_id` of every
    /// request and recorded on the tracing spans.
    id: String,

    /// The client used to send the requests and to process the responses.
    client: Client,

    /// Long-polling timeout sent to the server when the lists resolve to
    /// `PollTimeout::Default`. It also contributes to the HTTP request
    /// timeout.
    poll_timeout: Duration,

    /// Extra time added on top of `poll_timeout` to compute the HTTP request
    /// timeout.
    network_timeout: Duration,

    /// Key handed to the cache helpers to store and restore the state of this
    /// instance.
    storage_key: String,

    /// When `true`, the `pos` marker is reloaded from the storage before each
    /// request is generated.
    share_pos: bool,

    /// The position markers. The lock is acquired when a request is generated
    /// and the guard travels with the request until its response has been
    /// handled.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this instance, keyed by their name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// The room subscriptions, i.e. the settings to use for each explicitly
    /// subscribed room. They are copied into every request.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// The extensions configuration, copied into every request.
    extensions: http::request::Extensions,

    /// Sending half of the channel used to deliver
    /// [`SlidingSyncInternalMessage`]s to the sync loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
    /// Persists the state of this instance, together with the given position
    /// markers, by delegating to `cache::store_sliding_sync_state`.
    ///
    /// Returns whatever error the cache layer reports.
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }
129
    /// Creates a new [`SlidingSyncBuilder`] for the given instance identifier
    /// and client.
    ///
    /// # Errors
    ///
    /// Forwards the error returned by `SlidingSyncBuilder::new`.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
134
135 pub fn subscribe_to_rooms(
142 &self,
143 room_ids: &[&RoomId],
144 settings: Option<http::request::RoomSubscription>,
145 cancel_in_flight_request: bool,
146 ) {
147 if subscribe_to_rooms(
148 self.inner.room_subscriptions.write().unwrap(),
149 &self.inner.client,
150 room_ids,
151 settings,
152 cancel_in_flight_request,
153 ) {
154 self.inner.internal_channel_send_if_possible(
155 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
156 );
157 }
158 }
159
160 pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
162 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
163 let mut skip_over_current_sync_loop_iteration = false;
164
165 for room_id in room_ids {
166 if room_subscriptions.remove(*room_id).is_some() {
167 skip_over_current_sync_loop_iteration = true;
168 }
169 }
170
171 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172 self.inner.internal_channel_send_if_possible(
173 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174 );
175 }
176 }
177
178 pub fn clear_and_subscribe_to_rooms(
183 &self,
184 room_ids: &[&RoomId],
185 settings: Option<http::request::RoomSubscription>,
186 cancel_in_flight_request: bool,
187 ) {
188 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
189 room_subscriptions.clear();
190
191 if subscribe_to_rooms(
192 room_subscriptions,
193 &self.inner.client,
194 room_ids,
195 settings,
196 cancel_in_flight_request,
197 ) {
198 self.inner.internal_channel_send_if_possible(
199 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
200 );
201 }
202 }
203
204 pub async fn on_list<Function, FunctionOutput, R>(
206 &self,
207 list_name: &str,
208 function: Function,
209 ) -> Option<R>
210 where
211 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
212 FunctionOutput: Future<Output = R>,
213 {
214 let lists = self.inner.lists.read().await;
215
216 match lists.get(list_name) {
217 Some(list) => Some(function(list).await),
218 None => None,
219 }
220 }
221
222 pub async fn add_list(
228 &self,
229 list_builder: SlidingSyncListBuilder,
230 ) -> Result<Option<SlidingSyncList>> {
231 let list = list_builder.build(self.inner.internal_channel.clone());
232
233 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
234
235 self.inner.internal_channel_send_if_possible(
236 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
237 );
238
239 Ok(old_list)
240 }
241
    /// Adds a list after letting its builder reload itself from the cache.
    ///
    /// The builder is first asked to reload through `set_cached_and_reload`,
    /// using the client and storage key of this instance; the list is then
    /// added exactly as [`Self::add_list`] does.
    ///
    /// # Errors
    ///
    /// Fails if the reload from the cache fails.
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        // The timer guard lives until the end of the function, so it covers
        // both the reload and the insertion of the list.
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }
258
259 #[instrument(skip_all)]
261 async fn handle_response(
262 &self,
263 mut sliding_sync_response: http::Response,
264 position: &mut SlidingSyncPositionMarkers,
265 requested_required_states: RequestedRequiredStates,
266 ) -> Result<UpdateSummary, crate::Error> {
267 let pos = Some(sliding_sync_response.pos.clone());
268
269 let must_process_rooms_response = self.must_process_rooms_response().await;
270
271 trace!(yes = must_process_rooms_response, "Must process rooms response?");
272
273 let sync_response = {
281 let _timer = timer!("response processor");
282
283 let response_processor = {
284 let _state_store_lock = {
287 let _timer = timer!("acquiring the `state_store_lock`");
288
289 self.inner.client.base_client().state_store_lock().lock().await
290 };
291
292 let mut response_processor =
293 SlidingSyncResponseProcessor::new(self.inner.client.clone());
294
295 if self.is_thread_subscriptions_enabled() {
301 response_processor
302 .handle_thread_subscriptions(
303 position.pos.as_deref(),
304 std::mem::take(
305 &mut sliding_sync_response.extensions.thread_subscriptions,
306 ),
307 )
308 .await?;
309 }
310
311 #[cfg(feature = "e2e-encryption")]
312 if self.is_e2ee_enabled() {
313 response_processor.handle_encryption(&sliding_sync_response.extensions).await?
314 }
315
316 if must_process_rooms_response {
319 response_processor
320 .handle_room_response(&sliding_sync_response, &requested_required_states)
321 .await?;
322 }
323
324 response_processor
325 };
326
327 response_processor.process_and_take_response().await?
329 };
330
331 debug!("Sliding Sync response has been handled by the client");
332 trace!(?sync_response);
333
334 let update_summary = {
335 let updated_rooms = {
337 let mut updated_rooms = Vec::with_capacity(
338 sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
339 );
340
341 updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
342
343 updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
351
352 updated_rooms
353 };
354
355 let updated_lists = {
357 debug!(
358 lists = ?sliding_sync_response.lists,
359 "Update lists"
360 );
361
362 let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
363 let mut lists = self.inner.lists.write().await;
364
365 for (name, list) in lists.iter_mut() {
368 if let Some(updates) = sliding_sync_response.lists.get(name) {
369 let maximum_number_of_rooms: u32 =
370 updates.count.try_into().expect("failed to convert `count` to `u32`");
371
372 if list.update(Some(maximum_number_of_rooms))? {
373 updated_lists.push(name.clone());
374 }
375 } else if list.update(None)? {
376 updated_lists.push(name.clone());
377 }
378 }
379
380 for name in sliding_sync_response.lists.keys() {
382 if !lists.contains_key(name) {
383 error!("Response for list `{name}` - unknown to us; skipping");
384 }
385 }
386
387 updated_lists
388 };
389
390 UpdateSummary { lists: updated_lists, rooms: updated_rooms }
391 };
392
393 debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
397
398 position.pos = pos;
399
400 Ok(update_summary)
401 }
402
    /// Builds the next sliding sync request.
    ///
    /// Returns the request, the [`RequestConfig`] to send it with, and the
    /// owned guard of the `position` lock. The guard is returned so that the
    /// caller keeps the lock until the response has been handled.
    ///
    /// # Errors
    ///
    /// Fails if a list cannot produce its next request, if the cached state
    /// cannot be restored, or, with the `e2e-encryption` feature, if the Olm
    /// machine is missing or cannot mark the tracked users as dirty.
    ///
    /// # Panics
    ///
    /// Panics if the room subscriptions lock is poisoned.
    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // One request per list, keyed by the list name.
        let mut requests_lists = BTreeMap::new();

        // Collect the requests of all lists, and combine the timeouts they
        // ask for with `PollTimeout::min`.
        let timeout = {
            let lists = self.inner.lists.read().await;

            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        // Acquire the `position` lock as an owned guard, so it can be
        // returned to the caller.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        // The cached state is only needed to share `pos` or to get the
        // to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // When `pos` is shared, the stored value wins over the in-memory one,
        // which is realigned on it.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // Without a `pos`, and with E2EE enabled, all tracked users are
        // marked as dirty.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Translate the combined poll timeout into the value sent to the
        // server: none at all, an explicit number of seconds, or the
        // configured `poll_timeout`.
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Snapshot of the current room subscriptions.
        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        // Copy of the configured extensions.
        request.extensions = self.inner.extensions.clone();

        if to_device_enabled {
            // The to-device `since` token always comes from the restored
            // state.
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            request,
            // The HTTP timeout is derived from the configured `poll_timeout`
            // and `network_timeout`, not from the per-request `timeout`
            // computed above.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
530
    /// Sends a sliding sync request, handles its response, and persists the
    /// resulting state.
    ///
    /// `position_guard` is the guard of the `position` lock; it is moved into
    /// the task handling the response and dropped once the state has been
    /// stored.
    ///
    /// # Errors
    ///
    /// Fails if the request fails, if the response cannot be handled, if the
    /// state cannot be stored, or, with the `e2e-encryption` feature, if the
    /// task uploading the outgoing E2EE requests cannot be joined.
    ///
    /// # Panics
    ///
    /// Panics if the task handling the response cannot be joined.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Computed from the request before it is consumed by `send`.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                let client = self.inner.client.clone();
                // The outgoing E2EE requests are sent from their own task; a
                // failure there is logged, not propagated.
                // NOTE(review): `abort_on_drop` presumably aborts the task if
                // the handle is dropped, e.g. when the sync request below
                // fails — confirm against `JoinHandleExt`.
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                .abort_on_drop();

                let response = request.await?;

                // Wait for the uploads to be done before handling the
                // response.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // The future below is `move`: it needs its own handle.
        let this = self.clone();

        let future = async move {
            debug!("Start handling response");

            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the `position` lock only after the state is stored.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        // The response is handled in a dedicated task.
        // NOTE(review): presumably so that handling runs to completion even
        // if the caller's future is dropped — confirm.
        spawn(future.instrument(Span::current())).await.unwrap()
    }
646
647 #[cfg(feature = "e2e-encryption")]
649 fn is_e2ee_enabled(&self) -> bool {
650 self.inner.extensions.e2ee.enabled == Some(true)
651 }
652
653 fn is_thread_subscriptions_enabled(&self) -> bool {
656 self.inner.extensions.thread_subscriptions.enabled == Some(true)
657 }
658
    /// Without the `e2e-encryption` feature, the E2EE extension is never
    /// considered enabled.
    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }
663
664 async fn must_process_rooms_response(&self) -> bool {
666 !self.inner.room_subscriptions.read().unwrap().is_empty()
669 || !self.inner.lists.read().await.is_empty()
670 }
671
672 #[doc(hidden)]
676 #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
677 pub async fn sync_once(&self) -> Result<UpdateSummary> {
678 let (request, request_config, position_guard) = self.generate_sync_request().await?;
679
680 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
682
683 self.inner.client.inner.sync_beat.notify(usize::MAX);
685
686 Ok(summaries)
687 }
688
    /// Creates the stream driving the sync loop.
    ///
    /// Each iteration races an internal message against one call to
    /// [`Self::sync_once`]; every successful call yields its
    /// [`UpdateSummary`]. The stream ends:
    ///
    /// * when a `SyncLoopStop` message is received, or the internal channel
    ///   reports an error,
    /// * after the first error returned by `sync_once`, which is yielded
    ///   right before the stream ends.
    // NOTE(review): the `allow` is presumably needed because of the code
    // generated by `#[instrument]` — confirm.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is first polled.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Branches are polled in order: internal messages take
                    // precedence over the sync itself.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                // The `sync_once` future of this iteration is
                                // dropped; the next iteration starts anew.
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            Err(error) => {
                                // An unknown `pos` means the session has
                                // expired: reset the state before reporting.
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Any error terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
753
754 pub fn stop_sync(&self) -> Result<()> {
763 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
764 }
765
766 #[doc(hidden)]
777 pub async fn expire_session(&self) {
778 info!("Session expired; resetting `pos`");
779
780 {
781 let lists = self.inner.lists.read().await;
782
783 for list in lists.values() {
784 list.set_maximum_number_of_rooms(None);
786 }
787 }
788
789 {
791 let mut position = self.inner.position.lock().await;
792
793 position.pos = None;
795
796 if let Err(err) = self.cache_to_storage(&position).await {
800 warn!("Failed to invalidate cached sliding sync state: {err}");
801 }
802 }
803
804 {
805 self.inner.room_subscriptions.write().unwrap().clear();
808 }
809 }
810}
811
812fn subscribe_to_rooms(
815 mut room_subscriptions: StdRwLockWriteGuard<
816 '_,
817 BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
818 >,
819 client: &Client,
820 room_ids: &[&RoomId],
821 settings: Option<http::request::RoomSubscription>,
822 cancel_in_flight_request: bool,
823) -> bool {
824 let settings = settings.unwrap_or_default();
825 let mut skip_over_current_sync_loop_iteration = false;
826
827 for room_id in room_ids {
828 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
829 if let Some(room) = client.get_room(room_id) {
830 room.mark_members_missing();
831 }
832
833 entry.insert(settings.clone());
834
835 skip_over_current_sync_loop_iteration = true;
836 }
837 }
838
839 cancel_in_flight_request && skip_over_current_sync_loop_iteration
840}
841
842impl SlidingSyncInner {
843 #[instrument]
845 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
846 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
847 }
848
849 #[instrument]
852 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
853 let _ = self.internal_channel.send(message);
855 }
856}
857
/// Messages sent over the internal channel to the sync loop.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instructs the sync loop to stop.
    SyncLoopStop,

    /// Instructs the sync loop to abandon its current iteration and to start
    /// a new one.
    SyncLoopSkipOverCurrentIteration,
}
867
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrites the `pos` marker with `new_pos`. Only available for tests.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }
}
876
/// The position markers of a sliding sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` sent with the next request. It is replaced by the `pos` of
    /// each handled response, and is `None` when there is none yet or after
    /// the session has expired.
    pos: Option<String>,
}
883
/// The summary of the updates brought by a handled response.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have been updated.
    pub lists: Vec<String>,
    /// The identifiers of the rooms that have been updated.
    pub rooms: Vec<OwnedRoomId>,
}
893
/// The long-polling timeout a request should use.
#[derive(Debug)]
pub enum PollTimeout {
    /// No timeout at all.
    None,

    /// An explicit timeout, in seconds.
    Some(u32),

    /// Whatever the default timeout is.
    Default,
}

impl PollTimeout {
    /// Computes the smallest of two `PollTimeout`s.
    ///
    /// The ordering is `None < Some(_) < Default`, and `Some(x) < Some(y)` if
    /// and only if `x < y`.
    fn min(self, left: Self) -> Self {
        match (self, left) {
            // `None` is the smallest value, whichever side it is on.
            (Self::None, _) | (_, Self::None) => Self::None,

            // Two explicit timeouts: keep the shortest.
            (Self::Some(right), Self::Some(left)) => Self::Some(right.min(left)),

            // An explicit timeout is smaller than `Default`.
            (Self::Some(timeout), Self::Default) | (Self::Default, Self::Some(timeout)) => {
                Self::Some(timeout)
            }

            (Self::Default, Self::Default) => Self::Default,
        }
    }
}
941
942#[cfg(all(test, not(target_family = "wasm")))]
943#[allow(clippy::dbg_macro)]
944mod tests {
945 use std::{
946 collections::BTreeMap,
947 future::ready,
948 ops::Not,
949 sync::{Arc, Mutex},
950 time::Duration,
951 };
952
953 use assert_matches::assert_matches;
954 use event_listener::Listener;
955 use futures_util::{StreamExt, future::join_all, pin_mut};
956 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
957 use matrix_sdk_common::executor::spawn;
958 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
959 use ruma::{
960 OwnedRoomId, assign,
961 events::{direct::DirectEvent, room::member::MembershipState},
962 owned_room_id, room_id,
963 serde::Raw,
964 uint,
965 };
966 use serde::Deserialize;
967 use serde_json::json;
968 use wiremock::{
969 Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
970 };
971
972 use super::{
973 SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
974 cache::restore_sliding_sync_state, http,
975 };
976 use crate::{
977 Client, Result,
978 test_utils::{logged_in_client, mocks::MatrixMockServer},
979 };
980
981 #[derive(Copy, Clone)]
982 struct SlidingSyncMatcher;
983
984 impl Match for SlidingSyncMatcher {
985 fn matches(&self, request: &Request) -> bool {
986 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
987 && request.method == Method::POST
988 }
989 }
990
991 async fn new_sliding_sync(
992 lists: Vec<SlidingSyncListBuilder>,
993 ) -> Result<(MockServer, SlidingSync)> {
994 let server = MockServer::start().await;
995 let client = logged_in_client(Some(server.uri())).await;
996
997 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
998
999 for list in lists {
1000 sliding_sync_builder = sliding_sync_builder.add_list(list);
1001 }
1002
1003 let sliding_sync = sliding_sync_builder.build().await?;
1004
1005 Ok((server, sliding_sync))
1006 }
1007
1008 #[async_test]
1009 async fn test_subscribe_to_rooms() -> Result<()> {
1010 let (server, sliding_sync) = new_sliding_sync(vec![
1011 SlidingSyncList::builder("foo")
1012 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1013 ])
1014 .await?;
1015
1016 let stream = sliding_sync.sync();
1017 pin_mut!(stream);
1018
1019 let room_id_0 = room_id!("!r0:bar.org");
1020 let room_id_1 = room_id!("!r1:bar.org");
1021 let room_id_2 = room_id!("!r2:bar.org");
1022
1023 {
1024 let _mock_guard = Mock::given(SlidingSyncMatcher)
1025 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1026 "pos": "1",
1027 "lists": {},
1028 "rooms": {
1029 room_id_0: {
1030 "name": "Room #0",
1031 "initial": true,
1032 },
1033 room_id_1: {
1034 "name": "Room #1",
1035 "initial": true,
1036 },
1037 room_id_2: {
1038 "name": "Room #2",
1039 "initial": true,
1040 },
1041 }
1042 })))
1043 .mount_as_scoped(&server)
1044 .await;
1045
1046 let _ = stream.next().await.unwrap()?;
1047 }
1048
1049 let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1050
1051 assert!(room0.are_members_synced().not());
1055
1056 {
1057 struct MemberMatcher(OwnedRoomId);
1058
1059 impl Match for MemberMatcher {
1060 fn matches(&self, request: &Request) -> bool {
1061 request.url.path()
1062 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1063 && request.method == Method::GET
1064 }
1065 }
1066
1067 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1068 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1069 "chunk": [],
1070 })))
1071 .mount_as_scoped(&server)
1072 .await;
1073
1074 assert_matches!(room0.request_members().await, Ok(()));
1075 }
1076
1077 assert!(room0.are_members_synced());
1079
1080 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1081
1082 assert!(room0.are_members_synced().not());
1085
1086 {
1087 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1088
1089 assert!(room_subscriptions.contains_key(room_id_0));
1090 assert!(room_subscriptions.contains_key(room_id_1));
1091 assert!(!room_subscriptions.contains_key(room_id_2));
1092 }
1093
1094 {
1097 struct MemberMatcher(OwnedRoomId);
1098
1099 impl Match for MemberMatcher {
1100 fn matches(&self, request: &Request) -> bool {
1101 request.url.path()
1102 == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1103 && request.method == Method::GET
1104 }
1105 }
1106
1107 let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1108 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1109 "chunk": [],
1110 })))
1111 .mount_as_scoped(&server)
1112 .await;
1113
1114 assert_matches!(room0.request_members().await, Ok(()));
1115 }
1116
1117 assert!(room0.are_members_synced());
1119
1120 sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1121
1122 assert!(room0.are_members_synced());
1125
1126 Ok(())
1127 }
1128
1129 #[async_test]
1130 async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
1131 let (_server, sliding_sync) = new_sliding_sync(vec![
1132 SlidingSyncList::builder("foo")
1133 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1134 ])
1135 .await?;
1136
1137 let room_id_0 = room_id!("!r0:bar.org");
1138 let room_id_1 = room_id!("!r1:bar.org");
1139 let room_id_2 = room_id!("!r2:bar.org");
1140 let room_id_3 = room_id!("!r3:bar.org");
1141
1142 {
1144 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1145
1146 assert!(room_subscriptions.is_empty());
1147 }
1148
1149 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);
1151
1152 {
1153 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1154
1155 assert_eq!(room_subscriptions.len(), 2);
1156 assert!(room_subscriptions.contains_key(room_id_0));
1157 assert!(room_subscriptions.contains_key(room_id_1));
1158 }
1159
1160 sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);
1162
1163 {
1164 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1165
1166 assert_eq!(room_subscriptions.len(), 1);
1167 assert!(room_subscriptions.contains_key(room_id_1));
1168 }
1169
1170 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);
1172
1173 {
1174 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1175
1176 assert_eq!(room_subscriptions.len(), 2);
1177 assert!(room_subscriptions.contains_key(room_id_0));
1178 assert!(room_subscriptions.contains_key(room_id_1));
1179 }
1180
1181 sliding_sync.clear_and_subscribe_to_rooms(
1183 &[room_id_2, room_id_3],
1184 Default::default(),
1185 false,
1186 );
1187
1188 {
1189 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1190
1191 assert_eq!(room_subscriptions.len(), 2);
1192 assert!(room_subscriptions.contains_key(room_id_2));
1193 assert!(room_subscriptions.contains_key(room_id_3));
1194 }
1195
1196 Ok(())
1197 }
1198
1199 #[async_test]
1200 async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1201 let (_server, sliding_sync) = new_sliding_sync(vec![
1202 SlidingSyncList::builder("foo")
1203 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1204 ])
1205 .await?;
1206
1207 let room_id_0 = room_id!("!r0:bar.org");
1208 let room_id_1 = room_id!("!r1:bar.org");
1209 let room_id_2 = room_id!("!r2:bar.org");
1210
1211 sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1213
1214 {
1215 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1216
1217 assert!(room_subscriptions.contains_key(room_id_0));
1218 assert!(room_subscriptions.contains_key(room_id_1));
1219 assert!(room_subscriptions.contains_key(room_id_2).not());
1220 }
1221
1222 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1224
1225 {
1226 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1227
1228 assert!(room_subscriptions.contains_key(room_id_0));
1229 assert!(room_subscriptions.contains_key(room_id_1));
1230 assert!(room_subscriptions.contains_key(room_id_2));
1231 }
1232
1233 sliding_sync.expire_session().await;
1235
1236 {
1237 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1238
1239 assert!(room_subscriptions.is_empty());
1240 }
1241
1242 sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1244
1245 {
1246 let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();
1247
1248 assert!(room_subscriptions.contains_key(room_id_0).not());
1249 assert!(room_subscriptions.contains_key(room_id_1).not());
1250 assert!(room_subscriptions.contains_key(room_id_2));
1251 }
1252
1253 Ok(())
1254 }
1255
1256 #[async_test]
1257 async fn test_add_list() -> Result<()> {
1258 let (_server, sliding_sync) = new_sliding_sync(vec![
1259 SlidingSyncList::builder("foo")
1260 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1261 ])
1262 .await?;
1263
1264 let _stream = sliding_sync.sync();
1265 pin_mut!(_stream);
1266
1267 sliding_sync
1268 .add_list(
1269 SlidingSyncList::builder("bar")
1270 .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1271 )
1272 .await?;
1273
1274 let lists = sliding_sync.inner.lists.read().await;
1275
1276 assert!(lists.contains_key("foo"));
1277 assert!(lists.contains_key("bar"));
1278
1279 Ok(())
1282 }
1283
1284 #[cfg(feature = "e2e-encryption")]
1285 #[async_test]
1286 async fn test_extensions_to_device_since_is_set() {
1287 use matrix_sdk_base::crypto::store::types::Changes;
1288
1289 let client = logged_in_client(None).await;
1290 let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
1291 .unwrap()
1292 .with_to_device_extension(assign!(
1293 http::request::ToDevice::default(),
1294 {
1295 enabled: Some(true),
1296 }
1297 ))
1298 .build()
1299 .await
1300 .unwrap();
1301
1302 {
1304 let to_device = &sliding_sync.inner.extensions.to_device;
1305
1306 assert_eq!(to_device.enabled, Some(true));
1307 assert!(to_device.since.is_none());
1308 }
1309
1310 {
1312 let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();
1313
1314 let to_device = &request.extensions.to_device;
1315
1316 assert_eq!(to_device.enabled, Some(true));
1317 assert!(to_device.since.is_none());
1318 }
1319
1320 let since_token = "depuis".to_owned();
1322
1323 {
1324 if let Some(olm_machine) = &*client.olm_machine().await {
1325 olm_machine
1326 .store()
1327 .save_changes(Changes {
1328 next_batch_token: Some(since_token.clone()),
1329 ..Default::default()
1330 })
1331 .await
1332 .unwrap();
1333 } else {
1334 panic!("Where is the Olm machine?");
1335 }
1336 }
1337
1338 {
1340 let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();
1341
1342 let to_device = &request.extensions.to_device;
1343
1344 assert_eq!(to_device.enabled, Some(true));
1345 assert_eq!(to_device.since, Some(since_token));
1346 }
1347 }
1348
1349 #[async_test]
1355 #[cfg(feature = "e2e-encryption")]
1356 async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1357 use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1358 use matrix_sdk_test::ruma_response_from_json;
1359 use ruma::user_id;
1360
1361 let server = MockServer::start().await;
1362 let client = logged_in_client(Some(server.uri())).await;
1363
1364 let alice = user_id!("@alice:localhost");
1365 let bob = user_id!("@bob:localhost");
1366 let me = user_id!("@example:localhost");
1367
1368 {
1371 let olm_machine = client.olm_machine().await;
1372 let olm_machine = olm_machine.as_ref().unwrap();
1373
1374 olm_machine.update_tracked_users([alice, bob]).await?;
1375
1376 let outgoing_requests = olm_machine.outgoing_requests().await?;
1378
1379 assert_eq!(outgoing_requests.len(), 2);
1380 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1381 assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1382
1383 olm_machine
1385 .mark_request_as_sent(
1386 outgoing_requests[0].request_id(),
1387 AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1388 "one_time_key_counts": {}
1389 }))),
1390 )
1391 .await?;
1392
1393 olm_machine
1394 .mark_request_as_sent(
1395 outgoing_requests[1].request_id(),
1396 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1397 "device_keys": {
1398 alice: {},
1399 bob: {},
1400 }
1401 }))),
1402 )
1403 .await?;
1404
1405 let outgoing_requests = olm_machine.outgoing_requests().await?;
1407
1408 assert_eq!(outgoing_requests.len(), 1);
1409 assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1410
1411 olm_machine
1412 .mark_request_as_sent(
1413 outgoing_requests[0].request_id(),
1414 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1415 "device_keys": {
1416 me: {},
1417 }
1418 }))),
1419 )
1420 .await?;
1421
1422 let outgoing_requests = olm_machine.outgoing_requests().await?;
1424
1425 assert!(outgoing_requests.is_empty());
1426 }
1427
1428 let sync = client
1429 .sliding_sync("test-slidingsync")?
1430 .add_list(SlidingSyncList::builder("new_list"))
1431 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1432 .build()
1433 .await?;
1434
1435 let (_request, _, _) = sync.generate_sync_request().await?;
1437
1438 {
1440 let olm_machine = client.olm_machine().await;
1441 let olm_machine = olm_machine.as_ref().unwrap();
1442
1443 let outgoing_requests = olm_machine.outgoing_requests().await?;
1445
1446 assert_eq!(outgoing_requests.len(), 1);
1447 assert_matches!(
1448 outgoing_requests[0].request(),
1449 AnyOutgoingRequest::KeysQuery(request) => {
1450 assert!(request.device_keys.contains_key(alice));
1451 assert!(request.device_keys.contains_key(bob));
1452 assert!(request.device_keys.contains_key(me));
1453 }
1454 );
1455
1456 olm_machine
1458 .mark_request_as_sent(
1459 outgoing_requests[0].request_id(),
1460 AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1461 "device_keys": {
1462 alice: {},
1463 bob: {},
1464 me: {},
1465 }
1466 }))),
1467 )
1468 .await?;
1469 }
1470
1471 sync.set_pos("chocolat".to_owned()).await;
1473
1474 let (_request, _, _) = sync.generate_sync_request().await?;
1475
1476 {
1478 let olm_machine = client.olm_machine().await;
1479 let olm_machine = olm_machine.as_ref().unwrap();
1480
1481 let outgoing_requests = olm_machine.outgoing_requests().await?;
1483
1484 assert!(outgoing_requests.is_empty());
1485 }
1486
1487 Ok(())
1488 }
1489
/// Without `share_pos()`, the `pos` held in memory by a sliding sync instance
/// must not be replaced by what another instance with the same id writes to
/// storage, and a freshly built instance must start without any `pos`.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    /// The only part of the request body the mock responder reads.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mock echoes the request's `txn_id` and hands out "0", "1", "2", … as
    // `pos`, one per request.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let body: PartialRequest = request.body_json().unwrap();

            let current = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": body.txn_id,
                "pos": current.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Before any sync there is no `pos`, neither in memory nor in the request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, ..) = sliding_sync.generate_sync_request().await?;
        assert!(request.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // One iteration of the sync loop must produce an update summary…
    assert_matches!(sync.next().await, Some(Ok(_update_summary)));

    // … after which the in-memory `pos` is the first one served by the mock.
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // The state restored from storage reports the same `pos`.
    let restored = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");
    assert_eq!(restored.pos.as_deref(), Some("0"));

    // A second instance with the same id writes a different `pos` to storage.
    {
        let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some("yolo".to_owned());

        other_sync.cache_to_storage(&other_position).await?;
    }

    // The first instance keeps using its own in-memory `pos`.
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let (request, ..) = sliding_sync.generate_sync_request().await?;
        assert_eq!(request.pos.as_deref(), Some("0"));
    }

    // A freshly built instance starts without any `pos`.
    {
        let fresh_sync = client.sliding_sync("forgetful-sync")?.build().await?;
        assert!(fresh_sync.inner.position.lock().await.pos.is_none());
    }

    Ok(())
}
1578
/// With `share_pos()`, the `pos` is shared through storage: a value written
/// by another instance is picked up when generating the next request, a newly
/// built instance restores it, and expiring the session clears it everywhere.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    /// The only part of the request body the mock responder reads.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // The mock echoes the request's `txn_id` and hands out "0", "1", "2", … as
    // `pos`, one per request.
    let next_pos = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |request: &Request| {
            let body: PartialRequest = request.body_json().unwrap();

            let current = {
                let mut counter = next_pos.lock().unwrap();
                let current = *counter;
                *counter += 1;
                current
            };

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": body.txn_id,
                "pos": current.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;
    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Before any sync there is no `pos`, neither in the request nor in memory.
    {
        let (request, ..) = sliding_sync.generate_sync_request().await?;

        assert!(request.pos.is_none());
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    let sync = sliding_sync.sync();
    pin_mut!(sync);

    // One iteration of the sync loop must produce an update summary…
    assert_matches!(sync.next().await, Some(Ok(_update_summary)));

    // … after which the in-memory `pos` is the first one served by the mock.
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // The state restored from storage reports the same `pos`.
    let restored = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");
    assert_eq!(restored.pos.as_deref(), Some("0"));

    // Another instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some("42".to_owned());

        other_sync.cache_to_storage(&other_position).await?;
    }

    // Generating a request on the first instance now yields the overwritten
    // `pos`, and its in-memory value matches.
    {
        let (request, ..) = sliding_sync.generate_sync_request().await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A newly built sharing instance starts from the stored `pos`.
    {
        let rebuilt_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(rebuilt_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (request, ..) = rebuilt_sync.generate_sync_request().await?;
        assert_eq!(request.pos.as_deref(), Some("42"));
    }

    sliding_sync.expire_session().await;

    // After the session has expired, the `pos` is gone for this instance…
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request, ..) = sliding_sync.generate_sync_request().await?;
        assert!(request.pos.is_none());
    }

    // … and also for a newly built sharing instance.
    {
        let rebuilt_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert!(rebuilt_sync.inner.position.lock().await.pos.is_none());

        let (request, ..) = rebuilt_sync.generate_sync_request().await?;
        assert!(request.pos.is_none());
    }

    Ok(())
}
1687
1688 #[async_test]
1689 async fn test_stop_sync_loop() -> Result<()> {
1690 let (_server, sliding_sync) = new_sliding_sync(vec![
1691 SlidingSyncList::builder("foo")
1692 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1693 ])
1694 .await?;
1695
1696 let stream = sliding_sync.sync();
1698 pin_mut!(stream);
1699
1700 assert!(stream.next().await.is_some());
1702
1703 sliding_sync.stop_sync()?;
1705
1706 assert!(stream.next().await.is_none());
1708
1709 let stream = sliding_sync.sync();
1711 pin_mut!(stream);
1712
1713 assert!(stream.next().await.is_some());
1715
1716 Ok(())
1717 }
1718
1719 #[async_test]
1720 async fn test_process_read_receipts() -> Result<()> {
1721 let room = owned_room_id!("!pony:example.org");
1722
1723 let server = MockServer::start().await;
1724 let client = logged_in_client(Some(server.uri())).await;
1725 client.event_cache().subscribe().unwrap();
1726
1727 let sliding_sync = client
1728 .sliding_sync("test")?
1729 .with_receipt_extension(
1730 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
1731 )
1732 .add_list(
1733 SlidingSyncList::builder("all")
1734 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1735 )
1736 .build()
1737 .await?;
1738
1739 {
1741 let server_response = assign!(http::Response::new("0".to_owned()), {
1742 rooms: BTreeMap::from([(
1743 room.clone(),
1744 http::response::Room::default(),
1745 )])
1746 });
1747
1748 let _summary = {
1749 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1750 sliding_sync
1751 .handle_response(
1752 server_response.clone(),
1753 &mut pos_guard,
1754 RequestedRequiredStates::default(),
1755 )
1756 .await?
1757 };
1758 }
1759
1760 let server_response = assign!(http::Response::new("1".to_owned()), {
1761 extensions: assign!(http::response::Extensions::default(), {
1762 receipts: assign!(http::response::Receipts::default(), {
1763 rooms: BTreeMap::from([
1764 (
1765 room.clone(),
1766 Raw::from_json_string(
1767 json!({
1768 "room_id": room,
1769 "type": "m.receipt",
1770 "content": {
1771 "$event:bar.org": {
1772 "m.read": {
1773 client.user_id().unwrap(): {
1774 "ts": 1436451550,
1775 }
1776 }
1777 }
1778 }
1779 })
1780 .to_string(),
1781 ).unwrap()
1782 )
1783 ])
1784 })
1785 })
1786 });
1787
1788 let summary = {
1789 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1790 sliding_sync
1791 .handle_response(
1792 server_response.clone(),
1793 &mut pos_guard,
1794 RequestedRequiredStates::default(),
1795 )
1796 .await?
1797 };
1798
1799 assert!(summary.rooms.contains(&room));
1800
1801 Ok(())
1802 }
1803
1804 #[async_test]
1805 async fn test_process_marked_unread_room_account_data() -> Result<()> {
1806 let room_id = owned_room_id!("!unicorn:example.org");
1807
1808 let server = MockServer::start().await;
1809 let client = logged_in_client(Some(server.uri())).await;
1810
1811 let sliding_sync = client
1814 .sliding_sync("test")?
1815 .with_account_data_extension(
1816 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1817 )
1818 .add_list(
1819 SlidingSyncList::builder("all")
1820 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1821 )
1822 .build()
1823 .await?;
1824
1825 {
1827 let server_response = assign!(http::Response::new("0".to_owned()), {
1828 rooms: BTreeMap::from([(
1829 room_id.clone(),
1830 http::response::Room::default(),
1831 )])
1832 });
1833
1834 let _summary = {
1835 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1836 sliding_sync
1837 .handle_response(
1838 server_response.clone(),
1839 &mut pos_guard,
1840 RequestedRequiredStates::default(),
1841 )
1842 .await?
1843 };
1844 }
1845
1846 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
1850
1851 let update_summary = {
1852 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1853 sliding_sync
1854 .handle_response(
1855 server_response.clone(),
1856 &mut pos_guard,
1857 RequestedRequiredStates::default(),
1858 )
1859 .await?
1860 };
1861
1862 assert!(update_summary.rooms.contains(&room_id));
1865
1866 let room = client.get_room(&room_id).unwrap();
1867
1868 assert!(room.is_marked_unread());
1871
1872 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
1875
1876 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1877 sliding_sync
1878 .handle_response(
1879 server_response.clone(),
1880 &mut pos_guard,
1881 RequestedRequiredStates::default(),
1882 )
1883 .await?;
1884
1885 let room = client.get_room(&room_id).unwrap();
1886
1887 assert!(!room.is_marked_unread());
1888
1889 Ok(())
1890 }
1891
1892 fn make_mark_unread_response(
1893 response_number: &str,
1894 room_id: OwnedRoomId,
1895 unread: bool,
1896 add_rooms_section: bool,
1897 ) -> http::Response {
1898 let rooms = if add_rooms_section {
1899 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
1900 } else {
1901 BTreeMap::new()
1902 };
1903
1904 let extensions = assign!(http::response::Extensions::default(), {
1905 account_data: assign!(http::response::AccountData::default(), {
1906 rooms: BTreeMap::from([
1907 (
1908 room_id,
1909 vec![
1910 Raw::from_json_string(
1911 json!({
1912 "content": {
1913 "unread": unread
1914 },
1915 "type": "m.marked_unread"
1916 })
1917 .to_string(),
1918 ).unwrap()
1919 ]
1920 )
1921 ])
1922 })
1923 });
1924
1925 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
1926 }
1927
1928 #[async_test]
1929 async fn test_process_rooms_account_data() -> Result<()> {
1930 let room = owned_room_id!("!pony:example.org");
1931
1932 let server = MockServer::start().await;
1933 let client = logged_in_client(Some(server.uri())).await;
1934
1935 let sliding_sync = client
1936 .sliding_sync("test")?
1937 .with_account_data_extension(
1938 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1939 )
1940 .add_list(
1941 SlidingSyncList::builder("all")
1942 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1943 )
1944 .build()
1945 .await?;
1946
1947 {
1949 let server_response = assign!(http::Response::new("0".to_owned()), {
1950 rooms: BTreeMap::from([(
1951 room.clone(),
1952 http::response::Room::default(),
1953 )])
1954 });
1955
1956 let _summary = {
1957 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1958 sliding_sync
1959 .handle_response(
1960 server_response.clone(),
1961 &mut pos_guard,
1962 RequestedRequiredStates::default(),
1963 )
1964 .await?
1965 };
1966 }
1967
1968 let server_response = assign!(http::Response::new("1".to_owned()), {
1969 extensions: assign!(http::response::Extensions::default(), {
1970 account_data: assign!(http::response::AccountData::default(), {
1971 rooms: BTreeMap::from([
1972 (
1973 room.clone(),
1974 vec![
1975 Raw::from_json_string(
1976 json!({
1977 "content": {
1978 "tags": {
1979 "u.work": {
1980 "order": 0.9
1981 }
1982 }
1983 },
1984 "type": "m.tag"
1985 })
1986 .to_string(),
1987 ).unwrap()
1988 ]
1989 )
1990 ])
1991 })
1992 })
1993 });
1994 let summary = {
1995 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1996 sliding_sync
1997 .handle_response(
1998 server_response.clone(),
1999 &mut pos_guard,
2000 RequestedRequiredStates::default(),
2001 )
2002 .await?
2003 };
2004
2005 assert!(summary.rooms.contains(&room));
2006
2007 Ok(())
2008 }
2009
/// Feeds the same response (one room plus e2ee and to-device extension data)
/// to three differently configured sliding sync instances, each on a fresh
/// client, and checks which parts of the response each of them applies:
///
/// 1. extensions only, no list: encryption data is applied, the room is not;
/// 2. list only, no extensions: the room is applied, encryption data is not;
/// 3. list and extensions: both are applied.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let room_payload = assign!(http::response::Room::default(), {
        name: Some("Croissants lovers".to_owned()),
        timeline: Vec::new(),
    });
    let e2ee_payload = assign!(http::response::E2EE::default(), {
        device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
    });
    let to_device_payload = assign!(http::response::ToDevice::default(), {
        next_batch: "to-device-token".to_owned(),
    });
    let server_response = assign!(http::Response::new("0".to_owned()), {
        rooms: BTreeMap::from([(room.clone(), room_payload)]),
        extensions: assign!(http::response::Extensions::default(), {
            e2ee: e2ee_payload,
            to_device: Some(to_device_payload),
        })
    });

    // Scenario 1: only the to-device and e2ee extensions are enabled, and no
    // list is configured.
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The one-time key count and the to-device token were recorded…
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let token = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert_eq!(token.as_deref(), Some("to-device-token"));
    }

    // … but the room was not created.
    assert!(client.get_room(&room).is_none());

    // Scenario 2: a fresh client, with a list but without the extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // No encryption data was recorded…
    assert_eq!(client.encryption().uploaded_key_count().await?, 0);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let token = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert_eq!(token.as_deref(), None);
    }

    // … but the room now exists.
    assert!(client.get_room(&room).is_some());

    // Scenario 3: a fresh client, with both a list and the extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // Both the encryption data and the room were applied.
    assert_eq!(client.encryption().uploaded_key_count().await?, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        let token = olm_machine.as_ref().unwrap().store().next_batch_token().await?;
        assert_eq!(token.as_deref(), Some("to-device-token"));
    }

    assert!(client.get_room(&room).is_some());

    Ok(())
}
2157
2158 #[async_test]
2159 async fn test_lock_multiple_requests() -> Result<()> {
2160 let server = MockServer::start().await;
2161 let client = logged_in_client(Some(server.uri())).await;
2162
2163 let pos = Arc::new(Mutex::new(0));
2164 let _mock_guard = Mock::given(SlidingSyncMatcher)
2165 .respond_with(move |_: &Request| {
2166 let mut pos = pos.lock().unwrap();
2167 *pos += 1;
2168 ResponseTemplate::new(200).set_body_json(json!({
2169 "pos": pos.to_string(),
2170 "lists": {},
2171 "rooms": {}
2172 }))
2173 })
2174 .mount_as_scoped(&server)
2175 .await;
2176
2177 let sliding_sync = client
2178 .sliding_sync("test")?
2179 .with_to_device_extension(
2180 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2181 )
2182 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2183 .build()
2184 .await?;
2185
2186 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2189
2190 for result in requests.await {
2191 result?;
2192 }
2193
2194 Ok(())
2195 }
2196
2197 #[async_test]
2198 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2199 let server = MockServer::start().await;
2200 let client = logged_in_client(Some(server.uri())).await;
2201
2202 let pos = Arc::new(Mutex::new(0));
2203 let _mock_guard = Mock::given(SlidingSyncMatcher)
2204 .respond_with(move |_: &Request| {
2205 let mut pos = pos.lock().unwrap();
2206 *pos += 1;
2207 ResponseTemplate::new(200)
2209 .set_body_json(json!({
2210 "pos": pos.to_string(),
2211 "lists": {},
2212 "rooms": {}
2213 }))
2214 .set_delay(Duration::from_secs(2))
2215 })
2216 .mount_as_scoped(&server)
2217 .await;
2218
2219 let sliding_sync =
2220 client
2221 .sliding_sync("test")?
2222 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2223 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2224 ))
2225 .add_list(
2226 SlidingSyncList::builder("another-list")
2227 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2228 )
2229 .build()
2230 .await?;
2231
2232 let stream = sliding_sync.sync();
2233 pin_mut!(stream);
2234
2235 let cloned_sync = sliding_sync.clone();
2236 spawn(async move {
2237 tokio::time::sleep(Duration::from_millis(100)).await;
2238
2239 cloned_sync
2240 .on_list("another-list", |list| {
2241 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2242 ready(())
2243 })
2244 .await;
2245 });
2246
2247 assert_matches!(stream.next().await, Some(Ok(_)));
2248
2249 sliding_sync.stop_sync().unwrap();
2250
2251 assert_matches!(stream.next().await, None);
2252
2253 let mut num_requests = 0;
2254
2255 for request in server.received_requests().await.unwrap() {
2256 if !SlidingSyncMatcher.matches(&request) {
2257 continue;
2258 }
2259
2260 let another_list_ranges = if num_requests == 0 {
2261 json!([[0, 10]])
2263 } else {
2264 json!([[10, 20]])
2266 };
2267
2268 num_requests += 1;
2269 assert!(num_requests <= 2, "more than one request hit the server");
2270
2271 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2272
2273 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2274 &json_value,
2275 &json!({
2276 "conn_id": "test",
2277 "lists": {
2278 "room-list": {
2279 "ranges": [[0, 9]],
2280 "required_state": [
2281 ["m.room.encryption", ""],
2282 ["m.room.tombstone", ""]
2283 ],
2284 },
2285 "another-list": {
2286 "ranges": another_list_ranges,
2287 "required_state": [
2288 ["m.room.encryption", ""],
2289 ["m.room.tombstone", ""]
2290 ],
2291 },
2292 }
2293 }),
2294 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2295 ) {
2296 dbg!(json_value);
2297 panic!("json differ: {err}");
2298 }
2299 }
2300
2301 assert_eq!(num_requests, 2);
2302
2303 Ok(())
2304 }
2305
2306 #[async_test]
2307 async fn test_timeout_zero_list() -> Result<()> {
2308 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2309
2310 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2311
2312 assert!(request.timeout.is_some());
2315
2316 Ok(())
2317 }
2318
2319 #[async_test]
2320 async fn test_timeout_one_list() -> Result<()> {
2321 let (_server, sliding_sync) = new_sliding_sync(vec![
2322 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2323 ])
2324 .await?;
2325
2326 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2327
2328 assert!(request.timeout.is_none());
2330
2331 {
2333 let server_response = assign!(http::Response::new("0".to_owned()), {
2334 lists: BTreeMap::from([(
2335 "foo".to_owned(),
2336 assign!(http::response::List::default(), {
2337 count: uint!(7),
2338 })
2339 )])
2340 });
2341
2342 let _summary = {
2343 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2344 sliding_sync
2345 .handle_response(
2346 server_response.clone(),
2347 &mut pos_guard,
2348 RequestedRequiredStates::default(),
2349 )
2350 .await?
2351 };
2352 }
2353
2354 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2355
2356 assert!(request.timeout.is_some());
2358
2359 Ok(())
2360 }
2361
2362 #[async_test]
2363 async fn test_timeout_three_lists() -> Result<()> {
2364 let (_server, sliding_sync) = new_sliding_sync(vec![
2365 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2366 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2367 SlidingSyncList::builder("baz")
2368 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2369 ])
2370 .await?;
2371
2372 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2373
2374 assert!(request.timeout.is_none());
2376
2377 {
2379 let server_response = assign!(http::Response::new("0".to_owned()), {
2380 lists: BTreeMap::from([(
2381 "foo".to_owned(),
2382 assign!(http::response::List::default(), {
2383 count: uint!(7),
2384 })
2385 )])
2386 });
2387
2388 let _summary = {
2389 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2390 sliding_sync
2391 .handle_response(
2392 server_response.clone(),
2393 &mut pos_guard,
2394 RequestedRequiredStates::default(),
2395 )
2396 .await?
2397 };
2398 }
2399
2400 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2401
2402 assert!(request.timeout.is_none());
2404
2405 {
2407 let server_response = assign!(http::Response::new("1".to_owned()), {
2408 lists: BTreeMap::from([(
2409 "bar".to_owned(),
2410 assign!(http::response::List::default(), {
2411 count: uint!(7),
2412 })
2413 )])
2414 });
2415
2416 let _summary = {
2417 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2418 sliding_sync
2419 .handle_response(
2420 server_response.clone(),
2421 &mut pos_guard,
2422 RequestedRequiredStates::default(),
2423 )
2424 .await?
2425 };
2426 }
2427
2428 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2429
2430 assert!(request.timeout.is_some());
2432
2433 Ok(())
2434 }
2435
2436 #[async_test]
2437 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2438 let server = MockServer::start().await;
2439 let client = logged_in_client(Some(server.uri())).await;
2440
2441 let _mock_guard = Mock::given(SlidingSyncMatcher)
2442 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2443 "pos": "0",
2444 "lists": {},
2445 "rooms": {}
2446 })))
2447 .mount_as_scoped(&server)
2448 .await;
2449
2450 let sliding_sync = client
2451 .sliding_sync("test")?
2452 .with_to_device_extension(
2453 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2454 )
2455 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2456 .build()
2457 .await?;
2458
2459 let sliding_sync = Arc::new(sliding_sync);
2460
2461 let sync_beat_listener = client.inner.sync_beat.listen();
2463 sliding_sync.sync_once().await?;
2464
2465 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2467 Ok(())
2468 }
2469
2470 #[async_test]
2471 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2472 let server = MockServer::start().await;
2473 let client = logged_in_client(Some(server.uri())).await;
2474
2475 let _mock_guard = Mock::given(SlidingSyncMatcher)
2476 .respond_with(ResponseTemplate::new(404))
2477 .mount_as_scoped(&server)
2478 .await;
2479
2480 let sliding_sync = client
2481 .sliding_sync("test")?
2482 .with_to_device_extension(
2483 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2484 )
2485 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2486 .build()
2487 .await?;
2488
2489 let sliding_sync = Arc::new(sliding_sync);
2490
2491 let sync_beat_listener = client.inner.sync_beat.listen();
2493 let sync_result = sliding_sync.sync_once().await;
2494 assert!(sync_result.is_err());
2495
2496 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2498
2499 Ok(())
2500 }
2501
2502 #[async_test]
2503 async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2504 let server = MatrixMockServer::new().await;
2505 let client = server.client_builder().build().await;
2506 let room_id = room_id!("!mu5hr00m:example.org");
2507
2508 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2509 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2510 "pos": "0",
2511 "lists": {},
2512 "extensions": {
2513 "account_data": {
2514 "global": [
2515 {
2516 "type": "m.direct",
2517 "content": {
2518 "@de4dlockh0lmes:example.org": [
2519 "!mu5hr00m:example.org"
2520 ]
2521 }
2522 }
2523 ]
2524 }
2525 },
2526 "rooms": {
2527 room_id: {
2528 "name": "Mario Bros Fanbase Room",
2529 "initial": true,
2530 },
2531 }
2532 })))
2533 .mount_as_scoped(server.server())
2534 .await;
2535
2536 let f = EventFactory::new().room(room_id);
2537
2538 Mock::given(method("GET"))
2539 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2540 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2541 "chunk": [
2542 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2543 ]
2544 })))
2545 .mount(server.server())
2546 .await;
2547
2548 let (tx, rx) = tokio::sync::oneshot::channel();
2549
2550 let tx = Arc::new(Mutex::new(Some(tx)));
2551 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2552 let members =
2554 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2555 assert_eq!(members.len(), 1);
2556 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2557 });
2558
2559 let sliding_sync = client
2560 .sliding_sync("test")?
2561 .add_list(SlidingSyncList::builder("thelist"))
2562 .with_account_data_extension(
2563 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2564 )
2565 .build()
2566 .await?;
2567
2568 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2569 .await
2570 .expect("Sync did not complete in time")
2571 .expect("Sync failed");
2572
2573 tokio::time::timeout(Duration::from_secs(5), rx)
2575 .await
2576 .expect("Event handler did not complete in time")
2577 .expect("Event handler failed");
2578
2579 Ok(())
2580 }
2581}