matrix_sdk/sliding_sync/
mod.rs

// Copyright 2022-2023 Benjamin Kampmann
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![doc = include_str!("README.md")]

mod builder;
mod cache;
mod client;
mod error;
mod list;

use std::{
    collections::{BTreeMap, btree_map::Entry},
    fmt::Debug,
    future::Future,
    sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
    time::Duration,
};

use async_stream::stream;
pub use client::{Version, VersionBuilder};
use futures_core::stream::Stream;
use matrix_sdk_base::RequestedRequiredStates;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::executor::JoinHandleExt as _;
use matrix_sdk_common::{executor::spawn, timer};
use ruma::{
    OwnedRoomId, RoomId,
    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
    assign,
};
use tokio::{
    select,
    sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
};
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};

pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
use crate::{Client, Result, config::RequestConfig};
/// The Sliding Sync instance.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data.
    inner: Arc<SlidingSyncInner>,
}

#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in the sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added
    /// to the [`Self::poll_timeout`].
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two (not necessarily
    /// consecutive) requests with the same `pos`, the server has to reply
    /// with the exact same response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Room subscriptions, i.e. rooms that may be out of scope of all lists,
    /// but for which one wants to receive updates.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// The intended state of the extensions being supplied to sliding sync
    /// calls.
    extensions: http::request::Extensions,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}

impl SlidingSync {
    pub(super) fn new(inner: SlidingSyncInner) -> Self {
        Self { inner: Arc::new(inner) }
    }

    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }

    /// Create a new [`SlidingSyncBuilder`].
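    ///
    /// A minimal sketch of how the builder is typically configured (marked
    /// `ignore`: it assumes a logged-in `client` and a list name chosen for
    /// illustration):
    ///
    /// ```ignore
    /// let sliding_sync = SlidingSync::builder("main-sync".to_owned(), client)?
    ///     .add_list(
    ///         SlidingSyncList::builder("all_rooms")
    ///             .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
    ///     )
    ///     .build()
    ///     .await?;
    /// ```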
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }

    /// Add subscriptions to many rooms.
    ///
    /// If the associated `Room`s exist, their members will be marked as
    /// missing, to ensure that all members are re-fetched.
    ///
    /// A subscription to an already subscribed room is ignored.
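    ///
    /// A minimal sketch (marked `ignore`: it assumes an existing
    /// `sliding_sync` instance and room IDs defined elsewhere):
    ///
    /// ```ignore
    /// // Use the default subscription settings (`None`) and cancel any
    /// // in-flight request, so the new subscriptions are sent out as soon
    /// // as possible.
    /// sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
    /// ```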
    pub fn subscribe_to_rooms(
        &self,
        room_ids: &[&RoomId],
        settings: Option<http::request::RoomSubscription>,
        cancel_in_flight_request: bool,
    ) {
        if subscribe_to_rooms(
            self.inner.room_subscriptions.write().unwrap(),
            &self.inner.client,
            room_ids,
            settings,
            cancel_in_flight_request,
        ) {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Remove subscriptions to many rooms.
    pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
        let mut skip_over_current_sync_loop_iteration = false;

        for room_id in room_ids {
            if room_subscriptions.remove(*room_id).is_some() {
                skip_over_current_sync_loop_iteration = true;
            }
        }

        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Replace all room subscriptions with other ones.
    ///
    /// If the associated `Room`s exist, their members will be marked as
    /// missing, to ensure that all members are re-fetched.
    pub fn clear_and_subscribe_to_rooms(
        &self,
        room_ids: &[&RoomId],
        settings: Option<http::request::RoomSubscription>,
        cancel_in_flight_request: bool,
    ) {
        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
        room_subscriptions.clear();

        if subscribe_to_rooms(
            room_subscriptions,
            &self.inner.client,
            room_ids,
            settings,
            cancel_in_flight_request,
        ) {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Find a list by its name, and do something on it if it exists.
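    ///
    /// A minimal sketch (marked `ignore`: it assumes an existing
    /// `sliding_sync` instance with a list named `"all_rooms"`):
    ///
    /// ```ignore
    /// use std::future::ready;
    ///
    /// // Read the list's name back, or get `None` if no list with that name
    /// // exists.
    /// let name: Option<String> = sliding_sync
    ///     .on_list("all_rooms", |list| ready(list.name().to_owned()))
    ///     .await;
    /// ```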
    pub async fn on_list<Function, FunctionOutput, R>(
        &self,
        list_name: &str,
        function: Function,
    ) -> Option<R>
    where
        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
        FunctionOutput: Future<Output = R>,
    {
        let lists = self.inner.lists.read().await;

        match lists.get(list_name) {
            Some(list) => Some(function(list).await),
            None => None,
        }
    }

    /// Add the list to the list of lists.
    ///
    /// As lists need to have a unique `.name`, if a list with the same name
    /// is found, the new list will replace the old one, and the old one is
    /// returned; otherwise `None` is returned.
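    ///
    /// A minimal sketch (marked `ignore`: it assumes an existing
    /// `sliding_sync` instance):
    ///
    /// ```ignore
    /// // `replaced_list` is `Some(_)` if a list named "invites" already
    /// // existed, `None` otherwise.
    /// let replaced_list = sliding_sync
    ///     .add_list(
    ///         SlidingSyncList::builder("invites")
    ///             .sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)),
    ///     )
    ///     .await?;
    /// ```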
    pub async fn add_list(
        &self,
        list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let list = list_builder.build(self.inner.internal_channel.clone());

        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);

        self.inner.internal_channel_send_if_possible(
            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
        );

        Ok(old_list)
    }

    /// Add a list that will be cached and reloaded from the cache.
    ///
    /// This will raise an error if a storage key was not set, or if there
    /// was an I/O error reading from the cache.
    ///
    /// The rest of the semantics is the same as [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }

    /// Handle the HTTP response.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        // Transform a Sliding Sync Response to a `SyncResponse`.
        //
        // We may not need the `sync_response` in the future (once `SyncResponse`
        // moves to Sliding Sync, i.e. to `http::Response`), but processing the
        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
        // happens here.

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // Take the lock to synchronise accesses to the state store, to avoid concurrent
                // sliding syncs overwriting each other's room infos.
                let _state_store_lock = {
                    let _timer = timer!("acquiring the `state_store_lock`");

                    self.inner.client.base_client().state_store_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                // Process thread subscriptions if they're available.
                //
                // It's important to do this *before* handling the room responses, so that
                // notifications can be properly generated based on the thread subscriptions,
                // for the events in threads we've subscribed to.
                if self.is_thread_subscriptions_enabled() {
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                // Only handle the room's subsection of the response, if this sliding sync was
                // configured to do so.
                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            // Release the lock before calling event handlers.
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        let update_summary = {
            // Update the rooms.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // There might be other rooms that were only mentioned in the sliding sync
                // extensions part of the response, and thus would result in rooms present in
                // `sync_response.rooms.joined`. Mark them as updated too.
                //
                // Since we've removed rooms that were in the room subsection from
                // `sync_response.rooms.joined`, the remaining ones aren't already present in
                // `updated_rooms` and wouldn't cause any duplicates.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            // Update the lists.
            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate on known lists, not on lists in the response. Rooms may have been
                // updated that were not involved in any list update.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Report about unknown lists.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        // Everything went well, we can update the position markers.
        //
        // Save the new position markers.
        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }

    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect requests for lists.
        let mut requests_lists = BTreeMap::new();

        let timeout = {
            let lists = self.inner.lists.read().await;

            // Start at `Default` in case there are zero lists.
            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        // Collect the `pos`.
        //
        // Wait on the `position` mutex to be available. It means no request nor
        // response is running. The `position` mutex is released either once the
        // response has been fully handled successfully (in which case the `pos` is
        // updated), or once the response handling has failed (in which case the
        // `pos` hasn't been updated and the same `pos` will be used for this new
        // request).
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // Update `pos`: either the one restored from the database, if any exists and
        // the sliding sync was configured to share it, or the one read from the
        // in-memory cache.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Override the in-memory one with the database one, for consistency.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // When the client sends a request with no `pos`, MSC4186 returns no device
        // list updates, as it only returns changes since the provided `pos`
        // (which is `null` in this case); this is in line with sync v2.
        //
        // Therefore, with MSC4186, the device list cache must be marked to be
        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
        // device list updates that happened between the previous request and the new
        // “initial” request.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Configure the timeout.
        //
        // The `timeout` query is necessary when all lists require it. Please see
        // [`SlidingSyncList::requires_timeout`].
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Add room subscriptions.
        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        // Add extensions.
        request.extensions = self.inner.extensions.clone();

        // Override the to-device token if the extension is enabled.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            // The request itself.
            request,
            // Configure long-polling. We need some time for the long-poll itself,
            // and extra time for the network delays.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }

    /// Send a sliding sync request.
    ///
    /// This method contains the sending logic.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Prepare the request.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        // Send the request and get a response with end-to-end encryption support.
        //
        // Sending the `/sync` request out when end-to-end encryption is enabled means
        // that we also need to send out any outgoing E2EE-related requests coming
        // from the `OlmMachine::outgoing_requests()` method.

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                // Here, we need to run 2 things:
                //
                // 1. Send the sliding sync request and get a response,
                // 2. Send the E2EE requests.
                //
                // We don't want to use a `join` or `try_join` because we want to fail if and
                // only if sending the sliding sync request fails. Failing to send the E2EE
                // requests should just result in a log.
                //
                // We also want to give priority to the sliding sync request. E2EE requests
                // are sent concurrently with the sliding sync request, but the priority is
                // on waiting for the sliding sync response.
                //
                // If sending the sliding sync request fails, the sending of E2EE requests
                // must be aborted as soon as possible.

                let client = self.inner.client.clone();
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                // Ensure that the task is not running in detached mode. It is aborted when
                // it's dropped.
                .abort_on_drop();

                // Wait on the sliding sync request success or failure early.
                let response = request.await?;

                // At this point, if `request` has been resolved successfully, we wait on
                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for
                // too long. Otherwise, if `request` has failed, `e2ee_uploads` has been
                // dropped, and thus aborted.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        // Send the request and get a response _without_ end-to-end encryption support.
        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // At this point, the request has been sent, and a response has been received.
        //
        // We must ensure the handling of the response cannot be stopped or
        // cancelled. It must be done entirely, otherwise we can have
        // corrupted/incomplete states for Sliding Sync and other parts of
        // the code.
        //
        // That's why we are running the handling of the response in a spawned
        // future that cannot be cancelled by anything.
        let this = self.clone();

        // Spawn a new future to ensure that the code inside this future cannot be
        // cancelled if this method is cancelled.
        let future = async move {
            debug!("Start handling response");

            // In case the task running this future is detached, we must
            // ensure responses are handled one at a time. At this point we still own
            // `position_guard`, so we're fine.

            // Handle the response.
            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position guard lock.
            // It means that other responses can be generated and then handled later.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.unwrap()
    }

    /// Is the e2ee extension enabled for this sliding sync instance?
    #[cfg(feature = "e2e-encryption")]
    fn is_e2ee_enabled(&self) -> bool {
        self.inner.extensions.e2ee.enabled == Some(true)
    }

    /// Is the thread subscriptions extension enabled for this sliding sync
    /// instance?
    fn is_thread_subscriptions_enabled(&self) -> bool {
        self.inner.extensions.thread_subscriptions.enabled == Some(true)
    }

    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }

    /// Should we process the room's subpart of a response?
    async fn must_process_rooms_response(&self) -> bool {
        // We consider that we must, if there's any room subscription or there's any
        // list.
        !self.inner.room_subscriptions.read().unwrap().is_empty()
            || !self.inner.lists.read().await.is_empty()
    }

    /// Send a single sliding sync request, and return the response summary.
    ///
    /// Public for testing purposes only.
    #[doc(hidden)]
    #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
    pub async fn sync_once(&self) -> Result<UpdateSummary> {
        let (request, request_config, position_guard) = self.generate_sync_request().await?;

        // Send the request.
        let summaries = self.send_sync_request(request, request_config, position_guard).await?;

        // Notify a new sync was received.
        self.inner.client.inner.sync_beat.notify(usize::MAX);

        Ok(summaries)
    }

    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and will
    /// handle responses automatically. Lists and rooms are updated
    /// automatically.
    ///
    /// Each item of the `Stream` is `Ok(…)` if everything went well,
    /// otherwise `Err(…)`. An `Err` will _always_ lead to the `Stream`'s
    /// termination.
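    ///
    /// A minimal sketch of driving the loop (marked `ignore`: it assumes
    /// `futures_util::{pin_mut, StreamExt}` in scope and a caller returning
    /// `Result`):
    ///
    /// ```ignore
    /// let stream = sliding_sync.sync();
    /// pin_mut!(stream);
    ///
    /// while let Some(update_summary) = stream.next().await {
    ///     // An `Err` always terminates the stream.
    ///     let update_summary = update_summary?;
    ///     println!("Updated rooms: {:?}", update_summary.rooms);
    /// }
    /// ```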
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // These are errors we **cannot** ignore, and that must stop the
                            // sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos`.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminates the loop, and terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }

    /// Force the sync loop ([`Self::sync`]) to stop if it's running.
    ///
    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
    /// enough to “stop” it, but depending on how this `Stream` is used, it
    /// might not be obvious to drop it immediately (thinking of using this API
    /// over FFI; the foreign language might not be able to drop a value
    /// immediately). Thus, calling this method will ensure that the sync loop
    /// stops gracefully and as soon as it returns.
    pub fn stop_sync(&self) -> Result<()> {
        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
    }

    /// Expire the current Sliding Sync session on the client-side.
    ///
    /// Expiring a Sliding Sync session means: resetting `pos`.
    ///
    /// This should only be used when it's clear that this session was about to
    /// expire anyway, and should be used only in very specific cases (e.g.
    /// multiple sliding syncs being run in parallel, and one of them has
    /// expired).
    ///
    /// This method **MUST** be called when the sync loop is stopped.
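    ///
    /// A minimal sketch (marked `ignore`: it assumes an existing
    /// `sliding_sync` instance):
    ///
    /// ```ignore
    /// // Stop the sync loop first, as required by this method.
    /// sliding_sync.stop_sync()?;
    /// sliding_sync.expire_session().await;
    /// ```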
    #[doc(hidden)]
    pub async fn expire_session(&self) {
        info!("Session expired; resetting `pos`");

        {
            let lists = self.inner.lists.read().await;

            for list in lists.values() {
                // Invalidate in-memory data that would be persisted on disk.
                list.set_maximum_number_of_rooms(None);
            }
        }

        // Remove the cached sliding sync state as well.
        {
            let mut position = self.inner.position.lock().await;

            // Invalidate in memory.
            position.pos = None;

            // Propagate to disk.
            // Note: this propagates both the sliding sync state and the cached lists'
            // state to disk.
            if let Err(err) = self.cache_to_storage(&position).await {
                warn!("Failed to invalidate cached sliding sync state: {err}");
            }
        }

        {
            // Clear all room subscriptions: we don't want to resend all room subscriptions
            // when the session restarts.
            self.inner.room_subscriptions.write().unwrap().clear();
        }
    }
}

/// Private implementation for [`SlidingSync::subscribe_to_rooms`] and
/// [`SlidingSync::clear_and_subscribe_to_rooms`].
fn subscribe_to_rooms(
    mut room_subscriptions: StdRwLockWriteGuard<
        '_,
        BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
    >,
    client: &Client,
    room_ids: &[&RoomId],
    settings: Option<http::request::RoomSubscription>,
    cancel_in_flight_request: bool,
) -> bool {
    let settings = settings.unwrap_or_default();
    let mut skip_over_current_sync_loop_iteration = false;

    for room_id in room_ids {
        if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
            if let Some(room) = client.get_room(room_id) {
                room.mark_members_missing();
            }

            entry.insert(settings.clone());

            skip_over_current_sync_loop_iteration = true;
        }
    }

    cancel_in_flight_request && skip_over_current_sync_loop_iteration
}

impl SlidingSyncInner {
    /// Send a message over the internal channel.
    #[instrument]
    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
    }

    /// Send a message over the internal channel if there is a receiver, i.e.
    /// if the sync loop is running.
    #[instrument]
    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
        // If there is no receiver, the send will fail, but that's OK here.
        let _ = self.internal_channel.send(message);
    }
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its
    /// iteration, and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}

#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Set a new value for `pos`.
    pub async fn set_pos(&self, new_pos: String) {
        let mut position_lock = self.inner.position.lock().await;
        position_lock.pos = Some(new_pos);
    }
}

#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    pos: Option<String>,
}

/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The rooms that have seen updates.
    pub rooms: Vec<OwnedRoomId>,
}

/// Define what kind of poll timeout [`SlidingSync`] must use.
///
/// [The spec says about `timeout`][spec]:
///
/// > How long to wait for new events […] If omitted the response is always
/// > returned immediately, even if there are no changes.
///
/// [spec]: https://github.com/matrix-org/matrix-spec-proposals/blob/erikj/sss/proposals/4186-simplified-sliding-sync.md#top-level
#[derive(Debug)]
pub enum PollTimeout {
    /// No `timeout` must be present.
    None,

    /// A `timeout=X` must be present, where `X` is in seconds and
    /// represents how long to wait for new events.
    Some(u32),

    /// A `timeout=X` must be present, where `X` is the default value passed to
    /// [`SlidingSyncBuilder::poll_timeout`].
    Default,
}

impl PollTimeout {
    /// Computes the smallest `PollTimeout` between two of them.
    ///
    /// The rules are the following:
    ///
    /// * `None` < `Some`,
    /// * `Some(x) < Some(y)` if and only if `x < y`,
    /// * `Some < Default`.
    ///
    /// The `Default` value is unknown at this step but is assumed to be the
    /// largest.
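    ///
    /// A minimal sketch of these rules (marked `ignore` since this method is
    /// private to the module; `matches!` is used because `PollTimeout` does
    /// not implement `PartialEq`):
    ///
    /// ```ignore
    /// // `None` always wins.
    /// assert!(matches!(PollTimeout::None.min(PollTimeout::Some(3)), PollTimeout::None));
    /// // Between two `Some`s, the smaller value wins.
    /// assert!(matches!(PollTimeout::Some(3).min(PollTimeout::Some(5)), PollTimeout::Some(3)));
    /// // `Default` is assumed to be the largest, so any `Some` wins over it.
    /// assert!(matches!(PollTimeout::Some(3).min(PollTimeout::Default), PollTimeout::Some(3)));
    /// ```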
    fn min(self, other: Self) -> Self {
        match (self, other) {
            (Self::None, _) => Self::None,

            (Self::Some(_), Self::None) => Self::None,
            (Self::Some(this), Self::Some(other)) => Self::Some(this.min(other)),
            (Self::Some(this), Self::Default) => Self::Some(this),

            (Self::Default, Self::None) => Self::None,
            (Self::Default, Self::Some(other)) => Self::Some(other),
            (Self::Default, Self::Default) => Self::Default,
        }
    }
}

#[cfg(all(test, not(target_family = "wasm")))]
#[allow(clippy::dbg_macro)]
mod tests {
    use std::{
        collections::BTreeMap,
        future::ready,
        ops::Not,
        sync::{Arc, Mutex},
        time::Duration,
    };

    use assert_matches::assert_matches;
    use event_listener::Listener;
    use futures_util::{StreamExt, future::join_all, pin_mut};
    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
    use matrix_sdk_common::executor::spawn;
    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
    use ruma::{
        OwnedRoomId, assign,
        events::{direct::DirectEvent, room::member::MembershipState},
        owned_room_id, room_id,
        serde::Raw,
        uint,
    };
    use serde::Deserialize;
    use serde_json::json;
    use wiremock::{
        Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
    };

    use super::{
        SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
        cache::restore_sliding_sync_state, http,
    };
    use crate::{
        Client, Result,
        test_utils::{logged_in_client, mocks::MatrixMockServer},
    };

    #[derive(Copy, Clone)]
    struct SlidingSyncMatcher;

    impl Match for SlidingSyncMatcher {
        fn matches(&self, request: &Request) -> bool {
            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
                && request.method == Method::POST
        }
    }

    async fn new_sliding_sync(
        lists: Vec<SlidingSyncListBuilder>,
    ) -> Result<(MockServer, SlidingSync)> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;

        for list in lists {
            sliding_sync_builder = sliding_sync_builder.add_list(list);
        }

        let sliding_sync = sliding_sync_builder.build().await?;

        Ok((server, sliding_sync))
    }

    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced.
        // We need to make them synced, so that we can test that subscribing to a room
        // marks members as not synced. That's a desired feature.
        assert!(room0.are_members_synced().not());

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced! We can start subscribing and see how it goes.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
        // now marked as not synced.
        assert!(room0.are_members_synced().not());

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Subscribing to the same room doesn't reset the member sync state.

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced, good, good.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced: because we have already subscribed to the
        // room, the members aren't marked as unsynced.
        assert!(room0.are_members_synced());

        Ok(())
    }

    #[async_test]
    async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");
        let room_id_3 = room_id!("!r3:bar.org");

        // Initially empty.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Add 2 rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Remove 1 room.
        sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 1);
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Add 2 rooms, but one already exists.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Replace all rooms with 2 other rooms.
        sliding_sync.clear_and_subscribe_to_rooms(
            &[room_id_2, room_id_3],
            Default::default(),
            false,
        );

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_2));
            assert!(room_subscriptions.contains_key(room_id_3));
        }

        Ok(())
    }

    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Suddenly, the session expires!
        sliding_sync.expire_session().await;

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }

    #[async_test]
    async fn test_add_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let _stream = sliding_sync.sync();
        pin_mut!(_stream);

        sliding_sync
            .add_list(
                SlidingSyncList::builder("bar")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
            )
            .await?;

        let lists = sliding_sync.inner.lists.read().await;

        assert!(lists.contains_key("foo"));
        assert!(lists.contains_key("bar"));

        // This test also ensures that Tokio is not panicking when calling `add_list`.

        Ok(())
    }

    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_extensions_to_device_since_is_set() {
        use matrix_sdk_base::crypto::store::types::Changes;

        let client = logged_in_client(None).await;
        let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
            .unwrap()
            .with_to_device_extension(assign!(
                http::request::ToDevice::default(),
                {
                    enabled: Some(true),
                }
            ))
            .build()
            .await
            .unwrap();

        // Test `SlidingSyncInner::extensions`.
        {
            let to_device = &sliding_sync.inner.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Test `Request::extensions`.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Define a `since` token.
        let since_token = "depuis".to_owned();

        {
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(since_token.clone()),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            } else {
                panic!("Where is the Olm machine?");
            }
        }

        // Test `Request::extensions` again.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert_eq!(to_device.since, Some(since_token));
        }
    }
1348
1349    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1350    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1351    // `/key/query` requests must be sent. See the code to see the details.
1352    //
1353    // This test is asserting that.
1354    #[async_test]
1355    #[cfg(feature = "e2e-encryption")]
1356    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1357        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1358        use matrix_sdk_test::ruma_response_from_json;
1359        use ruma::user_id;
1360
1361        let server = MockServer::start().await;
1362        let client = logged_in_client(Some(server.uri())).await;
1363
1364        let alice = user_id!("@alice:localhost");
1365        let bob = user_id!("@bob:localhost");
1366        let me = user_id!("@example:localhost");
1367
1368        // Track and mark users are not dirty, so that we can check they are “dirty”
1369        // after that. Dirty here means that a `/key/query` must be sent.
1370        {
1371            let olm_machine = client.olm_machine().await;
1372            let olm_machine = olm_machine.as_ref().unwrap();
1373
1374            olm_machine.update_tracked_users([alice, bob]).await?;
1375
1376            // Assert requests.
1377            let outgoing_requests = olm_machine.outgoing_requests().await?;
1378
1379            assert_eq!(outgoing_requests.len(), 2);
1380            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1381            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1382
1383            // Fake responses.
1384            olm_machine
1385                .mark_request_as_sent(
1386                    outgoing_requests[0].request_id(),
1387                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1388                        "one_time_key_counts": {}
1389                    }))),
1390                )
1391                .await?;
1392
1393            olm_machine
1394                .mark_request_as_sent(
1395                    outgoing_requests[1].request_id(),
1396                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1397                        "device_keys": {
1398                            alice: {},
1399                            bob: {},
1400                        }
1401                    }))),
1402                )
1403                .await?;
1404
1405            // Once more.
1406            let outgoing_requests = olm_machine.outgoing_requests().await?;
1407
1408            assert_eq!(outgoing_requests.len(), 1);
1409            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1410
1411            olm_machine
1412                .mark_request_as_sent(
1413                    outgoing_requests[0].request_id(),
1414                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1415                        "device_keys": {
1416                            me: {},
1417                        }
1418                    }))),
1419                )
1420                .await?;
1421
1422            // No more.
1423            let outgoing_requests = olm_machine.outgoing_requests().await?;
1424
1425            assert!(outgoing_requests.is_empty());
1426        }
1427
1428        let sync = client
1429            .sliding_sync("test-slidingsync")?
1430            .add_list(SlidingSyncList::builder("new_list"))
1431            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1432            .build()
1433            .await?;
1434
1435        // First request: no `pos`.
1436        let (_request, _, _) = sync.generate_sync_request().await?;
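        // Generating the request is enough to trigger the dirty marking; no
        // HTTP round-trip is needed.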
1437
1438        // Now, tracked users must be dirty.
1439        {
1440            let olm_machine = client.olm_machine().await;
1441            let olm_machine = olm_machine.as_ref().unwrap();
1442
1443            // Assert requests.
1444            let outgoing_requests = olm_machine.outgoing_requests().await?;
1445
1446            assert_eq!(outgoing_requests.len(), 1);
1447            assert_matches!(
1448                outgoing_requests[0].request(),
1449                AnyOutgoingRequest::KeysQuery(request) => {
1450                    assert!(request.device_keys.contains_key(alice));
1451                    assert!(request.device_keys.contains_key(bob));
1452                    assert!(request.device_keys.contains_key(me));
1453                }
1454            );
1455
1456            // Fake responses.
1457            olm_machine
1458                .mark_request_as_sent(
1459                    outgoing_requests[0].request_id(),
1460                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1461                        "device_keys": {
1462                            alice: {},
1463                            bob: {},
1464                            me: {},
1465                        }
1466                    }))),
1467                )
1468                .await?;
1469        }
1470
1471        // Second request: with a `pos` this time.
1472        sync.set_pos("chocolat".to_owned()).await;
1473
1474        let (_request, _, _) = sync.generate_sync_request().await?;
1475
1476        // Tracked users are not marked as dirty.
1477        {
1478            let olm_machine = client.olm_machine().await;
1479            let olm_machine = olm_machine.as_ref().unwrap();
1480
1481            // Assert requests.
1482            let outgoing_requests = olm_machine.outgoing_requests().await?;
1483
1484            assert!(outgoing_requests.is_empty());
1485        }
1486
1487        Ok(())
1488    }
1489
1490    #[cfg(feature = "e2e-encryption")]
1491    #[async_test]
1492    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1493        let server = MockServer::start().await;
1494
1495        #[derive(Deserialize)]
1496        struct PartialRequest {
1497            txn_id: Option<String>,
1498        }
1499
1500        let server_pos = Arc::new(Mutex::new(0));
1501        let _mock_guard = Mock::given(SlidingSyncMatcher)
1502            .respond_with(move |request: &Request| {
1503                // Repeat the txn_id in the response, if set.
1504                let request: PartialRequest = request.body_json().unwrap();
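                // Hand out a monotonically increasing `pos` so each response
                // is distinguishable from the previous one.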
1505                let pos = {
1506                    let mut pos = server_pos.lock().unwrap();
1507                    let prev = *pos;
1508                    *pos += 1;
1509                    prev
1510                };
1511
1512                ResponseTemplate::new(200).set_body_json(json!({
1513                    "txn_id": request.txn_id,
1514                    "pos": pos.to_string(),
1515                }))
1516            })
1517            .mount_as_scoped(&server)
1518            .await;
1519
1520        let client = logged_in_client(Some(server.uri())).await;
1521
1522        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1523
1524        // `pos` is `None` to start with.
1525        {
1526            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1527
1528            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1529            assert!(request.pos.is_none());
1530        }
1531
1532        let sync = sliding_sync.sync();
1533        pin_mut!(sync);
1534
1535        // Sync goes well, and then the position is saved both in memory and in the
1536        // database.
1537        let next = sync.next().await;
1538        assert_matches!(next, Some(Ok(_update_summary)));
1539
1540        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1541
1542        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1543            .await?
1544            .expect("must have restored fields");
1545
1546        // While it has been saved into the database, it's not necessarily going to be
1547        // used later!
1548        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1549
1550        // Now, even if we mess with the position stored in the database, the sliding
1551        // sync instance isn't configured to reload the stream position from the
1552        // database, so it won't be changed.
1553        {
1554            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1555
1556            let mut position_guard = other_sync.inner.position.lock().await;
1557            position_guard.pos = Some("yolo".to_owned());
1558
1559            other_sync.cache_to_storage(&position_guard).await?;
1560        }
1561
1562        // It's still 0, not "yolo".
1563        {
1564            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1565            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1566            assert_eq!(request.pos.as_deref(), Some("0"));
1567        }
1568
1569        // Recreating a sliding sync with the same ID doesn't preload the `pos`,
1570        // unless asked to.
1571        {
1572            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1573            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1574        }
1575
1576        Ok(())
1577    }
1578
1579    #[cfg(feature = "e2e-encryption")]
1580    #[async_test]
1581    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1582        let server = MockServer::start().await;
1583
1584        #[derive(Deserialize)]
1585        struct PartialRequest {
1586            txn_id: Option<String>,
1587        }
1588
1589        let server_pos = Arc::new(Mutex::new(0));
1590        let _mock_guard = Mock::given(SlidingSyncMatcher)
1591            .respond_with(move |request: &Request| {
1592                // Repeat the txn_id in the response, if set.
1593                let request: PartialRequest = request.body_json().unwrap();
1594                let pos = {
1595                    let mut pos = server_pos.lock().unwrap();
1596                    let prev = *pos;
1597                    *pos += 1;
1598                    prev
1599                };
1600
1601                ResponseTemplate::new(200).set_body_json(json!({
1602                    "txn_id": request.txn_id,
1603                    "pos": pos.to_string(),
1604                }))
1605            })
1606            .mount_as_scoped(&server)
1607            .await;
1608
1609        let client = logged_in_client(Some(server.uri())).await;
1610
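        // Unlike the previous test, opt into `share_pos()`: the stream
        // position is restored from the store and reloaded before each
        // request.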
1611        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1612
1613        // `pos` is `None` to start with.
1614        {
1615            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1616
1617            assert!(request.pos.is_none());
1618            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1619        }
1620
1621        let sync = sliding_sync.sync();
1622        pin_mut!(sync);
1623
1624        // Sync goes well, and then the position is saved both in memory and in the
1625        // database.
1626        let next = sync.next().await;
1627        assert_matches!(next, Some(Ok(_update_summary)));
1628
1629        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1630
1631        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1632            .await?
1633            .expect("must have restored fields");
1634
1635        // While it has been saved into the database, it's not necessarily going to be
1636        // used later!
1637        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1638
1639        // Another process modifies the stream position under our feet...
1640        {
1641            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1642
1643            let mut position_guard = other_sync.inner.position.lock().await;
1644            position_guard.pos = Some("42".to_owned());
1645
1646            other_sync.cache_to_storage(&position_guard).await?;
1647        }
1648
1649        // It's alright, the next request will load it from the database.
1650        {
1651            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1652            assert_eq!(request.pos.as_deref(), Some("42"));
1653            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1654        }
1655
1656        // Recreating a sliding sync with the same ID will reload it too.
1657        {
1658            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1659            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1660
1661            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1662            assert_eq!(request.pos.as_deref(), Some("42"));
1663        }
1664
1665        // Invalidating the session will remove the in-memory value AND the database
1666        // value.
1667        sliding_sync.expire_session().await;
1668
1669        {
1670            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1671
1672            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1673            assert!(request.pos.is_none());
1674        }
1675
1676        // And new sliding syncs with the same ID won't find it either.
1677        {
1678            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1679            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1680
1681            let (request, _, _) = sliding_sync.generate_sync_request().await?;
1682            assert!(request.pos.is_none());
1683        }
1684
1685        Ok(())
1686    }
1687
1688    #[async_test]
1689    async fn test_stop_sync_loop() -> Result<()> {
1690        let (_server, sliding_sync) = new_sliding_sync(vec![
1691            SlidingSyncList::builder("foo")
1692                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1693        ])
1694        .await?;
1695
1696        // Start the sync loop.
1697        let stream = sliding_sync.sync();
1698        pin_mut!(stream);
1699
1700        // The sync loop is actually running.
1701        assert!(stream.next().await.is_some());
1702
1703        // Stop the sync loop.
1704        sliding_sync.stop_sync()?;
1705
1706        // The sync loop is actually stopped.
1707        assert!(stream.next().await.is_none());
1708
1709        // Start a new sync loop.
1710        let stream = sliding_sync.sync();
1711        pin_mut!(stream);
1712
1713        // The sync loop is actually running.
1714        assert!(stream.next().await.is_some());
1715
1716        Ok(())
1717    }
1718
1719    #[async_test]
1720    async fn test_process_read_receipts() -> Result<()> {
1721        let room = owned_room_id!("!pony:example.org");
1722
1723        let server = MockServer::start().await;
1724        let client = logged_in_client(Some(server.uri())).await;
1725        client.event_cache().subscribe().unwrap();
1726
1727        let sliding_sync = client
1728            .sliding_sync("test")?
1729            .with_receipt_extension(
1730                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
1731            )
1732            .add_list(
1733                SlidingSyncList::builder("all")
1734                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1735            )
1736            .build()
1737            .await?;
1738
1739        // Initial state.
1740        {
1741            let server_response = assign!(http::Response::new("0".to_owned()), {
1742                rooms: BTreeMap::from([(
1743                    room.clone(),
1744                    http::response::Room::default(),
1745                )])
1746            });
1747
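            // Feed the response straight into the response handler, bypassing
            // the HTTP layer entirely.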
1748            let _summary = {
1749                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1750                sliding_sync
1751                    .handle_response(
1752                        server_response.clone(),
1753                        &mut pos_guard,
1754                        RequestedRequiredStates::default(),
1755                    )
1756                    .await?
1757            };
1758        }
1759
1760        let server_response = assign!(http::Response::new("1".to_owned()), {
1761            extensions: assign!(http::response::Extensions::default(), {
1762                receipts: assign!(http::response::Receipts::default(), {
1763                    rooms: BTreeMap::from([
1764                        (
1765                            room.clone(),
1766                            Raw::from_json_string(
1767                                json!({
1768                                    "room_id": room,
1769                                    "type": "m.receipt",
1770                                    "content": {
1771                                        "$event:bar.org": {
1772                                            "m.read": {
1773                                                client.user_id().unwrap(): {
1774                                                    "ts": 1436451550,
1775                                                }
1776                                            }
1777                                        }
1778                                    }
1779                                })
1780                                .to_string(),
1781                            ).unwrap()
1782                        )
1783                    ])
1784                })
1785            })
1786        });
1787
1788        let summary = {
1789            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1790            sliding_sync
1791                .handle_response(
1792                    server_response.clone(),
1793                    &mut pos_guard,
1794                    RequestedRequiredStates::default(),
1795                )
1796                .await?
1797        };
1798
1799        assert!(summary.rooms.contains(&room));
1800
1801        Ok(())
1802    }
1803
1804    #[async_test]
1805    async fn test_process_marked_unread_room_account_data() -> Result<()> {
1806        let room_id = owned_room_id!("!unicorn:example.org");
1807
1808        let server = MockServer::start().await;
1809        let client = logged_in_client(Some(server.uri())).await;
1810
1811        // Set up sliding sync with one room and one list.
1812
1813        let sliding_sync = client
1814            .sliding_sync("test")?
1815            .with_account_data_extension(
1816                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1817            )
1818            .add_list(
1819                SlidingSyncList::builder("all")
1820                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1821            )
1822            .build()
1823            .await?;
1824
1825        // Initial state.
1826        {
1827            let server_response = assign!(http::Response::new("0".to_owned()), {
1828                rooms: BTreeMap::from([(
1829                    room_id.clone(),
1830                    http::response::Room::default(),
1831                )])
1832            });
1833
1834            let _summary = {
1835                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1836                sliding_sync
1837                    .handle_response(
1838                        server_response.clone(),
1839                        &mut pos_guard,
1840                        RequestedRequiredStates::default(),
1841                    )
1842                    .await?
1843            };
1844        }
1845
1846        // Simulate a response that only changes the marked unread state of the room
1847        // to true.
1848
1849        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
1850
1851        let update_summary = {
1852            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1853            sliding_sync
1854                .handle_response(
1855                    server_response.clone(),
1856                    &mut pos_guard,
1857                    RequestedRequiredStates::default(),
1858                )
1859                .await?
1860        };
1861
1862        // Check that the list and entry received the update.
1863
1864        assert!(update_summary.rooms.contains(&room_id));
1865
1866        let room = client.get_room(&room_id).unwrap();
1867
1868        // Check the actual room data; this is what powers `RoomInfo`.
1869
1870        assert!(room.is_marked_unread());
1871
1872        // Change it back to false and check that it updates.
1873
1874        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
1875
1876        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1877        sliding_sync
1878            .handle_response(
1879                server_response.clone(),
1880                &mut pos_guard,
1881                RequestedRequiredStates::default(),
1882            )
1883            .await?;
1884
1885        let room = client.get_room(&room_id).unwrap();
1886
1887        assert!(!room.is_marked_unread());
1888
1889        Ok(())
1890    }
1891
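    /// Builds a sliding sync response, numbered `response_number`, carrying an
    /// `m.marked_unread` room account-data event for `room_id`, optionally
    /// with a `rooms` section, so tests can flip the unread flag without a
    /// real server round-trip.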
1892    fn make_mark_unread_response(
1893        response_number: &str,
1894        room_id: OwnedRoomId,
1895        unread: bool,
1896        add_rooms_section: bool,
1897    ) -> http::Response {
1898        let rooms = if add_rooms_section {
1899            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
1900        } else {
1901            BTreeMap::new()
1902        };
1903
1904        let extensions = assign!(http::response::Extensions::default(), {
1905            account_data: assign!(http::response::AccountData::default(), {
1906                rooms: BTreeMap::from([
1907                    (
1908                        room_id,
1909                        vec![
1910                            Raw::from_json_string(
1911                                json!({
1912                                    "content": {
1913                                        "unread": unread
1914                                    },
1915                                    "type": "m.marked_unread"
1916                                })
1917                                .to_string(),
1918                            ).unwrap()
1919                        ]
1920                    )
1921                ])
1922            })
1923        });
1924
1925        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
1926    }
1927
1928    #[async_test]
1929    async fn test_process_rooms_account_data() -> Result<()> {
1930        let room = owned_room_id!("!pony:example.org");
1931
1932        let server = MockServer::start().await;
1933        let client = logged_in_client(Some(server.uri())).await;
1934
1935        let sliding_sync = client
1936            .sliding_sync("test")?
1937            .with_account_data_extension(
1938                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
1939            )
1940            .add_list(
1941                SlidingSyncList::builder("all")
1942                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
1943            )
1944            .build()
1945            .await?;
1946
1947        // Initial state.
1948        {
1949            let server_response = assign!(http::Response::new("0".to_owned()), {
1950                rooms: BTreeMap::from([(
1951                    room.clone(),
1952                    http::response::Room::default(),
1953                )])
1954            });
1955
1956            let _summary = {
1957                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1958                sliding_sync
1959                    .handle_response(
1960                        server_response.clone(),
1961                        &mut pos_guard,
1962                        RequestedRequiredStates::default(),
1963                    )
1964                    .await?
1965            };
1966        }
1967
1968        let server_response = assign!(http::Response::new("1".to_owned()), {
1969            extensions: assign!(http::response::Extensions::default(), {
1970                account_data: assign!(http::response::AccountData::default(), {
1971                    rooms: BTreeMap::from([
1972                        (
1973                            room.clone(),
1974                            vec![
1975                                Raw::from_json_string(
1976                                    json!({
1977                                        "content": {
1978                                            "tags": {
1979                                                "u.work": {
1980                                                    "order": 0.9
1981                                                }
1982                                            }
1983                                        },
1984                                        "type": "m.tag"
1985                                    })
1986                                    .to_string(),
1987                                ).unwrap()
1988                            ]
1989                        )
1990                    ])
1991                })
1992            })
1993        });
1994        let summary = {
1995            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
1996            sliding_sync
1997                .handle_response(
1998                    server_response.clone(),
1999                    &mut pos_guard,
2000                    RequestedRequiredStates::default(),
2001                )
2002                .await?
2003        };
2004
2005        assert!(summary.rooms.contains(&room));
2006
2007        Ok(())
2008    }
2009
2010    #[async_test]
2011    #[cfg(feature = "e2e-encryption")]
2012    async fn test_process_only_encryption_events() -> Result<()> {
2013        use ruma::OneTimeKeyAlgorithm;
2014
2015        let room = owned_room_id!("!croissant:example.org");
2016
2017        let server = MockServer::start().await;
2018        let client = logged_in_client(Some(server.uri())).await;
2019
2020        let server_response = assign!(http::Response::new("0".to_owned()), {
2021            rooms: BTreeMap::from([(
2022                room.clone(),
2023                assign!(http::response::Room::default(), {
2024                    name: Some("Croissants lovers".to_owned()),
2025                    timeline: Vec::new(),
2026                }),
2027            )]),
2028
2029            extensions: assign!(http::response::Extensions::default(), {
2030                e2ee: assign!(http::response::E2EE::default(), {
2031                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2032                }),
2033                to_device: Some(assign!(http::response::ToDevice::default(), {
2034                    next_batch: "to-device-token".to_owned(),
2035                })),
2036            })
2037        });
2038
2039        // Don't process non-encryption events if the sliding sync is configured for
2040        // encryption only.
2041
2042        let sliding_sync = client
2043            .sliding_sync("test")?
2044            .with_to_device_extension(
2045                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2046            )
2047            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2048            .build()
2049            .await?;
2050
2051        {
2052            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2053
2054            sliding_sync
2055                .handle_response(
2056                    server_response.clone(),
2057                    &mut position_guard,
2058                    RequestedRequiredStates::default(),
2059                )
2060                .await?;
2061        }
2062
2063        // E2EE has been properly handled.
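        // (42 is the `device_one_time_keys_count` for signed curve25519 keys
        // in the server response above.)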
2064        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2065        assert_eq!(uploaded_key_count, 42);
2066
2067        {
2068            let olm_machine = &*client.olm_machine_for_testing().await;
2069            assert_eq!(
2070                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2071                Some("to-device-token")
2072            );
2073        }
2074
2075        // Room events haven't.
2076        assert!(client.get_room(&room).is_none());
2077
2078        // Conversely, only process room list events if the sliding sync was
2079        // configured to do so.
2080        let client = logged_in_client(Some(server.uri())).await;
2081
2082        let sliding_sync = client
2083            .sliding_sync("test")?
2084            .add_list(SlidingSyncList::builder("thelist"))
2085            .build()
2086            .await?;
2087
2088        {
2089            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2090
2091            sliding_sync
2092                .handle_response(
2093                    server_response.clone(),
2094                    &mut position_guard,
2095                    RequestedRequiredStates::default(),
2096                )
2097                .await?;
2098        }
2099
2100        // E2EE response has been ignored.
2101        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2102        assert_eq!(uploaded_key_count, 0);
2103
2104        {
2105            let olm_machine = &*client.olm_machine_for_testing().await;
2106            assert_eq!(
2107                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2108                None
2109            );
2110        }
2111
2112        // The room is now known.
2113        assert!(client.get_room(&room).is_some());
2114
2115        // And it's also possible to set up both.
2116        let client = logged_in_client(Some(server.uri())).await;
2117
2118        let sliding_sync = client
2119            .sliding_sync("test")?
2120            .add_list(SlidingSyncList::builder("thelist"))
2121            .with_to_device_extension(
2122                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2123            )
2124            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2125            .build()
2126            .await?;
2127
2128        {
2129            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2130
2131            sliding_sync
2132                .handle_response(
2133                    server_response.clone(),
2134                    &mut position_guard,
2135                    RequestedRequiredStates::default(),
2136                )
2137                .await?;
2138        }
2139
2140        // E2EE has been properly handled.
2141        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2142        assert_eq!(uploaded_key_count, 42);
2143
2144        {
2145            let olm_machine = &*client.olm_machine_for_testing().await;
2146            assert_eq!(
2147                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2148                Some("to-device-token")
2149            );
2150        }
2151
2152        // The room is now known.
2153        assert!(client.get_room(&room).is_some());
2154
2155        Ok(())
2156    }
2157
2158    #[async_test]
2159    async fn test_lock_multiple_requests() -> Result<()> {
2160        let server = MockServer::start().await;
2161        let client = logged_in_client(Some(server.uri())).await;
2162
2163        let pos = Arc::new(Mutex::new(0));
2164        let _mock_guard = Mock::given(SlidingSyncMatcher)
2165            .respond_with(move |_: &Request| {
2166                let mut pos = pos.lock().unwrap();
2167                *pos += 1;
2168                ResponseTemplate::new(200).set_body_json(json!({
2169                    "pos": pos.to_string(),
2170                    "lists": {},
2171                    "rooms": {}
2172                }))
2173            })
2174            .mount_as_scoped(&server)
2175            .await;
2176
2177        let sliding_sync = client
2178            .sliding_sync("test")?
2179            .with_to_device_extension(
2180                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2181            )
2182            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2183            .build()
2184            .await?;
2185
2186        // Spawn two requests in parallel. Before #2430, this led to a deadlock and
2187        // the test would never terminate.
2188        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2189
2190        for result in requests.await {
2191            result?;
2192        }
2193
2194        Ok(())
2195    }
2196
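    // If an in-flight request is aborted (here: a list's ranges change while
    // the server is still delaying its response), it must not leave stale
    // state behind; the next request has to carry the updated ranges.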
2197    #[async_test]
2198    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2199        let server = MockServer::start().await;
2200        let client = logged_in_client(Some(server.uri())).await;
2201
2202        let pos = Arc::new(Mutex::new(0));
2203        let _mock_guard = Mock::given(SlidingSyncMatcher)
2204            .respond_with(move |_: &Request| {
2205                let mut pos = pos.lock().unwrap();
2206                *pos += 1;
2207                // Respond slowly enough that we can skip one iteration.
2208                ResponseTemplate::new(200)
2209                    .set_body_json(json!({
2210                        "pos": pos.to_string(),
2211                        "lists": {},
2212                        "rooms": {}
2213                    }))
2214                    .set_delay(Duration::from_secs(2))
2215            })
2216            .mount_as_scoped(&server)
2217            .await;
2218
2219        let sliding_sync =
2220            client
2221                .sliding_sync("test")?
2222                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2223                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2224                ))
2225                .add_list(
2226                    SlidingSyncList::builder("another-list")
2227                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2228                )
2229                .build()
2230                .await?;
2231
2232        let stream = sliding_sync.sync();
2233        pin_mut!(stream);
2234
2235        let cloned_sync = sliding_sync.clone();
2236        spawn(async move {
2237            tokio::time::sleep(Duration::from_millis(100)).await;
2238
2239            cloned_sync
2240                .on_list("another-list", |list| {
2241                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2242                    ready(())
2243                })
2244                .await;
2245        });
2246
2247        assert_matches!(stream.next().await, Some(Ok(_)));
2248
2249        sliding_sync.stop_sync().unwrap();
2250
2251        assert_matches!(stream.next().await, None);
2252
2253        let mut num_requests = 0;
2254
2255        for request in server.received_requests().await.unwrap() {
2256            if !SlidingSyncMatcher.matches(&request) {
2257                continue;
2258            }
2259
2260            let another_list_ranges = if num_requests == 0 {
2261                // First request
2262                json!([[0, 10]])
2263            } else {
2264                // Second request
2265                json!([[10, 20]])
2266            };
2267
2268            num_requests += 1;
2269            assert!(num_requests <= 2, "more than two requests hit the server");
2270
2271            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2272
2273            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2274                &json_value,
2275                &json!({
2276                    "conn_id": "test",
2277                    "lists": {
2278                        "room-list": {
2279                            "ranges": [[0, 9]],
2280                            "required_state": [
2281                                ["m.room.encryption", ""],
2282                                ["m.room.tombstone", ""]
2283                            ],
2284                        },
2285                        "another-list": {
2286                            "ranges": another_list_ranges,
2287                            "required_state": [
2288                                ["m.room.encryption", ""],
2289                                ["m.room.tombstone", ""]
2290                            ],
2291                        },
2292                    }
2293                }),
2294                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2295            ) {
2296                dbg!(json_value);
2297                panic!("json differ: {err}");
2298            }
2299        }
2300
2301        assert_eq!(num_requests, 2);
2302
2303        Ok(())
2304    }
2305
2306    #[async_test]
2307    async fn test_timeout_zero_list() -> Result<()> {
2308        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2309
2310        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2311
2312        // Zero lists means sliding sync is fully loaded, so there is a timeout to
2313        // wait for new updates to pop.
2314        assert!(request.timeout.is_some());
2315
2316        Ok(())
2317    }
2318
2319    #[async_test]
2320    async fn test_timeout_one_list() -> Result<()> {
2321        let (_server, sliding_sync) = new_sliding_sync(vec![
2322            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2323        ])
2324        .await?;
2325
2326        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2327
2328        // The list is not fully loaded yet, so no timeout is required.
2329        assert!(request.timeout.is_none());
2330
2331        // Simulate a response.
2332        {
2333            let server_response = assign!(http::Response::new("0".to_owned()), {
2334                lists: BTreeMap::from([(
2335                    "foo".to_owned(),
2336                    assign!(http::response::List::default(), {
2337                        count: uint!(7),
2338                    })
2339                 )])
2340            });
2341
2342            let _summary = {
2343                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2344                sliding_sync
2345                    .handle_response(
2346                        server_response.clone(),
2347                        &mut pos_guard,
2348                        RequestedRequiredStates::default(),
2349                    )
2350                    .await?
2351            };
2352        }
2353
2354        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2355
2356        // The list is now fully loaded, so it requires a timeout.
2357        assert!(request.timeout.is_some());
2358
2359        Ok(())
2360    }
2361
2362    #[async_test]
2363    async fn test_timeout_three_lists() -> Result<()> {
2364        let (_server, sliding_sync) = new_sliding_sync(vec![
2365            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2366            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2367            SlidingSyncList::builder("baz")
2368                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2369        ])
2370        .await?;
2371
2372        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2373
2374        // Two lists are not fully loaded yet, so no timeout is required.
2375        assert!(request.timeout.is_none());
2376
2377        // Simulate a response.
2378        {
2379            let server_response = assign!(http::Response::new("0".to_owned()), {
2380                lists: BTreeMap::from([(
2381                    "foo".to_owned(),
2382                    assign!(http::response::List::default(), {
2383                        count: uint!(7),
2384                    })
2385                 )])
2386            });
2387
2388            let _summary = {
2389                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2390                sliding_sync
2391                    .handle_response(
2392                        server_response.clone(),
2393                        &mut pos_guard,
2394                        RequestedRequiredStates::default(),
2395                    )
2396                    .await?
2397            };
2398        }
2399
2400        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2401
2402        // One list is still not fully loaded, so no timeout is required yet.
2403        assert!(request.timeout.is_none());
2404
2405        // Simulate a response.
2406        {
2407            let server_response = assign!(http::Response::new("1".to_owned()), {
2408                lists: BTreeMap::from([(
2409                    "bar".to_owned(),
2410                    assign!(http::response::List::default(), {
2411                        count: uint!(7),
2412                    })
2413                 )])
2414            });
2415
2416            let _summary = {
2417                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2418                sliding_sync
2419                    .handle_response(
2420                        server_response.clone(),
2421                        &mut pos_guard,
2422                        RequestedRequiredStates::default(),
2423                    )
2424                    .await?
2425            };
2426        }
2427
2428        let (request, _, _) = sliding_sync.generate_sync_request().await?;
2429
2430        // All lists are now fully loaded, so the request has a timeout.
2431        assert!(request.timeout.is_some());
2432
2433        Ok(())
2434    }
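
    // Taken together, the timeout tests pin down the rule: the request only
    // carries a long-poll timeout once every list is fully loaded. Roughly (a
    // sketch of the rule, not the actual implementation):
    //
    //   let timeout = lists.values()
    //       .all(|list| list.is_fully_loaded())
    //       .then_some(poll_timeout);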
2435
2436    #[async_test]
2437    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2438        let server = MockServer::start().await;
2439        let client = logged_in_client(Some(server.uri())).await;
2440
2441        let _mock_guard = Mock::given(SlidingSyncMatcher)
2442            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2443                "pos": "0",
2444                "lists": {},
2445                "rooms": {}
2446            })))
2447            .mount_as_scoped(&server)
2448            .await;
2449
2450        let sliding_sync = client
2451            .sliding_sync("test")?
2452            .with_to_device_extension(
2453                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2454            )
2455            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2456            .build()
2457            .await?;
2458
2459        let sliding_sync = Arc::new(sliding_sync);
2460
2461        // Create the listener and perform a sync request
2462        let sync_beat_listener = client.inner.sync_beat.listen();
2463        sliding_sync.sync_once().await?;
2464
2465        // The sync beat listener should be notified shortly after
2466        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2467        Ok(())
2468    }
2469
2470    #[async_test]
2471    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2472        let server = MockServer::start().await;
2473        let client = logged_in_client(Some(server.uri())).await;
2474
2475        let _mock_guard = Mock::given(SlidingSyncMatcher)
2476            .respond_with(ResponseTemplate::new(404))
2477            .mount_as_scoped(&server)
2478            .await;
2479
2480        let sliding_sync = client
2481            .sliding_sync("test")?
2482            .with_to_device_extension(
2483                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2484            )
2485            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2486            .build()
2487            .await?;
2488
2489        let sliding_sync = Arc::new(sliding_sync);
2490
2491        // Create the listener and perform a sync request
2492        let sync_beat_listener = client.inner.sync_beat.listen();
2493        let sync_result = sliding_sync.sync_once().await;
2494        assert!(sync_result.is_err());
2495
2496        // The sync beat listener won't be notified in this case
2497        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2498
2499        Ok(())
2500    }
2501
2502    #[async_test]
2503    async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2504        let server = MatrixMockServer::new().await;
2505        let client = server.client_builder().build().await;
2506        let room_id = room_id!("!mu5hr00m:example.org");
2507
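        // Regression test: if the state store lock were still held while event
        // handlers run, the `/members` request issued from inside the handler
        // below would deadlock.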
2508        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2509            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2510                "pos": "0",
2511                "lists": {},
2512                "extensions": {
2513                    "account_data": {
2514                        "global": [
2515                            {
2516                                "type": "m.direct",
2517                                "content": {
2518                                    "@de4dlockh0lmes:example.org": [
2519                                        "!mu5hr00m:example.org"
2520                                    ]
2521                                }
2522                            }
2523                        ]
2524                    }
2525                },
2526                "rooms": {
2527                    room_id: {
2528                        "name": "Mario Bros Fanbase Room",
2529                        "initial": true,
2530                    },
2531                }
2532            })))
2533            .mount_as_scoped(server.server())
2534            .await;
2535
2536        let f = EventFactory::new().room(room_id);
2537
2538        Mock::given(method("GET"))
2539            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2540            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2541                "chunk": [
2542                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2543                ]
2544            })))
2545            .mount(server.server())
2546            .await;
2547
2548        let (tx, rx) = tokio::sync::oneshot::channel();
2549
2550        let tx = Arc::new(Mutex::new(Some(tx)));
2551        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2552            // Try to run a /members query while inside an event handler.
2553            let members =
2554                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2555            assert_eq!(members.len(), 1);
2556            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2557        });
2558
2559        let sliding_sync = client
2560            .sliding_sync("test")?
2561            .add_list(SlidingSyncList::builder("thelist"))
2562            .with_account_data_extension(
2563                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2564            )
2565            .build()
2566            .await?;
2567
2568        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2569            .await
2570            .expect("Sync did not complete in time")
2571            .expect("Sync failed");
2572
2573        // Wait for the event handler to complete.
2574        tokio::time::timeout(Duration::from_secs(5), rx)
2575            .await
2576            .expect("Event handler did not complete in time")
2577            .expect("Event handler failed");
2578
2579        Ok(())
2580    }
2581}