1#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23
24use std::{
25 collections::{BTreeMap, btree_map::Entry},
26 fmt::Debug,
27 future::Future,
28 sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
29 time::Duration,
30};
31
32use async_stream::stream;
33pub use client::{Version, VersionBuilder};
34use futures_core::stream::Stream;
35use matrix_sdk_base::RequestedRequiredStates;
36#[cfg(feature = "e2e-encryption")]
37use matrix_sdk_common::executor::JoinHandleExt as _;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40 OwnedRoomId, RoomId,
41 api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42 assign,
43};
44use tokio::{
45 select,
46 sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
47};
48use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
49
50pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
51use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
52use crate::{Client, Result, config::RequestConfig};
53
/// Handle to a sliding sync instance.
///
/// This is a thin wrapper around an [`Arc`] of [`SlidingSyncInner`]: cloning
/// it clones the `Arc`, so every clone shares the same underlying state.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// Shared, reference-counted state of this sliding sync instance.
    inner: Arc<SlidingSyncInner>,
}
62
/// State shared by all clones of a [`SlidingSync`].
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// Identifier of this instance. It is sent as `conn_id` in every request
    /// and recorded as `conn_id` in the tracing spans.
    id: String,

    /// Client used to send the requests and to process the responses.
    client: Client,

    /// Long-polling timeout sent to the server when the lists resolve to
    /// [`PollTimeout::Default`]. It is also part of the HTTP request timeout.
    poll_timeout: Duration,

    /// Extra duration added to `poll_timeout` to form the HTTP request
    /// timeout.
    network_timeout: Duration,

    /// Key handed to the cache helpers when state is restored from storage.
    storage_key: String,

    /// When `true`, `pos` is reloaded from storage before each request is
    /// generated, and the stored value takes precedence over the in-memory
    /// one.
    share_pos: bool,

    /// Position markers. The lock is taken when a request is generated and
    /// the owned guard is kept until the matching response has been handled.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Room subscriptions, keyed by room ID. The whole map is cloned into
    /// every request.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// Extensions configuration, cloned into every request.
    extensions: http::request::Extensions,

    /// Broadcast sender used to post [`SlidingSyncInternalMessage`]s to the
    /// sync loop.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
122 pub(super) fn new(inner: SlidingSyncInner) -> Self {
123 Self { inner: Arc::new(inner) }
124 }
125
    /// Persists the given position markers by delegating to
    /// `cache::store_sliding_sync_state`.
    ///
    /// # Errors
    ///
    /// Forwards any error returned by the cache helper.
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }
129
    /// Creates a new [`SlidingSyncBuilder`] for the given `id` and `client`.
    ///
    /// # Errors
    ///
    /// Forwards any error returned by [`SlidingSyncBuilder::new`].
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
134
135 pub fn subscribe_to_rooms(
142 &self,
143 room_ids: &[&RoomId],
144 settings: Option<http::request::RoomSubscription>,
145 cancel_in_flight_request: bool,
146 ) {
147 if subscribe_to_rooms(
148 self.inner.room_subscriptions.write().unwrap(),
149 &self.inner.client,
150 room_ids,
151 settings,
152 cancel_in_flight_request,
153 ) {
154 self.inner.internal_channel_send_if_possible(
155 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
156 );
157 }
158 }
159
160 pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
162 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
163 let mut skip_over_current_sync_loop_iteration = false;
164
165 for room_id in room_ids {
166 if room_subscriptions.remove(*room_id).is_some() {
167 skip_over_current_sync_loop_iteration = true;
168 }
169 }
170
171 if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172 self.inner.internal_channel_send_if_possible(
173 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174 );
175 }
176 }
177
178 pub fn clear_and_subscribe_to_rooms(
183 &self,
184 room_ids: &[&RoomId],
185 settings: Option<http::request::RoomSubscription>,
186 cancel_in_flight_request: bool,
187 ) {
188 let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
189 room_subscriptions.clear();
190
191 if subscribe_to_rooms(
192 room_subscriptions,
193 &self.inner.client,
194 room_ids,
195 settings,
196 cancel_in_flight_request,
197 ) {
198 self.inner.internal_channel_send_if_possible(
199 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
200 );
201 }
202 }
203
204 pub async fn on_list<Function, FunctionOutput, R>(
206 &self,
207 list_name: &str,
208 function: Function,
209 ) -> Option<R>
210 where
211 Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
212 FunctionOutput: Future<Output = R>,
213 {
214 let lists = self.inner.lists.read().await;
215
216 match lists.get(list_name) {
217 Some(list) => Some(function(list).await),
218 None => None,
219 }
220 }
221
222 pub async fn add_list(
228 &self,
229 list_builder: SlidingSyncListBuilder,
230 ) -> Result<Option<SlidingSyncList>> {
231 let list = list_builder.build(self.inner.internal_channel.clone());
232
233 let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
234
235 self.inner.internal_channel_send_if_possible(
236 SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
237 );
238
239 Ok(old_list)
240 }
241
    /// Adds a list after asking its builder to reload itself from the cache.
    ///
    /// The builder's `set_cached_and_reload` is awaited with this instance's
    /// client and storage key, and the builder is then passed to
    /// [`Self::add_list`]. What `set_cached_and_reload` does is not visible
    /// here; see the builder.
    ///
    /// # Errors
    ///
    /// Forwards errors from `set_cached_and_reload` and from
    /// [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        // Kept alive until the end of the function so the timer covers both the reload and
        // the insertion.
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }
258
    /// Handles one sliding sync response.
    ///
    /// The response is fed to a `SlidingSyncResponseProcessor`, the lists are
    /// updated from it, and `position.pos` is finally overwritten with the
    /// `pos` of the response.
    ///
    /// Returns an [`UpdateSummary`] holding the names of the lists whose
    /// `update` call returned `true`, and the IDs of the rooms found in the
    /// response and in the joined rooms of the processed sync response.
    ///
    /// # Errors
    ///
    /// Forwards errors from the response processor and from the list updates.
    /// In that case `position.pos` is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the `count` of a list in the response does not fit in a
    /// `u32`.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        // Captured now, but only committed at the very end, once every fallible step has
        // succeeded.
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // This guard lives until the end of the enclosing block, i.e. while all the
                // `handle_*` calls below are running.
                let _state_store_lock = {
                    let _timer = timer!("acquiring the `state_store_lock`");

                    self.inner.client.base_client().state_store_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                if self.is_thread_subscriptions_enabled() {
                    // `position.pos` is still the `pos` of the previous response here. The
                    // thread subscriptions are moved out of the response, leaving a default
                    // value behind.
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            // Runs after `_state_store_lock` has been dropped.
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        let update_summary = {
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                // Rooms listed in the sliding sync response itself…
                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // … plus the joined rooms of the processed sync response.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Every known list is updated: with the `count` from the response when the
                // response mentions it, with `None` otherwise.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Lists present in the response but unknown locally are only reported.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }
402
    /// Builds the next sync request.
    ///
    /// Returns the request, the [`RequestConfig`] to send it with, and an
    /// owned guard on the `position` lock. The guard is returned so that the
    /// caller keeps the lock until the response has been handled.
    ///
    /// # Errors
    ///
    /// Forwards errors from the lists (`next_request`), from restoring the
    /// cached state, and — with the `e2e-encryption` feature — from the Olm
    /// machine (`Error::NoOlmMachine` when there is none).
    ///
    /// # Panics
    ///
    /// Panics if the room subscriptions lock is poisoned.
    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        let mut requests_lists = BTreeMap::new();

        // Collect the request of every list, and fold their timeout requirements into the
        // smallest one (see `PollTimeout::min`).
        let timeout = {
            let lists = self.inner.lists.read().await;

            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        // The stored state is only needed for the shared `pos` and for the to-device token.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        let pos = if self.inner.share_pos {
            // When sharing `pos`, the stored value wins over the in-memory one, and the
            // in-memory one is realigned on it.
            if let Some(fields) = &restored_fields {
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // No `pos` with end-to-end encryption enabled: all tracked users are marked as dirty.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // `PollTimeout::Some` carries a number of seconds; `Default` falls back to this
        // instance's `poll_timeout`.
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        // `pos` and `timeout` are span fields declared by `sync_once`.
        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        request.extensions = self.inner.extensions.clone();

        if to_device_enabled {
            // The `since` token comes from the restored state, not from `self.inner.extensions`.
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            request,
            // The HTTP timeout always uses `poll_timeout`, whatever `timeout` resolved to
            // above, plus `network_timeout`.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
530
    /// Sends `request` and handles its response.
    ///
    /// `position_guard` is the guard returned by
    /// [`Self::generate_sync_request`]. It is moved into the task that
    /// handles the response and dropped once the new position has been
    /// cached.
    ///
    /// # Errors
    ///
    /// Forwards errors from sending the request, from handling the response
    /// and from caching the position. Returns `Error::JoinError` when a
    /// spawned task cannot be joined.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Must be computed before `request` is moved into `send`.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                let client = self.inner.client.clone();
                // The outgoing E2EE requests are sent from a separate task while the sync
                // request is awaited below. NOTE(review): `abort_on_drop` presumably aborts
                // that task if the handle is dropped, e.g. when the sync request fails —
                // confirm against `JoinHandleExt`.
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                .abort_on_drop();

                let response = request.await?;

                // Wait for the uploads to finish before handling the response.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        let this = self.clone();

        // The response is handled in a spawned task that owns everything it needs.
        // NOTE(review): presumably so that handling is not interrupted when this future is
        // dropped — confirm.
        let future = async move {
            debug!("Start handling response");

            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the `position` lock only once the new position has been cached.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.map_err(|error| Error::JoinError {
            task_description: "handle_response".to_owned(),
            error,
        })?
    }
649
650 #[cfg(feature = "e2e-encryption")]
652 fn is_e2ee_enabled(&self) -> bool {
653 self.inner.extensions.e2ee.enabled == Some(true)
654 }
655
656 fn is_thread_subscriptions_enabled(&self) -> bool {
659 self.inner.extensions.thread_subscriptions.enabled == Some(true)
660 }
661
    /// Without the `e2e-encryption` feature, E2EE is never considered
    /// enabled.
    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }
666
667 async fn must_process_rooms_response(&self) -> bool {
669 !self.inner.room_subscriptions.read().unwrap().is_empty()
672 || !self.inner.lists.read().await.is_empty()
673 }
674
675 #[doc(hidden)]
679 #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
680 pub async fn sync_once(&self) -> Result<UpdateSummary> {
681 let (request, request_config, position_guard) = self.generate_sync_request().await?;
682
683 let summaries = self.send_sync_request(request, request_config, position_guard).await?;
685
686 self.inner.client.inner.sync_beat.notify(usize::MAX);
688
689 Ok(summaries)
690 }
691
    /// Creates a sync loop, exposed as a `Stream`.
    ///
    /// Each iteration races the internal channel against
    /// [`Self::sync_once`], the channel being polled first (`biased`):
    ///
    /// * `SyncLoopStop`, or a receive error, ends the stream,
    /// * `SyncLoopSkipOverCurrentIteration` abandons the in-flight
    ///   `sync_once` and starts a new iteration,
    /// * a successful `sync_once` yields `Ok(update_summary)`,
    /// * a failed `sync_once` yields `Err(error)` and ends the stream. When
    ///   the error kind is `UnknownPos`, the session is expired first.
    // NOTE(review): the `allow` below is presumably needed because of code generated by
    // `#[instrument]` — confirm.
    #[allow(unknown_lints, clippy::let_with_type_underscore)]
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is polled, so that messages sent after this call and
        // before the first poll are not missed.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Branches are polled in order: internal messages take precedence over
                    // the sync itself.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // Dropping the `sync_once` future of this iteration is what
                            // cancels the in-flight request.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // An error always terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
756
757 pub fn stop_sync(&self) -> Result<()> {
766 Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
767 }
768
769 #[doc(hidden)]
780 pub async fn expire_session(&self) {
781 info!("Session expired; resetting `pos`");
782
783 {
784 let lists = self.inner.lists.read().await;
785
786 for list in lists.values() {
787 list.set_maximum_number_of_rooms(None);
789 }
790 }
791
792 {
794 let mut position = self.inner.position.lock().await;
795
796 position.pos = None;
798
799 if let Err(err) = self.cache_to_storage(&position).await {
803 warn!("Failed to invalidate cached sliding sync state: {err}");
804 }
805 }
806
807 {
808 self.inner.room_subscriptions.write().unwrap().clear();
811 }
812 }
813}
814
815fn subscribe_to_rooms(
818 mut room_subscriptions: StdRwLockWriteGuard<
819 '_,
820 BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
821 >,
822 client: &Client,
823 room_ids: &[&RoomId],
824 settings: Option<http::request::RoomSubscription>,
825 cancel_in_flight_request: bool,
826) -> bool {
827 let settings = settings.unwrap_or_default();
828 let mut skip_over_current_sync_loop_iteration = false;
829
830 for room_id in room_ids {
831 if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
832 if let Some(room) = client.get_room(room_id) {
833 room.mark_members_missing();
834 }
835
836 entry.insert(settings.clone());
837
838 skip_over_current_sync_loop_iteration = true;
839 }
840 }
841
842 cancel_in_flight_request && skip_over_current_sync_loop_iteration
843}
844
845impl SlidingSyncInner {
846 #[instrument]
848 fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
849 self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
850 }
851
852 #[instrument]
855 fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
856 let _ = self.internal_channel.send(message);
858 }
859}
860
/// Messages posted on the internal channel to drive the sync loop.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// The sync loop must stop: the sync stream ends when it receives this
    /// message.
    SyncLoopStop,

    /// The sync loop must abandon its current iteration and immediately start
    /// a new one.
    SyncLoopSkipOverCurrentIteration,
}
870
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrites the in-memory `pos` with `new_pos`.
    ///
    /// Only available in tests or with the `testing` feature. Nothing is
    /// written to storage.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }
}
879
/// Position markers of a sliding sync instance.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// The `pos` sent in the next request. It is `None` until a response has
    /// been handled, and is reset to `None` when the session expires.
    pos: Option<String>,
}
886
/// Summary of what has been updated by one handled response.
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// Names of the lists that reported an update.
    pub lists: Vec<String>,
    /// IDs of the rooms that have been updated.
    pub rooms: Vec<OwnedRoomId>,
}
896
/// Long-polling timeout requirement of a list.
#[derive(Debug)]
pub enum PollTimeout {
    /// No timeout must be sent to the server.
    None,

    /// A specific timeout, in seconds.
    Some(u32),

    /// The default timeout of the sliding sync instance must be used.
    Default,
}

impl PollTimeout {
    /// Computes the smallest of two `PollTimeout`s.
    ///
    /// The ordering is:
    ///
    /// * `None` is smaller than everything else,
    /// * `Some(x)` is smaller than `Some(y)` if and only if `x < y`,
    /// * `Some(_)` is smaller than `Default`.
    fn min(self, left: Self) -> Self {
        match (self, left) {
            // `None` wins, whichever side it is on.
            (Self::None, _) | (_, Self::None) => Self::None,

            // Two explicit values: keep the smallest one.
            (Self::Some(first), Self::Some(second)) => Self::Some(first.min(second)),

            // An explicit value always beats the default.
            (Self::Some(value), Self::Default) | (Self::Default, Self::Some(value)) => {
                Self::Some(value)
            }

            (Self::Default, Self::Default) => Self::Default,
        }
    }
}
944
945#[cfg(all(test, not(target_family = "wasm")))]
946#[allow(clippy::dbg_macro)]
947mod tests {
948 use std::{
949 collections::BTreeMap,
950 future::ready,
951 ops::Not,
952 sync::{Arc, Mutex},
953 time::Duration,
954 };
955
956 use assert_matches::assert_matches;
957 use event_listener::Listener;
958 use futures_util::{StreamExt, future::join_all, pin_mut};
959 use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
960 use matrix_sdk_common::executor::spawn;
961 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
962 use ruma::{
963 OwnedRoomId, assign,
964 events::{direct::DirectEvent, room::member::MembershipState},
965 owned_room_id, room_id,
966 serde::Raw,
967 uint,
968 };
969 use serde::Deserialize;
970 use serde_json::json;
971 use wiremock::{
972 Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
973 };
974
975 use super::{
976 SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
977 cache::restore_sliding_sync_state, http,
978 };
979 use crate::{
980 Client, Result,
981 test_utils::{logged_in_client, mocks::MatrixMockServer},
982 };
983
984 #[derive(Copy, Clone)]
985 struct SlidingSyncMatcher;
986
987 impl Match for SlidingSyncMatcher {
988 fn matches(&self, request: &Request) -> bool {
989 request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
990 && request.method == Method::POST
991 }
992 }
993
994 async fn new_sliding_sync(
995 lists: Vec<SlidingSyncListBuilder>,
996 ) -> Result<(MockServer, SlidingSync)> {
997 let server = MockServer::start().await;
998 let client = logged_in_client(Some(server.uri())).await;
999
1000 let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1001
1002 for list in lists {
1003 sliding_sync_builder = sliding_sync_builder.add_list(list);
1004 }
1005
1006 let sliding_sync = sliding_sync_builder.build().await?;
1007
1008 Ok((server, sliding_sync))
1009 }
1010
    /// Checks that subscribing to a room marks its members as missing, but
    /// only when the subscription is new.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Sync once so that the client knows the three rooms.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members are not synced yet.
        assert!(room0.are_members_synced().not());

        // Request the members of room 0 against a mocked `/members` endpoint.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // A new subscription marks the members as missing again.
        assert!(room0.are_members_synced().not());

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Request the members of room 0 once more.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        assert!(room0.are_members_synced());

        // Room 0 is already subscribed: this call is a no-op for it…
        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // … so its members are still considered synced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1131
    /// Checks the content of the room subscriptions after subscribing,
    /// unsubscribing, re-subscribing, and clearing-then-subscribing.
    #[async_test]
    async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");
        let room_id_3 = room_id!("!r3:bar.org");

        // Initially, there is no subscription.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Unsubscribe from one of them.
        sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 1);
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Subscribe to both again: only the missing one is added.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Replace all subscriptions by two other rooms.
        sliding_sync.clear_and_subscribe_to_rooms(
            &[room_id_2, room_id_3],
            Default::default(),
            false,
        );

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_2));
            assert!(room_subscriptions.contains_key(room_id_3));
        }

        Ok(())
    }
1201
    /// Checks that expiring the session clears all room subscriptions, and
    /// that new subscriptions can be added afterwards.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to a third room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Expiring the session drops every subscription.
        sliding_sync.expire_session().await;

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribing still works afterwards, and only adds what is asked.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1258
    /// Checks that a list added with `add_list` sits next to the lists
    /// provided at build time.
    #[async_test]
    async fn test_add_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        // The stream is created but never polled: no request is sent in this test.
        let _stream = sliding_sync.sync();
        pin_mut!(_stream);

        sliding_sync
            .add_list(
                SlidingSyncList::builder("bar")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
            )
            .await?;

        let lists = sliding_sync.inner.lists.read().await;

        assert!(lists.contains_key("foo"));
        assert!(lists.contains_key("bar"));

        Ok(())
    }
1286
    /// Checks that the `since` token of the to-device extension of a
    /// generated request reflects the `next_batch_token` saved in the crypto
    /// store.
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_extensions_to_device_since_is_set() {
        use matrix_sdk_base::crypto::store::types::Changes;

        let client = logged_in_client(None).await;
        let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
            .unwrap()
            .with_to_device_extension(assign!(
                http::request::ToDevice::default(),
                {
                    enabled: Some(true),
                }
            ))
            .build()
            .await
            .unwrap();

        // The configured extension is enabled and has no `since` token.
        {
            let to_device = &sliding_sync.inner.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Neither does a generated request, as nothing has been stored yet.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        let since_token = "depuis".to_owned();

        // Save a `next_batch_token` through the Olm machine's store.
        {
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(since_token.clone()),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            } else {
                panic!("Where is the Olm machine?");
            }
        }

        // The next generated request carries the saved token.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert_eq!(to_device.since, Some(since_token));
        }
    }
1351
    /// Checks that generating a request without a `pos`, with the E2EE
    /// extension enabled, marks all tracked users as dirty (visible as a new
    /// `/keys/query` outgoing request), and that it does not happen once a
    /// `pos` is set.
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
        use matrix_sdk_test::ruma_response_from_json;
        use ruma::user_id;

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let alice = user_id!("@alice:localhost");
        let bob = user_id!("@bob:localhost");
        let me = user_id!("@example:localhost");

        // Track Alice and Bob, then answer every outgoing request until none is left.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            olm_machine.update_tracked_users([alice, bob]).await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // A keys upload and a keys query are pending.
            assert_eq!(outgoing_requests.len(), 2);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

            // Answer the keys upload.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                        "one_time_key_counts": {}
                    }))),
                )
                .await?;

            // Answer the keys query for Alice and Bob.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[1].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                        }
                    }))),
                )
                .await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // One more keys query is pending; it is answered for the own user below.
            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            me: {},
                        }
                    }))),
                )
                .await?;

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            // Nothing is pending anymore.
            assert!(outgoing_requests.is_empty());
        }

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // First request: there is no `pos` yet.
        let (_request, _, _) = sync.generate_sync_request().await?;

        // All tracked users have been marked as dirty: a keys query for all of them is pending.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(
                outgoing_requests[0].request(),
                AnyOutgoingRequest::KeysQuery(request) => {
                    assert!(request.device_keys.contains_key(alice));
                    assert!(request.device_keys.contains_key(bob));
                    assert!(request.device_keys.contains_key(me));
                }
            );

            // Answer it, so that nothing is pending before the next step.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                            me: {},
                        }
                    }))),
                )
                .await?;
        }

        // Now there is a `pos`.
        sync.set_pos("chocolat".to_owned()).await;

        let (_request, _, _) = sync.generate_sync_request().await?;

        // With a `pos`, generating a request does not create any outgoing request.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        Ok(())
    }
1492
/// Without `share_pos()`, the `pos` token of a running instance is not
/// affected by what another instance writes to storage, and a freshly built
/// instance starts without any `pos`.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
    /// The only part of the request body the mock echoes back.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // Every response carries the current counter value as `pos`, then the
    // counter is bumped for the next request.
    let pos_counter = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |incoming: &Request| {
            let partial: PartialRequest = incoming.body_json().unwrap();

            let mut counter = pos_counter.lock().unwrap();
            let current = *counter;
            *counter += 1;
            // Release the lock before building the response.
            drop(counter);

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": partial.txn_id,
                "pos": current.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

    // Before any sync: no `pos` in memory, none in the generated request.
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (initial_request, _, _) = sliding_sync.generate_sync_request().await?;
        assert!(initial_request.pos.is_none());
    }

    let sync_stream = sliding_sync.sync();
    pin_mut!(sync_stream);

    // Run a single iteration of the sync loop.
    let first_update = sync_stream.next().await;
    assert_matches!(first_update, Some(Ok(_update_summary)));

    // The in-memory `pos` now matches the first server response.
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // The state written to storage contains that `pos` too.
    let stored_state = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");

    assert_eq!(stored_state.pos.as_deref(), Some("0"));

    // A second instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some(String::from("yolo"));

        other_sync.cache_to_storage(&other_position).await?;
    }

    // The first instance still uses its own in-memory `pos`.
    {
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
        let (next_request, _, _) = sliding_sync.generate_sync_request().await?;
        assert_eq!(next_request.pos.as_deref(), Some("0"));
    }

    // A newly built instance does not pick up any `pos`.
    {
        let fresh_sync = client.sliding_sync("forgetful-sync")?.build().await?;
        assert!(fresh_sync.inner.position.lock().await.pos.is_none());
    }

    Ok(())
}
1581
/// With `share_pos()`, the `pos` token is read back from storage: a running
/// instance picks up a value written by another instance, a freshly built
/// instance starts from the stored value, and expiring the session clears it
/// for both.
#[cfg(feature = "e2e-encryption")]
#[async_test]
async fn test_sliding_sync_does_remember_pos() -> Result<()> {
    /// The only part of the request body the mock echoes back.
    #[derive(Deserialize)]
    struct PartialRequest {
        txn_id: Option<String>,
    }

    let server = MockServer::start().await;

    // Every response carries the current counter value as `pos`, then the
    // counter is bumped for the next request.
    let pos_counter = Arc::new(Mutex::new(0));
    let _mock_guard = Mock::given(SlidingSyncMatcher)
        .respond_with(move |incoming: &Request| {
            let partial: PartialRequest = incoming.body_json().unwrap();

            let mut counter = pos_counter.lock().unwrap();
            let current = *counter;
            *counter += 1;
            // Release the lock before building the response.
            drop(counter);

            ResponseTemplate::new(200).set_body_json(json!({
                "txn_id": partial.txn_id,
                "pos": current.to_string(),
            }))
        })
        .mount_as_scoped(&server)
        .await;

    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

    // Before any sync: no `pos` in the generated request, none in memory.
    {
        let (initial_request, _, _) = sliding_sync.generate_sync_request().await?;

        assert!(initial_request.pos.is_none());
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
    }

    let sync_stream = sliding_sync.sync();
    pin_mut!(sync_stream);

    // Run a single iteration of the sync loop.
    let first_update = sync_stream.next().await;
    assert_matches!(first_update, Some(Ok(_update_summary)));

    // The in-memory `pos` now matches the first server response.
    assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

    // The state written to storage contains that `pos` too.
    let stored_state = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
        .await?
        .expect("must have restored fields");

    assert_eq!(stored_state.pos.as_deref(), Some("0"));

    // A second instance with the same id overwrites the stored `pos`.
    {
        let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

        let mut other_position = other_sync.inner.position.lock().await;
        other_position.pos = Some(String::from("42"));

        other_sync.cache_to_storage(&other_position).await?;
    }

    // Generating a request on the first instance now yields the stored `pos`,
    // and the in-memory value has been updated accordingly.
    {
        let (next_request, _, _) = sliding_sync.generate_sync_request().await?;
        assert_eq!(next_request.pos.as_deref(), Some("42"));
        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
    }

    // A newly built sharing instance starts from the stored `pos`.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert_eq!(fresh_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

        let (fresh_request, _, _) = fresh_sync.generate_sync_request().await?;
        assert_eq!(fresh_request.pos.as_deref(), Some("42"));
    }

    // Expire the session on the first instance.
    sliding_sync.expire_session().await;

    // The `pos` is gone from the live instance...
    {
        assert!(sliding_sync.inner.position.lock().await.pos.is_none());

        let (request_after_expiry, _, _) = sliding_sync.generate_sync_request().await?;
        assert!(request_after_expiry.pos.is_none());
    }

    // ...and a newly built sharing instance does not find one either.
    {
        let fresh_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
        assert!(fresh_sync.inner.position.lock().await.pos.is_none());

        let (fresh_request, _, _) = fresh_sync.generate_sync_request().await?;
        assert!(fresh_request.pos.is_none());
    }

    Ok(())
}
1690
1691 #[async_test]
1692 async fn test_stop_sync_loop() -> Result<()> {
1693 let (_server, sliding_sync) = new_sliding_sync(vec![
1694 SlidingSyncList::builder("foo")
1695 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1696 ])
1697 .await?;
1698
1699 let stream = sliding_sync.sync();
1701 pin_mut!(stream);
1702
1703 assert!(stream.next().await.is_some());
1705
1706 sliding_sync.stop_sync()?;
1708
1709 assert!(stream.next().await.is_none());
1711
1712 let stream = sliding_sync.sync();
1714 pin_mut!(stream);
1715
1716 assert!(stream.next().await.is_some());
1718
1719 Ok(())
1720 }
1721
1722 #[async_test]
1723 async fn test_process_read_receipts() -> Result<()> {
1724 let room = owned_room_id!("!pony:example.org");
1725
1726 let server = MockServer::start().await;
1727 let client = logged_in_client(Some(server.uri())).await;
1728 client.event_cache().subscribe().unwrap();
1729
1730 let sliding_sync = client
1731 .sliding_sync("test")?
1732 .with_receipt_extension(
1733 assign!(http::request::Receipts::default(), { enabled: Some(true) }),
1734 )
1735 .add_list(
1736 SlidingSyncList::builder("all")
1737 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1738 )
1739 .build()
1740 .await?;
1741
1742 {
1744 let server_response = assign!(http::Response::new("0".to_owned()), {
1745 rooms: BTreeMap::from([(
1746 room.clone(),
1747 http::response::Room::default(),
1748 )])
1749 });
1750
1751 let _summary = {
1752 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1753 sliding_sync
1754 .handle_response(
1755 server_response.clone(),
1756 &mut pos_guard,
1757 RequestedRequiredStates::default(),
1758 )
1759 .await?
1760 };
1761 }
1762
1763 let server_response = assign!(http::Response::new("1".to_owned()), {
1764 extensions: assign!(http::response::Extensions::default(), {
1765 receipts: assign!(http::response::Receipts::default(), {
1766 rooms: BTreeMap::from([
1767 (
1768 room.clone(),
1769 Raw::from_json_string(
1770 json!({
1771 "room_id": room,
1772 "type": "m.receipt",
1773 "content": {
1774 "$event:bar.org": {
1775 "m.read": {
1776 client.user_id().unwrap(): {
1777 "ts": 1436451550,
1778 }
1779 }
1780 }
1781 }
1782 })
1783 .to_string(),
1784 ).unwrap()
1785 )
1786 ])
1787 })
1788 })
1789 });
1790
1791 let summary = {
1792 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1793 sliding_sync
1794 .handle_response(
1795 server_response.clone(),
1796 &mut pos_guard,
1797 RequestedRequiredStates::default(),
1798 )
1799 .await?
1800 };
1801
1802 assert!(summary.rooms.contains(&room));
1803
1804 Ok(())
1805 }
1806
1807 #[async_test]
1808 async fn test_process_marked_unread_room_account_data() -> Result<()> {
1809 let room_id = owned_room_id!("!unicorn:example.org");
1810
1811 let server = MockServer::start().await;
1812 let client = logged_in_client(Some(server.uri())).await;
1813
1814 let sliding_sync = client
1817 .sliding_sync("test")?
1818 .with_account_data_extension(
1819 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1820 )
1821 .add_list(
1822 SlidingSyncList::builder("all")
1823 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1824 )
1825 .build()
1826 .await?;
1827
1828 {
1830 let server_response = assign!(http::Response::new("0".to_owned()), {
1831 rooms: BTreeMap::from([(
1832 room_id.clone(),
1833 http::response::Room::default(),
1834 )])
1835 });
1836
1837 let _summary = {
1838 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1839 sliding_sync
1840 .handle_response(
1841 server_response.clone(),
1842 &mut pos_guard,
1843 RequestedRequiredStates::default(),
1844 )
1845 .await?
1846 };
1847 }
1848
1849 let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
1853
1854 let update_summary = {
1855 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1856 sliding_sync
1857 .handle_response(
1858 server_response.clone(),
1859 &mut pos_guard,
1860 RequestedRequiredStates::default(),
1861 )
1862 .await?
1863 };
1864
1865 assert!(update_summary.rooms.contains(&room_id));
1868
1869 let room = client.get_room(&room_id).unwrap();
1870
1871 assert!(room.is_marked_unread());
1874
1875 let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
1878
1879 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1880 sliding_sync
1881 .handle_response(
1882 server_response.clone(),
1883 &mut pos_guard,
1884 RequestedRequiredStates::default(),
1885 )
1886 .await?;
1887
1888 let room = client.get_room(&room_id).unwrap();
1889
1890 assert!(!room.is_marked_unread());
1891
1892 Ok(())
1893 }
1894
1895 fn make_mark_unread_response(
1896 response_number: &str,
1897 room_id: OwnedRoomId,
1898 unread: bool,
1899 add_rooms_section: bool,
1900 ) -> http::Response {
1901 let rooms = if add_rooms_section {
1902 BTreeMap::from([(room_id.clone(), http::response::Room::default())])
1903 } else {
1904 BTreeMap::new()
1905 };
1906
1907 let extensions = assign!(http::response::Extensions::default(), {
1908 account_data: assign!(http::response::AccountData::default(), {
1909 rooms: BTreeMap::from([
1910 (
1911 room_id,
1912 vec![
1913 Raw::from_json_string(
1914 json!({
1915 "content": {
1916 "unread": unread
1917 },
1918 "type": "m.marked_unread"
1919 })
1920 .to_string(),
1921 ).unwrap()
1922 ]
1923 )
1924 ])
1925 })
1926 });
1927
1928 assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
1929 }
1930
1931 #[async_test]
1932 async fn test_process_rooms_account_data() -> Result<()> {
1933 let room = owned_room_id!("!pony:example.org");
1934
1935 let server = MockServer::start().await;
1936 let client = logged_in_client(Some(server.uri())).await;
1937
1938 let sliding_sync = client
1939 .sliding_sync("test")?
1940 .with_account_data_extension(
1941 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1942 )
1943 .add_list(
1944 SlidingSyncList::builder("all")
1945 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1946 )
1947 .build()
1948 .await?;
1949
1950 {
1952 let server_response = assign!(http::Response::new("0".to_owned()), {
1953 rooms: BTreeMap::from([(
1954 room.clone(),
1955 http::response::Room::default(),
1956 )])
1957 });
1958
1959 let _summary = {
1960 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1961 sliding_sync
1962 .handle_response(
1963 server_response.clone(),
1964 &mut pos_guard,
1965 RequestedRequiredStates::default(),
1966 )
1967 .await?
1968 };
1969 }
1970
1971 let server_response = assign!(http::Response::new("1".to_owned()), {
1972 extensions: assign!(http::response::Extensions::default(), {
1973 account_data: assign!(http::response::AccountData::default(), {
1974 rooms: BTreeMap::from([
1975 (
1976 room.clone(),
1977 vec![
1978 Raw::from_json_string(
1979 json!({
1980 "content": {
1981 "tags": {
1982 "u.work": {
1983 "order": 0.9
1984 }
1985 }
1986 },
1987 "type": "m.tag"
1988 })
1989 .to_string(),
1990 ).unwrap()
1991 ]
1992 )
1993 ])
1994 })
1995 })
1996 });
1997 let summary = {
1998 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1999 sliding_sync
2000 .handle_response(
2001 server_response.clone(),
2002 &mut pos_guard,
2003 RequestedRequiredStates::default(),
2004 )
2005 .await?
2006 };
2007
2008 assert!(summary.rooms.contains(&room));
2009
2010 Ok(())
2011 }
2012
/// Feeds the same response (one room, plus e2ee and to-device extension data)
/// to three differently configured sliding sync instances and checks which
/// parts each one applies:
///
/// 1. extensions enabled, no list: encryption data applied, room not created;
/// 2. a list, no extensions: room created, encryption data not applied;
/// 3. a list and extensions: both applied.
#[async_test]
#[cfg(feature = "e2e-encryption")]
async fn test_process_only_encryption_events() -> Result<()> {
    use ruma::OneTimeKeyAlgorithm;

    let room_id = owned_room_id!("!croissant:example.org");

    let server = MockServer::start().await;
    let client = logged_in_client(Some(server.uri())).await;

    let server_response = assign!(http::Response::new("0".to_owned()), {
        rooms: BTreeMap::from([(
            room_id.clone(),
            assign!(http::response::Room::default(), {
                name: Some("Croissants lovers".to_owned()),
                timeline: Vec::new(),
            }),
        )]),

        extensions: assign!(http::response::Extensions::default(), {
            e2ee: assign!(http::response::E2EE::default(), {
                device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
            }),
            to_device: Some(assign!(http::response::ToDevice::default(), {
                next_batch: "to-device-token".to_owned(),
            })),
        })
    });

    // Case 1: to-device and e2ee extensions enabled, but no list.
    let sliding_sync = client
        .sliding_sync("test")?
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The one-time key count from the e2ee extension was taken into account.
    let key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(key_count, 42);

    // The to-device token was stored.
    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    // The room from the response was not created.
    assert!(client.get_room(&room_id).is_none());

    // Case 2: a fresh client, with a list but without the extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // The encryption data of the response was ignored this time.
    let key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(key_count, 0);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            None
        );
    }

    // The room was created.
    assert!(client.get_room(&room_id).is_some());

    // Case 3: a fresh client, with both a list and the extensions.
    let client = logged_in_client(Some(server.uri())).await;

    let sliding_sync = client
        .sliding_sync("test")?
        .add_list(SlidingSyncList::builder("thelist"))
        .with_to_device_extension(
            assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
        )
        .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
        .build()
        .await?;

    {
        let mut position = sliding_sync.inner.position.clone().lock_owned().await;

        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut position,
                RequestedRequiredStates::default(),
            )
            .await?;
    }

    // Encryption data and room data were both applied.
    let key_count = client.encryption().uploaded_key_count().await?;
    assert_eq!(key_count, 42);

    {
        let olm_machine = &*client.olm_machine_for_testing().await;
        assert_eq!(
            olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
            Some("to-device-token")
        );
    }

    assert!(client.get_room(&room_id).is_some());

    Ok(())
}
2160
2161 #[async_test]
2162 async fn test_lock_multiple_requests() -> Result<()> {
2163 let server = MockServer::start().await;
2164 let client = logged_in_client(Some(server.uri())).await;
2165
2166 let pos = Arc::new(Mutex::new(0));
2167 let _mock_guard = Mock::given(SlidingSyncMatcher)
2168 .respond_with(move |_: &Request| {
2169 let mut pos = pos.lock().unwrap();
2170 *pos += 1;
2171 ResponseTemplate::new(200).set_body_json(json!({
2172 "pos": pos.to_string(),
2173 "lists": {},
2174 "rooms": {}
2175 }))
2176 })
2177 .mount_as_scoped(&server)
2178 .await;
2179
2180 let sliding_sync = client
2181 .sliding_sync("test")?
2182 .with_to_device_extension(
2183 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2184 )
2185 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2186 .build()
2187 .await?;
2188
2189 let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2192
2193 for result in requests.await {
2194 result?;
2195 }
2196
2197 Ok(())
2198 }
2199
2200 #[async_test]
2201 async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2202 let server = MockServer::start().await;
2203 let client = logged_in_client(Some(server.uri())).await;
2204
2205 let pos = Arc::new(Mutex::new(0));
2206 let _mock_guard = Mock::given(SlidingSyncMatcher)
2207 .respond_with(move |_: &Request| {
2208 let mut pos = pos.lock().unwrap();
2209 *pos += 1;
2210 ResponseTemplate::new(200)
2212 .set_body_json(json!({
2213 "pos": pos.to_string(),
2214 "lists": {},
2215 "rooms": {}
2216 }))
2217 .set_delay(Duration::from_secs(2))
2218 })
2219 .mount_as_scoped(&server)
2220 .await;
2221
2222 let sliding_sync =
2223 client
2224 .sliding_sync("test")?
2225 .add_list(SlidingSyncList::builder("room-list").sync_mode(
2226 SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2227 ))
2228 .add_list(
2229 SlidingSyncList::builder("another-list")
2230 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2231 )
2232 .build()
2233 .await?;
2234
2235 let stream = sliding_sync.sync();
2236 pin_mut!(stream);
2237
2238 let cloned_sync = sliding_sync.clone();
2239 spawn(async move {
2240 tokio::time::sleep(Duration::from_millis(100)).await;
2241
2242 cloned_sync
2243 .on_list("another-list", |list| {
2244 list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2245 ready(())
2246 })
2247 .await;
2248 });
2249
2250 assert_matches!(stream.next().await, Some(Ok(_)));
2251
2252 sliding_sync.stop_sync().unwrap();
2253
2254 assert_matches!(stream.next().await, None);
2255
2256 let mut num_requests = 0;
2257
2258 for request in server.received_requests().await.unwrap() {
2259 if !SlidingSyncMatcher.matches(&request) {
2260 continue;
2261 }
2262
2263 let another_list_ranges = if num_requests == 0 {
2264 json!([[0, 10]])
2266 } else {
2267 json!([[10, 20]])
2269 };
2270
2271 num_requests += 1;
2272 assert!(num_requests <= 2, "more than one request hit the server");
2273
2274 let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2275
2276 if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2277 &json_value,
2278 &json!({
2279 "conn_id": "test",
2280 "lists": {
2281 "room-list": {
2282 "ranges": [[0, 9]],
2283 "required_state": [
2284 ["m.room.encryption", ""],
2285 ["m.room.tombstone", ""]
2286 ],
2287 },
2288 "another-list": {
2289 "ranges": another_list_ranges,
2290 "required_state": [
2291 ["m.room.encryption", ""],
2292 ["m.room.tombstone", ""]
2293 ],
2294 },
2295 }
2296 }),
2297 assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2298 ) {
2299 dbg!(json_value);
2300 panic!("json differ: {err}");
2301 }
2302 }
2303
2304 assert_eq!(num_requests, 2);
2305
2306 Ok(())
2307 }
2308
2309 #[async_test]
2310 async fn test_timeout_zero_list() -> Result<()> {
2311 let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2312
2313 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2314
2315 assert!(request.timeout.is_some());
2318
2319 Ok(())
2320 }
2321
2322 #[async_test]
2323 async fn test_timeout_one_list() -> Result<()> {
2324 let (_server, sliding_sync) = new_sliding_sync(vec![
2325 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2326 ])
2327 .await?;
2328
2329 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2330
2331 assert!(request.timeout.is_none());
2333
2334 {
2336 let server_response = assign!(http::Response::new("0".to_owned()), {
2337 lists: BTreeMap::from([(
2338 "foo".to_owned(),
2339 assign!(http::response::List::default(), {
2340 count: uint!(7),
2341 })
2342 )])
2343 });
2344
2345 let _summary = {
2346 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2347 sliding_sync
2348 .handle_response(
2349 server_response.clone(),
2350 &mut pos_guard,
2351 RequestedRequiredStates::default(),
2352 )
2353 .await?
2354 };
2355 }
2356
2357 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2358
2359 assert!(request.timeout.is_some());
2361
2362 Ok(())
2363 }
2364
2365 #[async_test]
2366 async fn test_timeout_three_lists() -> Result<()> {
2367 let (_server, sliding_sync) = new_sliding_sync(vec![
2368 SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2369 SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2370 SlidingSyncList::builder("baz")
2371 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2372 ])
2373 .await?;
2374
2375 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2376
2377 assert!(request.timeout.is_none());
2379
2380 {
2382 let server_response = assign!(http::Response::new("0".to_owned()), {
2383 lists: BTreeMap::from([(
2384 "foo".to_owned(),
2385 assign!(http::response::List::default(), {
2386 count: uint!(7),
2387 })
2388 )])
2389 });
2390
2391 let _summary = {
2392 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2393 sliding_sync
2394 .handle_response(
2395 server_response.clone(),
2396 &mut pos_guard,
2397 RequestedRequiredStates::default(),
2398 )
2399 .await?
2400 };
2401 }
2402
2403 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2404
2405 assert!(request.timeout.is_none());
2407
2408 {
2410 let server_response = assign!(http::Response::new("1".to_owned()), {
2411 lists: BTreeMap::from([(
2412 "bar".to_owned(),
2413 assign!(http::response::List::default(), {
2414 count: uint!(7),
2415 })
2416 )])
2417 });
2418
2419 let _summary = {
2420 let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2421 sliding_sync
2422 .handle_response(
2423 server_response.clone(),
2424 &mut pos_guard,
2425 RequestedRequiredStates::default(),
2426 )
2427 .await?
2428 };
2429 }
2430
2431 let (request, _, _) = sliding_sync.generate_sync_request().await?;
2432
2433 assert!(request.timeout.is_some());
2435
2436 Ok(())
2437 }
2438
2439 #[async_test]
2440 async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2441 let server = MockServer::start().await;
2442 let client = logged_in_client(Some(server.uri())).await;
2443
2444 let _mock_guard = Mock::given(SlidingSyncMatcher)
2445 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2446 "pos": "0",
2447 "lists": {},
2448 "rooms": {}
2449 })))
2450 .mount_as_scoped(&server)
2451 .await;
2452
2453 let sliding_sync = client
2454 .sliding_sync("test")?
2455 .with_to_device_extension(
2456 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2457 )
2458 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2459 .build()
2460 .await?;
2461
2462 let sliding_sync = Arc::new(sliding_sync);
2463
2464 let sync_beat_listener = client.inner.sync_beat.listen();
2466 sliding_sync.sync_once().await?;
2467
2468 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2470 Ok(())
2471 }
2472
2473 #[async_test]
2474 async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2475 let server = MockServer::start().await;
2476 let client = logged_in_client(Some(server.uri())).await;
2477
2478 let _mock_guard = Mock::given(SlidingSyncMatcher)
2479 .respond_with(ResponseTemplate::new(404))
2480 .mount_as_scoped(&server)
2481 .await;
2482
2483 let sliding_sync = client
2484 .sliding_sync("test")?
2485 .with_to_device_extension(
2486 assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2487 )
2488 .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2489 .build()
2490 .await?;
2491
2492 let sliding_sync = Arc::new(sliding_sync);
2493
2494 let sync_beat_listener = client.inner.sync_beat.listen();
2496 let sync_result = sliding_sync.sync_once().await;
2497 assert!(sync_result.is_err());
2498
2499 assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2501
2502 Ok(())
2503 }
2504
2505 #[async_test]
2506 async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2507 let server = MatrixMockServer::new().await;
2508 let client = server.client_builder().build().await;
2509 let room_id = room_id!("!mu5hr00m:example.org");
2510
2511 let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2512 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2513 "pos": "0",
2514 "lists": {},
2515 "extensions": {
2516 "account_data": {
2517 "global": [
2518 {
2519 "type": "m.direct",
2520 "content": {
2521 "@de4dlockh0lmes:example.org": [
2522 "!mu5hr00m:example.org"
2523 ]
2524 }
2525 }
2526 ]
2527 }
2528 },
2529 "rooms": {
2530 room_id: {
2531 "name": "Mario Bros Fanbase Room",
2532 "initial": true,
2533 },
2534 }
2535 })))
2536 .mount_as_scoped(server.server())
2537 .await;
2538
2539 let f = EventFactory::new().room(room_id);
2540
2541 Mock::given(method("GET"))
2542 .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2543 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2544 "chunk": [
2545 f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2546 ]
2547 })))
2548 .mount(server.server())
2549 .await;
2550
2551 let (tx, rx) = tokio::sync::oneshot::channel();
2552
2553 let tx = Arc::new(Mutex::new(Some(tx)));
2554 client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2555 let members =
2557 client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2558 assert_eq!(members.len(), 1);
2559 tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2560 });
2561
2562 let sliding_sync = client
2563 .sliding_sync("test")?
2564 .add_list(SlidingSyncList::builder("thelist"))
2565 .with_account_data_extension(
2566 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2567 )
2568 .build()
2569 .await?;
2570
2571 tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2572 .await
2573 .expect("Sync did not complete in time")
2574 .expect("Sync failed");
2575
2576 tokio::time::timeout(Duration::from_secs(5), rx)
2578 .await
2579 .expect("Event handler did not complete in time")
2580 .expect("Event handler failed");
2581
2582 Ok(())
2583 }
2584}