Skip to main content

matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23
24use std::{
25    collections::{BTreeMap, btree_map::Entry},
26    fmt::Debug,
27    future::Future,
28    sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
29    time::Duration,
30};
31
32use async_stream::stream;
33pub use client::{Version, VersionBuilder};
34use futures_core::stream::Stream;
35use matrix_sdk_base::RequestedRequiredStates;
36#[cfg(feature = "e2e-encryption")]
37use matrix_sdk_common::executor::JoinHandleExt as _;
38use matrix_sdk_common::{executor::spawn, timer};
39use ruma::{
40    OwnedRoomId, RoomId,
41    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42    assign,
43};
44use tokio::{
45    select,
46    sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
47};
48use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
49
50pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
51use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
52use crate::{Client, Result, config::RequestConfig};
53
/// The Sliding Sync instance.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data.
    ///
    /// Behind an `Arc`, so cloning `SlidingSync` only bumps a reference
    /// count — all clones share the same state.
    inner: Arc<SlidingSyncInner>,
}
62
/// The shared state behind a [`SlidingSync`] instance.
///
/// Every clone of [`SlidingSync`] points at the same `SlidingSyncInner`
/// through an `Arc`.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`].
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sparkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two non-necessarily
    /// consecutive requests with the same `pos`, the server has to reply with
    /// the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but one wants to receive updates.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// The intended state of the extensions being supplied to sliding /sync
    /// calls.
    extensions: http::request::Extensions,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
122    pub(super) fn new(inner: SlidingSyncInner) -> Self {
123        Self { inner: Arc::new(inner) }
124    }
125
    /// Persist the current sliding sync state, for the given position
    /// markers, to storage.
    ///
    /// Delegates to [`cache::store_sliding_sync_state`].
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }
129
    /// Create a new [`SlidingSyncBuilder`].
    ///
    /// `id` is the unique identifier for the sliding sync connection being
    /// built (cf. [`SlidingSyncInner::id`]); `client` is the HTTP Matrix
    /// client it will use.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
134
135    /// Add subscriptions to many rooms.
136    ///
137    /// If the associated `Room`s exist, they will be marked as members are
138    /// missing, so that it ensures to re-fetch all members.
139    ///
140    /// A subscription to an already subscribed room is ignored.
141    pub fn subscribe_to_rooms(
142        &self,
143        room_ids: &[&RoomId],
144        settings: Option<http::request::RoomSubscription>,
145        cancel_in_flight_request: bool,
146    ) {
147        if subscribe_to_rooms(
148            self.inner.room_subscriptions.write().unwrap(),
149            &self.inner.client,
150            room_ids,
151            settings,
152            cancel_in_flight_request,
153        ) {
154            self.inner.internal_channel_send_if_possible(
155                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
156            );
157        }
158    }
159
160    /// Remove subscriptions to many rooms.
161    pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
162        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
163        let mut skip_over_current_sync_loop_iteration = false;
164
165        for room_id in room_ids {
166            if room_subscriptions.remove(*room_id).is_some() {
167                skip_over_current_sync_loop_iteration = true;
168            }
169        }
170
171        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172            self.inner.internal_channel_send_if_possible(
173                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174            );
175        }
176    }
177
178    /// Replace all subscriptions to rooms by other ones.
179    ///
180    /// If the associated `Room`s exist, they will be marked as members are
181    /// missing, so that it ensures to re-fetch all members.
182    pub fn clear_and_subscribe_to_rooms(
183        &self,
184        room_ids: &[&RoomId],
185        settings: Option<http::request::RoomSubscription>,
186        cancel_in_flight_request: bool,
187    ) {
188        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
189        room_subscriptions.clear();
190
191        if subscribe_to_rooms(
192            room_subscriptions,
193            &self.inner.client,
194            room_ids,
195            settings,
196            cancel_in_flight_request,
197        ) {
198            self.inner.internal_channel_send_if_possible(
199                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
200            );
201        }
202    }
203
204    /// Find a list by its name, and do something on it if it exists.
205    pub async fn on_list<Function, FunctionOutput, R>(
206        &self,
207        list_name: &str,
208        function: Function,
209    ) -> Option<R>
210    where
211        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
212        FunctionOutput: Future<Output = R>,
213    {
214        let lists = self.inner.lists.read().await;
215
216        match lists.get(list_name) {
217            Some(list) => Some(function(list).await),
218            None => None,
219        }
220    }
221
222    /// Add the list to the list of lists.
223    ///
224    /// As lists need to have a unique `.name`, if a list with the same name
225    /// is found the new list will replace the old one and the return it or
226    /// `None`.
227    pub async fn add_list(
228        &self,
229        list_builder: SlidingSyncListBuilder,
230    ) -> Result<Option<SlidingSyncList>> {
231        let list = list_builder.build(self.inner.internal_channel.clone());
232
233        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
234
235        self.inner.internal_channel_send_if_possible(
236            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
237        );
238
239        Ok(old_list)
240    }
241
    /// Add a list that will be cached and reloaded from the cache.
    ///
    /// This will raise an error if a storage key was not set, or if there
    /// was a I/O error reading from the cache.
    ///
    /// The rest of the semantics is the same as [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        // Time the whole restoration (cache load + processing) for tracing.
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        // Load the cached state for this list into the builder before
        // building and registering it.
        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }
258
    /// Handle the HTTP response.
    ///
    /// Processes the `sliding_sync_response` (thread subscriptions,
    /// encryption when enabled, rooms and lists), and — only once everything
    /// has succeeded — advances `position.pos` to the `pos` returned by the
    /// server. On error, `position` is left untouched, so the same `pos` is
    /// reused by the next request.
    ///
    /// Returns an [`UpdateSummary`] listing the rooms and lists that have
    /// been updated by this response.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        // Remember the server-acknowledged `pos` now; it's written into
        // `position` only at the very end, after the response has been fully
        // handled.
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        // Transform a Sliding Sync Response to a `SyncResponse`.
        //
        // We may not need the `sync_response` in the future (once `SyncResponse` will
        // move to Sliding Sync, i.e. to `http::Response`), but processing the
        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
        // happens here.

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // Take the lock to synchronise accesses to the state store, to avoid concurrent
                // sliding syncs overwriting each other's room infos.
                let _state_store_lock = {
                    let _timer = timer!("acquiring the `state_store_lock`");

                    self.inner.client.base_client().state_store_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                // Process thread subscriptions if they're available.
                //
                // It's important to do this *before* handling the room responses, so that
                // notifications can be properly generated based on the thread subscriptions,
                // for the events in threads we've subscribed to.
                if self.is_thread_subscriptions_enabled() {
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                // Only handle the room's subsection of the response, if this sliding sync was
                // configured to do so.
                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            // Release the lock before calling event handlers
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        let update_summary = {
            // Update the rooms.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // There might be other rooms that were only mentioned in the sliding sync
                // extensions part of the response, and thus would result in rooms present in
                // the `sync_response.joined`. Mark them as updated too.
                //
                // Since we've removed rooms that were in the room subsection from
                // `sync_response.rooms.joined`, the remaining ones aren't already present in
                // `updated_rooms` and wouldn't cause any duplicates.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            // Update the lists.
            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate on known lists, not on lists in the response. Rooms may have been
                // updated that were not involved in any list update.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Report about unknown lists.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        // Everything went well, we can update the position markers.
        //
        // Save the new position markers.
        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }
402
    /// Generate the next sliding sync request.
    ///
    /// Collects the per-list requests, acquires the `position` lock (held
    /// until the matching response is fully handled), restores `pos` and the
    /// to-device token from the database when configured to do so, and
    /// computes the long-polling timeout.
    ///
    /// Returns the request, the [`RequestConfig`] to send it with, and the
    /// owned guard over the position markers.
    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect requests for lists.
        let mut requests_lists = BTreeMap::new();

        let timeout = {
            let lists = self.inner.lists.read().await;

            // Start at `Default` in case there is zero list.
            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        // Collect the `pos`.
        //
        // Wait on the `position` mutex to be available. It means no request nor
        // response is running. The `position` mutex is released whether the response
        // has been fully handled successfully, in this case the `pos` is updated, or
        // the response handling has failed, in this case the `pos` hasn't been updated
        // and the same `pos` will be used for this new request.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        // Restoring from the database is needed either to share `pos` or to
        // get the to-device token back.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // Update pos: either the one restored from the database, if any and the sliding
        // sync was configured so, or read it from the memory cache.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Override the memory one with the database one, for consistency.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // When the client sends a request with no `pos`, MSC4186 returns no device
        // lists updates, as it only returns changes since the provided `pos`
        // (which is `null` in this case); this is in line with sync v2.
        //
        // Therefore, with MSC4186, the device list cache must be marked as to be
        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
        // device lists updates that happened between the previous request and the new
        // “initial” request.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Configure the timeout.
        //
        // The `timeout` query is necessary when all lists require it. Please see
        // [`SlidingSyncList::requires_timeout`].
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        // Record `pos` and `timeout` on the tracing span declared by the caller.
        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Add room subscriptions.
        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        // Add extensions.
        request.extensions = self.inner.extensions.clone();

        // Override the to-device token if the extension is enabled.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            // The request itself.
            request,
            // Configure long-polling. We need some time for the long-poll itself,
            // and extra time for the network delays.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }
530
    /// Send a sliding sync request.
    ///
    /// This method contains the sending logic.
    ///
    /// The caller hands over the `position_guard` obtained from
    /// [`Self::generate_sync_request`]; it is released only once the response
    /// has been fully handled (or the handling task has failed), so that no
    /// other request can start in between.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Prepare the request.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        // Send the request and get a response with end-to-end encryption support.
        //
        // Sending the `/sync` request out when end-to-end encryption is enabled means
        // that we need to also send out any outgoing e2ee related request out
        // coming from the `OlmMachine::outgoing_requests()` method.

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                // Here, we need to run 2 things:
                //
                // 1. Send the sliding sync request and get a response,
                // 2. Send the E2EE requests.
                //
                // We don't want to use a `join` or `try_join` because we want to fail if and
                // only if sending the sliding sync request fails. Failing to send the E2EE
                // requests should just result in a log.
                //
                // We also want to give the priority to sliding sync request. E2EE requests are
                // sent concurrently to the sliding sync request, but the priority is on waiting
                // a sliding sync response.
                //
                // If sending sliding sync request fails, the sending of E2EE requests must be
                // aborted as soon as possible.

                let client = self.inner.client.clone();
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                // Ensure that the task is not running in detached mode. It is aborted when it's
                // dropped.
                .abort_on_drop();

                // Wait on the sliding sync request success or failure early.
                let response = request.await?;

                // At this point, if `request` has been resolved successfully, we wait on
                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
                // been dropped, so aborted.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        // Send the request and get a response _without_ end-to-end encryption support.
        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // At this point, the request has been sent, and a response has been received.
        //
        // We must ensure the handling of the response cannot be stopped/
        // cancelled. It must be done entirely, otherwise we can have
        // corrupted/incomplete states for Sliding Sync and other parts of
        // the code.
        //
        // That's why we are running the handling of the response in a spawned
        // future that cannot be cancelled by anything.
        let this = self.clone();

        // Spawn a new future to ensure that the code inside this future cannot be
        // cancelled if this method is cancelled.
        let future = async move {
            debug!("Start handling response");

            // In case the task running this future is detached, we must
            // ensure responses are handled one at a time. At this point we still own
            // `position_guard`, so we're fine.

            // Handle the response.
            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position guard lock.
            // It means that other responses can be generated and then handled later.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        // `spawn` returns a `JoinHandle`; a `JoinError` (task cancelled or
        // panicked) is surfaced as `Error::JoinError`.
        spawn(future.instrument(Span::current())).await.map_err(|error| Error::JoinError {
            task_description: "handle_response".to_owned(),
            error,
        })?
    }
649
650    /// Is the e2ee extension enabled for this sliding sync instance?
651    #[cfg(feature = "e2e-encryption")]
652    fn is_e2ee_enabled(&self) -> bool {
653        self.inner.extensions.e2ee.enabled == Some(true)
654    }
655
656    /// Is the thread subscriptions extension enabled for this sliding sync
657    /// instance?
658    fn is_thread_subscriptions_enabled(&self) -> bool {
659        self.inner.extensions.thread_subscriptions.enabled == Some(true)
660    }
661
    /// Is the e2ee extension enabled for this sliding sync instance?
    ///
    /// Always `false` when the `e2e-encryption` feature is disabled at
    /// compile-time.
    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }
666
667    /// Should we process the room's subpart of a response?
668    async fn must_process_rooms_response(&self) -> bool {
669        // We consider that we must, if there's any room subscription or there's any
670        // list.
671        !self.inner.room_subscriptions.read().unwrap().is_empty()
672            || !self.inner.lists.read().await.is_empty()
673    }
674
675    /// Send a single sliding sync request, and returns the response summary.
676    ///
677    /// Public for testing purposes only.
678    #[doc(hidden)]
679    #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
680    pub async fn sync_once(&self) -> Result<UpdateSummary> {
681        let (request, request_config, position_guard) = self.generate_sync_request().await?;
682
683        // Send the request.
684        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
685
686        // Notify a new sync was received.
687        self.inner.client.inner.sync_beat.notify(usize::MAX);
688
689        Ok(summaries)
690    }
691
    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and will handle
    /// responses automatically. Lists and rooms are updated automatically.
    ///
    /// This function returns `Ok(…)` if everything went well, otherwise it will
    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
    /// termination.
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // `biased` makes `select!` poll the arms in declaration
                    // order: internal messages take priority over the
                    // in-flight `sync_once`.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // A closed channel (`Err`) is treated like an
                            // explicit stop request.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // Drop the current `sync_once` future and start a
                            // fresh iteration (e.g. after lists/subscriptions
                            // changed).
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos`.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminates the loop, and terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
756
757    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
758    ///
759    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
760    /// enough to “stop” it, but depending of how this `Stream` is used, it
761    /// might not be obvious to drop it immediately (thinking of using this API
762    /// over FFI; the foreign-language might not be able to drop a value
763    /// immediately). Thus, calling this method will ensure that the sync loop
764    /// stops gracefully and as soon as it returns.
765    pub fn stop_sync(&self) -> Result<()> {
766        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
767    }
768
    /// Expire the current Sliding Sync session on the client-side.
    ///
    /// Expiring a Sliding Sync session means: resetting `pos`.
    ///
    /// This should only be used when it's clear that this session was about to
    /// expire anyways, and should be used only in very specific cases (e.g.
    /// multiple sliding syncs being run in parallel, and one of them has
    /// expired).
    ///
    /// This method **MUST** be called when the sync loop is stopped.
    #[doc(hidden)]
    pub async fn expire_session(&self) {
        info!("Session expired; resetting `pos`");

        {
            let lists = self.inner.lists.read().await;

            for list in lists.values() {
                // Invalidate in-memory data that would be persisted on disk.
                list.set_maximum_number_of_rooms(None);
            }

            // The `lists` read guard is released at the end of this scope,
            // before the `position` lock is taken below.
        }

        // Remove the cached sliding sync state as well.
        {
            let mut position = self.inner.position.lock().await;

            // Invalidate in memory.
            position.pos = None;

            // Propagate to disk.
            // Note: this propagates both the sliding sync state and the cached lists'
            // state to disk.
            if let Err(err) = self.cache_to_storage(&position).await {
                // Best-effort: failing to persist only leaves a stale state on
                // disk; log and keep going.
                warn!("Failed to invalidate cached sliding sync state: {err}");
            }
        }

        {
            // Clear all room subscriptions: we don't want to resend all room subscriptions
            // when the session will restart.
            self.inner.room_subscriptions.write().unwrap().clear();
        }
    }
813}
814
815/// Private implementation for [`SlidingSync::subscribe_to_rooms`] and
816/// [`SlidingSync::clear_and_subscribe_to_rooms`].
817fn subscribe_to_rooms(
818    mut room_subscriptions: StdRwLockWriteGuard<
819        '_,
820        BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
821    >,
822    client: &Client,
823    room_ids: &[&RoomId],
824    settings: Option<http::request::RoomSubscription>,
825    cancel_in_flight_request: bool,
826) -> bool {
827    let settings = settings.unwrap_or_default();
828    let mut skip_over_current_sync_loop_iteration = false;
829
830    for room_id in room_ids {
831        if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
832            if let Some(room) = client.get_room(room_id) {
833                room.mark_members_missing();
834            }
835
836            entry.insert(settings.clone());
837
838            skip_over_current_sync_loop_iteration = true;
839        }
840    }
841
842    cancel_in_flight_request && skip_over_current_sync_loop_iteration
843}
844
845impl SlidingSyncInner {
846    /// Send a message over the internal channel.
847    #[instrument]
848    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
849        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
850    }
851
852    /// Send a message over the internal channel if there is a receiver, i.e. if
853    /// the sync loop is running.
854    #[instrument]
855    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
856        // If there is no receiver, the send will fail, but that's OK here.
857        let _ = self.internal_channel.send(message);
858    }
859}
860
/// Control messages sent over the internal channel to steer the sync loop
/// ([`SlidingSync::sync`]) from the outside (see
/// [`SlidingSyncInner::internal_channel_send`]).
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
870
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Set a new value for `pos`, for testing purposes.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }
}
879
/// Markers of the position within the sliding sync stream.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    pos: Option<String>,
}
886
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The rooms that have seen updates.
    pub rooms: Vec<OwnedRoomId>,
}
896
/// Define what kind of poll timeout [`SlidingSync`] must use.
///
/// [The spec says about `timeout`][spec]:
///
/// > How long to wait for new events […] If omitted the response is always
/// > returned immediately, even if there are no changes.
///
/// [spec]: https://github.com/matrix-org/matrix-spec-proposals/blob/erikj/sss/proposals/4186-simplified-sliding-sync.md#top-level
#[derive(Debug)]
pub enum PollTimeout {
    /// No `timeout` must be present.
    None,

    /// A `timeout=X` must be present, where `X` is in seconds and
    /// represents how long to wait for new events.
    Some(u32),

    /// A `timeout=X` must be present, where `X` is the default value passed to
    /// [`SlidingSyncBuilder::poll_timeout`].
    Default,
}

impl PollTimeout {
    /// Computes the smallest `PollTimeout` between two of them.
    ///
    /// The ordering is `None < Some(x) < Default`, with
    /// `Some(x) < Some(y)` if and only if `x < y`. The concrete `Default`
    /// value is unknown at this step and is assumed to be the largest.
    fn min(self, other: Self) -> Self {
        match (self, other) {
            // `None` is the smallest: it wins against everything.
            (Self::None, _) | (_, Self::None) => Self::None,

            // Two concrete timeouts: pick the numerically smaller one.
            (Self::Some(x), Self::Some(y)) => Self::Some(x.min(y)),

            // `Default` is assumed to be the largest: the concrete timeout wins.
            (Self::Some(x), Self::Default) | (Self::Default, Self::Some(x)) => Self::Some(x),

            (Self::Default, Self::Default) => Self::Default,
        }
    }
}
944
945#[cfg(all(test, not(target_family = "wasm")))]
946#[allow(clippy::dbg_macro)]
947mod tests {
948    use std::{
949        collections::BTreeMap,
950        future::ready,
951        ops::Not,
952        sync::{Arc, Mutex},
953        time::Duration,
954    };
955
956    use assert_matches::assert_matches;
957    use event_listener::Listener;
958    use futures_util::{StreamExt, future::join_all, pin_mut};
959    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
960    use matrix_sdk_common::executor::spawn;
961    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
962    use ruma::{
963        OwnedRoomId, assign,
964        events::{direct::DirectEvent, room::member::MembershipState},
965        owned_room_id, room_id,
966        serde::Raw,
967        uint,
968    };
969    use serde::Deserialize;
970    use serde_json::json;
971    use wiremock::{
972        Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
973    };
974
975    use super::{
976        SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
977        cache::restore_sliding_sync_state, http,
978    };
979    use crate::{
980        Client, Result,
981        test_utils::{logged_in_client, mocks::MatrixMockServer},
982    };
983
984    #[derive(Copy, Clone)]
985    struct SlidingSyncMatcher;
986
987    impl Match for SlidingSyncMatcher {
988        fn matches(&self, request: &Request) -> bool {
989            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
990                && request.method == Method::POST
991        }
992    }
993
994    async fn new_sliding_sync(
995        lists: Vec<SlidingSyncListBuilder>,
996    ) -> Result<(MockServer, SlidingSync)> {
997        let server = MockServer::start().await;
998        let client = logged_in_client(Some(server.uri())).await;
999
1000        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1001
1002        for list in lists {
1003            sliding_sync_builder = sliding_sync_builder.add_list(list);
1004        }
1005
1006        let sliding_sync = sliding_sync_builder.build().await?;
1007
1008        Ok((server, sliding_sync))
1009    }
1010
    // Subscribing to a room marks its members as missing (so they get
    // re-requested), but only the *first* time the room is subscribed to.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Run one sync iteration so the client knows about the three rooms.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced.
        // We need to make them synced, so that we can test that subscribing to a room
        // makes members not synced. That's a desired feature.
        assert!(room0.are_members_synced().not());

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced! We can start subscribing and see how it goes.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
        // now marked as not synced.
        assert!(room0.are_members_synced().not());

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Subscribing to the same room doesn't reset the member sync state.

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced, good, good.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced: because we have already subscribed to the
        // room, the members aren't marked as unsynced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1131
    // Room subscriptions can be added, removed, and wholesale replaced;
    // subscribing to an already-subscribed room is a no-op.
    #[async_test]
    async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");
        let room_id_3 = room_id!("!r3:bar.org");

        // Initially empty.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Add 2 rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Remove 1 room.
        sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 1);
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Add 2 rooms, but one already exists.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Replace all rooms with 2 other rooms.
        sliding_sync.clear_and_subscribe_to_rooms(
            &[room_id_2, room_id_3],
            Default::default(),
            false,
        );

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_2));
            assert!(room_subscriptions.contains_key(room_id_3));
        }

        Ok(())
    }
1201
    // Expiring the session (see `expire_session`) must clear all room
    // subscriptions, so they aren't resent when the session restarts.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Suddenly, the session expires!
        sliding_sync.expire_session().await;

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1258
    // Lists can be added while the sync loop is already running.
    #[async_test]
    async fn test_add_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let _stream = sliding_sync.sync();
        pin_mut!(_stream);

        sliding_sync
            .add_list(
                SlidingSyncList::builder("bar")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
            )
            .await?;

        let lists = sliding_sync.inner.lists.read().await;

        assert!(lists.contains_key("foo"));
        assert!(lists.contains_key("bar"));

        // this test also ensures that Tokio is not panicking when calling `add_list`.

        Ok(())
    }
1286
    // The to-device extension's `since` token is picked up from the crypto
    // store's next-batch token when generating a sync request.
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_extensions_to_device_since_is_set() {
        use matrix_sdk_base::crypto::store::types::Changes;

        let client = logged_in_client(None).await;
        let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
            .unwrap()
            .with_to_device_extension(assign!(
                http::request::ToDevice::default(),
                {
                    enabled: Some(true),
                }
            ))
            .build()
            .await
            .unwrap();

        // Test `SlidingSyncInner::extensions`.
        {
            let to_device = &sliding_sync.inner.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Test `Request::extensions`.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Define a `since` token, by saving a next-batch token in the crypto store.
        let since_token = "depuis".to_owned();

        {
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(since_token.clone()),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            } else {
                panic!("Where is the Olm machine?");
            }
        }

        // Test `Request::extensions` again.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert_eq!(to_device.since, Some(since_token));
        }
    }
1351
    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
    // all the users tracked by the `OlmMachine` must be marked as dirty, i.e.
    // `/key/query` requests must be sent. See the code to see the details.
    //
    // This test is asserting that.
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
        use matrix_sdk_test::ruma_response_from_json;
        use ruma::user_id;

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let alice = user_id!("@alice:localhost");
        let bob = user_id!("@bob:localhost");
        let me = user_id!("@example:localhost");

        // Track the users and mark them as not dirty, so that we can check they are
        // “dirty” after that. Dirty here means that a `/key/query` must be sent.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            olm_machine.update_tracked_users([alice, bob]).await?;

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 2);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

            // Fake responses.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                        "one_time_key_counts": {}
                    }))),
                )
                .await?;

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[1].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                        }
                    }))),
                )
                .await?;

            // Once more.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            me: {},
                        }
                    }))),
                )
                .await?;

            // No more.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        // Build a sliding sync with the `e2ee` extension enabled.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // First request: no `pos`.
        let (_request, _, _) = sync.generate_sync_request().await?;

        // Now, tracked users must be dirty.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(
                outgoing_requests[0].request(),
                AnyOutgoingRequest::KeysQuery(request) => {
                    assert!(request.device_keys.contains_key(alice));
                    assert!(request.device_keys.contains_key(bob));
                    assert!(request.device_keys.contains_key(me));
                }
            );

            // Fake responses.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                            me: {},
                        }
                    }))),
                )
                .await?;
        }

        // Second request: with a `pos` this time.
        sync.set_pos("chocolat".to_owned()).await;

        let (_request, _, _) = sync.generate_sync_request().await?;

        // Tracked users are not marked as dirty.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        Ok(())
    }
1492
    // Without `share_pos()`, the `pos` is saved to the database after a sync,
    // but it is never restored from it.
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        // The mock server returns an incrementing `pos` on each request.
        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set.
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        // `pos` is `None` to start with.
        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and then the position is saved both into the internal memory
        // and the database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // While it has been saved into the database, it's not necessarily going to be
        // used later!
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Now, even if we mess with the position stored in the database, the sliding
        // sync instance isn't configured to reload the stream position from the
        // database, so it won't be changed.
        {
            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("yolo".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's still 0, not "yolo".
        {
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("0"));
        }

        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
        // asked to.
        {
            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        Ok(())
    }
1581
    // With `share_pos()` enabled, the stream position must be persisted to the
    // store, reloaded by subsequent requests (even if another instance changed
    // it), shared by new instances with the same ID, and cleared when the
    // session is expired.
    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        // Only the part of the request body we care about here.
        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set, and answer with a
                // monotonically increasing `pos` ("0", "1", "2", …).
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        // `share_pos()` opts into persisting/restoring the stream position.
        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

        // `pos` is `None` to start with.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await?;

            assert!(request.pos.is_none());
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and then the position is saved both into the internal memory
        // and the database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // The position has been persisted to the database; since `share_pos()` is
        // enabled here, it WILL be reloaded by future requests.
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Another process modifies the stream position under our feet...
        {
            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("42".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's alright, the next request will load it from the database.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
        }

        // Recreating a sliding sync with the same ID will reload it too.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
        }

        // Invalidating the session will remove the in-memory value AND the database
        // value.
        sliding_sync.expire_session().await;

        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        // And new sliding syncs with the same ID won't find it either.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        Ok(())
    }
1690
1691    #[async_test]
1692    async fn test_stop_sync_loop() -> Result<()> {
1693        let (_server, sliding_sync) = new_sliding_sync(vec![
1694            SlidingSyncList::builder("foo")
1695                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1696        ])
1697        .await?;
1698
1699        // Start the sync loop.
1700        let stream = sliding_sync.sync();
1701        pin_mut!(stream);
1702
1703        // The sync loop is actually running.
1704        assert!(stream.next().await.is_some());
1705
1706        // Stop the sync loop.
1707        sliding_sync.stop_sync()?;
1708
1709        // The sync loop is actually stopped.
1710        assert!(stream.next().await.is_none());
1711
1712        // Start a new sync loop.
1713        let stream = sliding_sync.sync();
1714        pin_mut!(stream);
1715
1716        // The sync loop is actually running.
1717        assert!(stream.next().await.is_some());
1718
1719        Ok(())
1720    }
1721
    // A read receipt delivered via the receipts extension must mark its room as
    // updated in the returned summary.
    #[async_test]
    async fn test_process_read_receipts() -> Result<()> {
        let room = owned_room_id!("!pony:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;
        // Subscribe the event cache before processing receipts.
        client.event_cache().subscribe().unwrap();

        // Sliding sync with the receipts extension enabled and a single list.
        let sliding_sync = client
            .sliding_sync("test")?
            .with_receipt_extension(
                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state: make the room known to the client.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // A follow-up response carrying an `m.receipt` ephemeral event for the
        // room, via the receipts extension.
        let server_response = assign!(http::Response::new("1".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                receipts: assign!(http::response::Receipts::default(), {
                    rooms: BTreeMap::from([
                        (
                            room.clone(),
                            Raw::from_json_string(
                                json!({
                                    "room_id": room,
                                    "type": "m.receipt",
                                    "content": {
                                        "$event:bar.org": {
                                            "m.read": {
                                                client.user_id().unwrap(): {
                                                    "ts": 1436451550,
                                                }
                                            }
                                        }
                                    }
                                })
                                .to_string(),
                            ).unwrap()
                        )
                    ])
                })
            })
        });

        let summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // The receipt must mark the room as updated in the summary.
        assert!(summary.rooms.contains(&room));

        Ok(())
    }
1806
    // An `m.marked_unread` room account-data event must toggle the room's
    // marked-unread flag, in both directions.
    #[async_test]
    async fn test_process_marked_unread_room_account_data() -> Result<()> {
        let room_id = owned_room_id!("!unicorn:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // Set up sliding sync with one room and one list.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state: make the room known to the client.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // Simulate a response that only changes the marked unread state of the room to
        // true.

        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);

        let update_summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // Check that the list and entry received the update.

        assert!(update_summary.rooms.contains(&room_id));

        let room = client.get_room(&room_id).unwrap();

        // Check the actual room data; this powers `RoomInfo`.

        assert!(room.is_marked_unread());

        // Change it back to false and check that it updates.

        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);

        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut pos_guard,
                RequestedRequiredStates::default(),
            )
            .await?;

        let room = client.get_room(&room_id).unwrap();

        assert!(!room.is_marked_unread());

        Ok(())
    }
1894
1895    fn make_mark_unread_response(
1896        response_number: &str,
1897        room_id: OwnedRoomId,
1898        unread: bool,
1899        add_rooms_section: bool,
1900    ) -> http::Response {
1901        let rooms = if add_rooms_section {
1902            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
1903        } else {
1904            BTreeMap::new()
1905        };
1906
1907        let extensions = assign!(http::response::Extensions::default(), {
1908            account_data: assign!(http::response::AccountData::default(), {
1909                rooms: BTreeMap::from([
1910                    (
1911                        room_id,
1912                        vec![
1913                            Raw::from_json_string(
1914                                json!({
1915                                    "content": {
1916                                        "unread": unread
1917                                    },
1918                                    "type": "m.marked_unread"
1919                                })
1920                                .to_string(),
1921                            ).unwrap()
1922                        ]
1923                    )
1924                ])
1925            })
1926        });
1927
1928        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
1929    }
1930
    // Room account data (here an `m.tag` event) delivered via the account-data
    // extension must mark its room as updated in the returned summary.
    #[async_test]
    async fn test_process_rooms_account_data() -> Result<()> {
        let room = owned_room_id!("!pony:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // Sliding sync with the account-data extension enabled and one list.
        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state: make the room known to the client.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // A follow-up response carrying an `m.tag` room account-data event for
        // the room.
        let server_response = assign!(http::Response::new("1".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    rooms: BTreeMap::from([
                        (
                            room.clone(),
                            vec![
                                Raw::from_json_string(
                                    json!({
                                        "content": {
                                            "tags": {
                                                "u.work": {
                                                    "order": 0.9
                                                }
                                            }
                                        },
                                        "type": "m.tag"
                                    })
                                    .to_string(),
                                ).unwrap()
                            ]
                        )
                    ])
                })
            })
        });
        let summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // The account-data event must mark the room as updated in the summary.
        assert!(summary.rooms.contains(&room));

        Ok(())
    }
2012
    // A sliding sync configured with only the E2EE/to-device extensions must
    // process encryption data and ignore room data; one configured with only
    // lists must do the opposite; one configured with both must process both.
    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_process_only_encryption_events() -> Result<()> {
        use ruma::OneTimeKeyAlgorithm;

        let room = owned_room_id!("!croissant:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // One response carrying both room data and encryption data (a one-time
        // key count and a to-device token), reused for all three scenarios.
        let server_response = assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room.clone(),
                assign!(http::response::Room::default(), {
                    name: Some("Croissants lovers".to_owned()),
                    timeline: Vec::new(),
                }),
            )]),

            extensions: assign!(http::response::Extensions::default(), {
                e2ee: assign!(http::response::E2EE::default(), {
                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
                }),
                to_device: Some(assign!(http::response::ToDevice::default(), {
                    next_batch: "to-device-token".to_owned(),
                })),
            })
        });

        // Don't process non-encryption events if the sliding sync is configured for
        // encryption only.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled: the uploaded key count matches the
        // response.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        // ... and the to-device token has been persisted in the crypto store.
        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // Room events haven't.
        assert!(client.get_room(&room).is_none());

        // Conversely, only process room lists events if the sliding sync was configured
        // as so. Use a fresh client so no state leaks from the previous scenario.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE response has been ignored.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 0);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                None
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        // And it's also possible to set up both.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        Ok(())
    }
2160
2161    #[async_test]
2162    async fn test_lock_multiple_requests() -> Result<()> {
2163        let server = MockServer::start().await;
2164        let client = logged_in_client(Some(server.uri())).await;
2165
2166        let pos = Arc::new(Mutex::new(0));
2167        let _mock_guard = Mock::given(SlidingSyncMatcher)
2168            .respond_with(move |_: &Request| {
2169                let mut pos = pos.lock().unwrap();
2170                *pos += 1;
2171                ResponseTemplate::new(200).set_body_json(json!({
2172                    "pos": pos.to_string(),
2173                    "lists": {},
2174                    "rooms": {}
2175                }))
2176            })
2177            .mount_as_scoped(&server)
2178            .await;
2179
2180        let sliding_sync = client
2181            .sliding_sync("test")?
2182            .with_to_device_extension(
2183                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2184            )
2185            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2186            .build()
2187            .await?;
2188
2189        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2190        // test would never terminate.
2191        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2192
2193        for result in requests.await {
2194            result?;
2195        }
2196
2197        Ok(())
2198    }
2199
2200    #[async_test]
2201    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2202        let server = MockServer::start().await;
2203        let client = logged_in_client(Some(server.uri())).await;
2204
2205        let pos = Arc::new(Mutex::new(0));
2206        let _mock_guard = Mock::given(SlidingSyncMatcher)
2207            .respond_with(move |_: &Request| {
2208                let mut pos = pos.lock().unwrap();
2209                *pos += 1;
2210                // Respond slowly enough that we can skip one iteration.
2211                ResponseTemplate::new(200)
2212                    .set_body_json(json!({
2213                        "pos": pos.to_string(),
2214                        "lists": {},
2215                        "rooms": {}
2216                    }))
2217                    .set_delay(Duration::from_secs(2))
2218            })
2219            .mount_as_scoped(&server)
2220            .await;
2221
2222        let sliding_sync =
2223            client
2224                .sliding_sync("test")?
2225                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2226                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2227                ))
2228                .add_list(
2229                    SlidingSyncList::builder("another-list")
2230                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2231                )
2232                .build()
2233                .await?;
2234
2235        let stream = sliding_sync.sync();
2236        pin_mut!(stream);
2237
2238        let cloned_sync = sliding_sync.clone();
2239        spawn(async move {
2240            tokio::time::sleep(Duration::from_millis(100)).await;
2241
2242            cloned_sync
2243                .on_list("another-list", |list| {
2244                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2245                    ready(())
2246                })
2247                .await;
2248        });
2249
2250        assert_matches!(stream.next().await, Some(Ok(_)));
2251
2252        sliding_sync.stop_sync().unwrap();
2253
2254        assert_matches!(stream.next().await, None);
2255
2256        let mut num_requests = 0;
2257
2258        for request in server.received_requests().await.unwrap() {
2259            if !SlidingSyncMatcher.matches(&request) {
2260                continue;
2261            }
2262
2263            let another_list_ranges = if num_requests == 0 {
2264                // First request
2265                json!([[0, 10]])
2266            } else {
2267                // Second request
2268                json!([[10, 20]])
2269            };
2270
2271            num_requests += 1;
2272            assert!(num_requests <= 2, "more than one request hit the server");
2273
2274            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2275
2276            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2277                &json_value,
2278                &json!({
2279                    "conn_id": "test",
2280                    "lists": {
2281                        "room-list": {
2282                            "ranges": [[0, 9]],
2283                            "required_state": [
2284                                ["m.room.encryption", ""],
2285                                ["m.room.tombstone", ""]
2286                            ],
2287                        },
2288                        "another-list": {
2289                            "ranges": another_list_ranges,
2290                            "required_state": [
2291                                ["m.room.encryption", ""],
2292                                ["m.room.tombstone", ""]
2293                            ],
2294                        },
2295                    }
2296                }),
2297                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2298            ) {
2299                dbg!(json_value);
2300                panic!("json differ: {err}");
2301            }
2302        }
2303
2304        assert_eq!(num_requests, 2);
2305
2306        Ok(())
2307    }
2308
2309    #[async_test]
2310    async fn test_timeout_zero_list() -> Result<()> {
2311        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2312
2313        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2314
2315        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2316        // on new update to pop.
2317        assert!(request.timeout.is_some());
2318
2319        Ok(())
2320    }
2321
    // A request only carries a timeout (long-polls) once the single list is
    // fully loaded.
    #[async_test]
    async fn test_timeout_one_list() -> Result<()> {
        // One growing list, with a page size of 10.
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
        ])
        .await?;

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // The list does not require a timeout.
        assert!(request.timeout.is_none());

        // Simulate a response: the server reports 7 rooms (fewer than the page
        // size of 10), after which the list is fully loaded.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                lists: BTreeMap::from([(
                    "foo".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // The list is now fully loaded, so it requires a timeout.
        assert!(request.timeout.is_some());

        Ok(())
    }
2364
    // With several lists, a request only long-polls once ALL lists are fully
    // loaded.
    #[async_test]
    async fn test_timeout_three_lists() -> Result<()> {
        // Three lists: `foo` (growing) and `bar` (paging) load progressively;
        // `baz` is selective.
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
            SlidingSyncList::builder("baz")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // Two lists (`foo` and `bar`) don't require a timeout yet.
        assert!(request.timeout.is_none());

        // Simulate a response: `foo` becomes fully loaded.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                lists: BTreeMap::from([(
                    "foo".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // One list (`bar`) still doesn't require a timeout.
        assert!(request.timeout.is_none());

        // Simulate a response: `bar` becomes fully loaded too.
        {
            let server_response = assign!(http::Response::new("1".to_owned()), {
                lists: BTreeMap::from([(
                    "bar".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // All lists require a timeout.
        assert!(request.timeout.is_some());

        Ok(())
    }
2438
2439    #[async_test]
2440    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2441        let server = MockServer::start().await;
2442        let client = logged_in_client(Some(server.uri())).await;
2443
2444        let _mock_guard = Mock::given(SlidingSyncMatcher)
2445            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2446                "pos": "0",
2447                "lists": {},
2448                "rooms": {}
2449            })))
2450            .mount_as_scoped(&server)
2451            .await;
2452
2453        let sliding_sync = client
2454            .sliding_sync("test")?
2455            .with_to_device_extension(
2456                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2457            )
2458            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2459            .build()
2460            .await?;
2461
2462        let sliding_sync = Arc::new(sliding_sync);
2463
2464        // Create the listener and perform a sync request
2465        let sync_beat_listener = client.inner.sync_beat.listen();
2466        sliding_sync.sync_once().await?;
2467
2468        // The sync beat listener should be notified shortly after
2469        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2470        Ok(())
2471    }
2472
2473    #[async_test]
2474    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2475        let server = MockServer::start().await;
2476        let client = logged_in_client(Some(server.uri())).await;
2477
2478        let _mock_guard = Mock::given(SlidingSyncMatcher)
2479            .respond_with(ResponseTemplate::new(404))
2480            .mount_as_scoped(&server)
2481            .await;
2482
2483        let sliding_sync = client
2484            .sliding_sync("test")?
2485            .with_to_device_extension(
2486                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2487            )
2488            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2489            .build()
2490            .await?;
2491
2492        let sliding_sync = Arc::new(sliding_sync);
2493
2494        // Create the listener and perform a sync request
2495        let sync_beat_listener = client.inner.sync_beat.listen();
2496        let sync_result = sliding_sync.sync_once().await;
2497        assert!(sync_result.is_err());
2498
2499        // The sync beat listener won't be notified in this case
2500        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2501
2502        Ok(())
2503    }
2504
    #[async_test]
    async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let room_id = room_id!("!mu5hr00m:example.org");

        // Sliding sync response carrying both an `m.direct` global account
        // data event (which triggers the `DirectEvent` handler registered
        // below) and a joined room, so state-store writes happen during the
        // same sync that dispatches the handler.
        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "pos": "0",
                "lists": {},
                "extensions": {
                    "account_data": {
                        "global": [
                            {
                                "type": "m.direct",
                                "content": {
                                    "@de4dlockh0lmes:example.org": [
                                        "!mu5hr00m:example.org"
                                    ]
                                }
                            }
                        ]
                    }
                },
                "rooms": {
                    room_id: {
                        "name": "Mario Bros Fanbase Room",
                        "initial": true,
                    },
                }
            })))
            .mount_as_scoped(server.server())
            .await;

        let f = EventFactory::new().room(room_id);

        // Mock the `/members` endpoint: the event handler below calls
        // `Room::members`, which hits this route and then writes the result
        // into the state store — this would deadlock if the store lock were
        // still held while handlers run.
        Mock::given(method("GET"))
            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "chunk": [
                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
                ]
            })))
            .mount(server.server())
            .await;

        // One-shot channel used to observe that the handler ran to completion.
        let (tx, rx) = tokio::sync::oneshot::channel();

        // The handler closure must be callable more than once, so the sender
        // is kept behind `Arc<Mutex<Option<..>>>` and taken on first use.
        let tx = Arc::new(Mutex::new(Some(tx)));
        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
            // Try to run a /members query while in a event handler.
            let members =
                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
            assert_eq!(members.len(), 1);
            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
        });

        // Enable the account data extension so the `m.direct` event from the
        // mocked response is delivered to the handler.
        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .build()
            .await?;

        // The sync itself hanging here would indicate the store lock is held
        // across handler dispatch.
        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
            .await
            .expect("Sync did not complete in time")
            .expect("Sync failed");

        // Wait for the event handler to complete.
        tokio::time::timeout(Duration::from_secs(5), rx)
            .await
            .expect("Event handler did not complete in time")
            .expect("Event handler failed");

        Ok(())
    }
}
2584}