matrix_sdk/sliding_sync/mod.rs

// Copyright 2022-2023 Benjamin Kampmann
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![doc = include_str!("README.md")]

mod builder;
mod cache;
mod client;
mod error;
mod list;

use std::{
    collections::{BTreeMap, btree_map::Entry},
    fmt::Debug,
    future::Future,
    sync::{Arc, RwLock as StdRwLock, RwLockWriteGuard as StdRwLockWriteGuard},
    time::Duration,
};

use async_stream::stream;
pub use client::{Version, VersionBuilder};
use futures_core::stream::Stream;
use matrix_sdk_base::RequestedRequiredStates;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::executor::JoinHandleExt as _;
use matrix_sdk_common::{executor::spawn, timer};
use ruma::{
    OwnedRoomId, RoomId,
    api::{client::sync::sync_events::v5 as http, error::ErrorKind},
    assign,
};
use tokio::{
    select,
    sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
};
use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};

pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
use self::{cache::restore_sliding_sync_state, client::SlidingSyncResponseProcessor};
use crate::{Client, Result, config::RequestConfig};

/// The Sliding Sync instance.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
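///
/// A minimal usage sketch, assuming a logged-in `Client` and the crate-root
/// re-exports of the sliding sync types; the connection id, list name and
/// range below are illustrative:
///
/// ```no_run
/// # async fn example(client: matrix_sdk::Client) -> matrix_sdk::Result<()> {
/// use futures_util::{StreamExt, pin_mut};
/// use matrix_sdk::{SlidingSyncList, SlidingSyncMode};
///
/// let sliding_sync = client
///     // A unique identifier for this sliding sync connection.
///     .sliding_sync("main")?
///     // Sync the first 21 rooms of one list.
///     .add_list(
///         SlidingSyncList::builder("all-rooms")
///             .sync_mode(SlidingSyncMode::new_selective().add_range(0..=20)),
///     )
///     .build()
///     .await?;
///
/// let stream = sliding_sync.sync();
/// pin_mut!(stream);
///
/// while let Some(update) = stream.next().await {
///     println!("Rooms updated: {:?}", update?.rooms);
/// }
/// # Ok(())
/// # }
/// ```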
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data.
    inner: Arc<SlidingSyncInner>,
}

#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in the sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to time out. This is added
    /// to [`Self::poll_timeout`].
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two (not necessarily
    /// consecutive) requests with the same `pos`, the server has to reply
    /// with the exact same response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but for which one wants to receive updates.
    room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, http::request::RoomSubscription>>,

    /// The intended state of the extensions being supplied to sliding sync's
    /// `/sync` calls.
    extensions: http::request::Extensions,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}

impl SlidingSync {
    pub(super) fn new(inner: SlidingSyncInner) -> Self {
        Self { inner: Arc::new(inner) }
    }

    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }

    /// Create a new [`SlidingSyncBuilder`].
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }

    /// Add subscriptions to many rooms.
    ///
    /// If the associated `Room`s exist, their members will be marked as
    /// missing, ensuring that all members are re-fetched.
    ///
    /// A subscription to an already subscribed room is ignored.
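    ///
    /// A short sketch; the room id is illustrative:
    ///
    /// ```no_run
    /// # fn example(sliding_sync: &matrix_sdk::SlidingSync) {
    /// use ruma::room_id;
    ///
    /// // Subscribe, and cancel any in-flight request so the subscription
    /// // takes effect immediately with a fresh request.
    /// sliding_sync.subscribe_to_rooms(&[room_id!("!r0:example.org")], None, true);
    /// # }
    /// ```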
    pub fn subscribe_to_rooms(
        &self,
        room_ids: &[&RoomId],
        settings: Option<http::request::RoomSubscription>,
        cancel_in_flight_request: bool,
    ) {
        if subscribe_to_rooms(
            self.inner.room_subscriptions.write().unwrap(),
            &self.inner.client,
            room_ids,
            settings,
            cancel_in_flight_request,
        ) {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Remove subscriptions to many rooms.
    pub fn unsubscribe_to_rooms(&self, room_ids: &[&RoomId], cancel_in_flight_request: bool) {
        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
        let mut skip_over_current_sync_loop_iteration = false;

        for room_id in room_ids {
            if room_subscriptions.remove(*room_id).is_some() {
                skip_over_current_sync_loop_iteration = true;
            }
        }

        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Replace all room subscriptions with new ones.
    ///
    /// If the associated `Room`s exist, their members will be marked as
    /// missing, ensuring that all members are re-fetched.
    pub fn clear_and_subscribe_to_rooms(
        &self,
        room_ids: &[&RoomId],
        settings: Option<http::request::RoomSubscription>,
        cancel_in_flight_request: bool,
    ) {
        let mut room_subscriptions = self.inner.room_subscriptions.write().unwrap();
        room_subscriptions.clear();

        if subscribe_to_rooms(
            room_subscriptions,
            &self.inner.client,
            room_ids,
            settings,
            cancel_in_flight_request,
        ) {
            self.inner.internal_channel_send_if_possible(
                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
            );
        }
    }

    /// Find a list by its name, and do something on it if it exists.
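    ///
    /// A short sketch, reading the list's name back out; the list name is
    /// illustrative:
    ///
    /// ```no_run
    /// # async fn example(sliding_sync: &matrix_sdk::SlidingSync) {
    /// use std::future::ready;
    ///
    /// let name = sliding_sync
    ///     .on_list("all-rooms", |list| ready(list.name().to_owned()))
    ///     .await;
    /// # }
    /// ```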
    pub async fn on_list<Function, FunctionOutput, R>(
        &self,
        list_name: &str,
        function: Function,
    ) -> Option<R>
    where
        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
        FunctionOutput: Future<Output = R>,
    {
        let lists = self.inner.lists.read().await;

        match lists.get(list_name) {
            Some(list) => Some(function(list).await),
            None => None,
        }
    }

    /// Add the list to the list of lists.
    ///
    /// As lists need to have a unique `.name`, if a list with the same name
    /// already exists, the new list replaces the old one, and the old list is
    /// returned; otherwise `None` is returned.
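    ///
    /// A short sketch; the list name and range are illustrative:
    ///
    /// ```no_run
    /// # async fn example(sliding_sync: &matrix_sdk::SlidingSync) -> matrix_sdk::Result<()> {
    /// use matrix_sdk::{SlidingSyncList, SlidingSyncMode};
    ///
    /// sliding_sync
    ///     .add_list(
    ///         SlidingSyncList::builder("invites")
    ///             .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
    ///     )
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```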
    pub async fn add_list(
        &self,
        list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let list = list_builder.build(self.inner.internal_channel.clone());

        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);

        self.inner.internal_channel_send_if_possible(
            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
        );

        Ok(old_list)
    }

    /// Add a list that will be cached and reloaded from the cache.
    ///
    /// This will raise an error if a storage key was not set, or if there
    /// was an I/O error reading from the cache.
    ///
    /// The rest of the semantics is the same as [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }

    /// Handle the HTTP response.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        mut sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        // Transform a Sliding Sync Response to a `SyncResponse`.
        //
        // We may not need the `sync_response` in the future (once `SyncResponse`
        // moves to Sliding Sync, i.e. to `http::Response`), but processing the
        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
        // happens here.

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // Take the lock to synchronise accesses to the state store, to avoid concurrent
                // sliding syncs overwriting each other's room infos.
                let state_store_guard = {
                    let _timer = timer!("acquiring the `state_store_lock`");

                    self.inner.client.base_client().state_store_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                // Process thread subscriptions if they're available.
                //
                // It's important to do this *before* handling the room responses, so that
                // notifications can be properly generated based on the thread subscriptions,
                // for the events in threads we've subscribed to.
                if self.is_thread_subscriptions_enabled() {
                    response_processor
                        .handle_thread_subscriptions(
                            position.pos.as_deref(),
                            std::mem::take(
                                &mut sliding_sync_response.extensions.thread_subscriptions,
                            ),
                        )
                        .await?;
                }

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor
                        .handle_encryption(&sliding_sync_response.extensions, &state_store_guard)
                        .await?
                }

                // Only handle the rooms subsection of the response, if this sliding sync was
                // configured to do so.
                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(
                            &sliding_sync_response,
                            &requested_required_states,
                            &state_store_guard,
                        )
                        .await?;
                }

                response_processor
            };

            // Release the lock before calling event handlers.
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        let update_summary = {
            // Update the rooms.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // There might be other rooms that were only mentioned in the sliding sync
                // extensions part of the response, and thus would show up in
                // `sync_response.rooms.joined`. Mark them as updated too.
                //
                // Since we've removed rooms that were in the room subsection from
                // `sync_response.rooms.joined`, the remaining ones aren't already present in
                // `updated_rooms` and wouldn't cause any duplicates.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            // Update the lists.
            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate over known lists, not over lists in the response. Rooms may have
                // been updated that were not involved in any list update.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        updated_lists.push(name.clone());
                    }
                }

                // Report unknown lists.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        // Everything went well, we can update the position markers.
        //
        // Save the new position markers.
        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }

    async fn generate_sync_request(
        &self,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect requests for lists.
        let mut requests_lists = BTreeMap::new();

        let timeout = {
            let lists = self.inner.lists.read().await;

            // Start at `Default` in case there are no lists.
            let mut timeout = PollTimeout::Default;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request()?);
                timeout = timeout.min(list.requires_timeout());
            }

            timeout
        };

        // Collect the `pos`.
        //
        // Wait on the `position` mutex to be available. It means no request nor
        // response is running. The `position` mutex is released either when the
        // response has been fully handled successfully, in which case the `pos` is
        // updated, or when the response handling has failed, in which case the `pos`
        // hasn't been updated and the same `pos` will be used for this new request.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled = self.inner.extensions.to_device.enabled == Some(true);

        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // Update the `pos`: use the one restored from the database, if any and if the
        // sliding sync was configured to share it; otherwise read it from the
        // in-memory cache.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Override the in-memory one with the database one, for consistency.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // When the client sends a request with no `pos`, MSC4186 returns no device
        // lists updates, as it only returns changes since the provided `pos`
        // (which is `null` in this case); this is in line with sync v2.
        //
        // Therefore, with MSC4186, the device list cache must be marked as to be
        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
        // device lists updates that happened between the previous request and the new
        // “initial” request.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Configure the timeout.
        //
        // The `timeout` query is necessary when all lists require it. Please see
        // [`SlidingSyncList::requires_timeout`].
        let timeout = match timeout {
            PollTimeout::None => None,
            PollTimeout::Some(timeout) => Some(Duration::from_secs(timeout.into())),
            PollTimeout::Default => Some(self.inner.poll_timeout),
        };

        Span::current()
            .record("pos", &pos)
            .record("timeout", timeout.map(|duration| duration.as_millis()));

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Add room subscriptions.
        request.room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();

        // Add extensions.
        request.extensions = self.inner.extensions.clone();

        // Override the to-device token if the extension is enabled.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        Ok((
            // The request itself.
            request,
            // Configure long-polling. We need some time for the long-poll itself,
            // and extra time for the network delays.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            position_guard,
        ))
    }

    /// Send a sliding sync request.
    ///
    /// This method contains the sending logic.
    async fn send_sync_request(
        &self,
        request: http::Request,
        request_config: RequestConfig,
        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
    ) -> Result<UpdateSummary> {
        debug!("Sending request");

        // Prepare the request.
        let requested_required_states = RequestedRequiredStates::from(&request);
        let request = self.inner.client.send(request).with_request_config(request_config);

        // Send the request and get a response with end-to-end encryption support.
        //
        // Sending the `/sync` request out when end-to-end encryption is enabled means
        // that we also need to send out any outgoing E2EE-related requests coming
        // from the `OlmMachine::outgoing_requests()` method.

        #[cfg(feature = "e2e-encryption")]
        let response = {
            if self.is_e2ee_enabled() {
                // Here, we need to run 2 things:
                //
                // 1. Send the sliding sync request and get a response,
                // 2. Send the E2EE requests.
                //
                // We don't want to use a `join` or `try_join` because we want to fail if and
                // only if sending the sliding sync request fails. Failing to send the E2EE
                // requests should just result in a log.
                //
                // We also want to give priority to the sliding sync request. E2EE requests
                // are sent concurrently with the sliding sync request, but the priority is
                // on waiting for the sliding sync response.
                //
                // If sending the sliding sync request fails, the sending of E2EE requests
                // must be aborted as soon as possible.

                let client = self.inner.client.clone();
                let e2ee_uploads = spawn(
                    async move {
                        if let Err(error) = client.send_outgoing_requests().await {
                            error!(?error, "Error while sending outgoing E2EE requests");
                        }
                    }
                    .instrument(Span::current()),
                )
                // Ensure that the task is not running in detached mode. It is aborted when
                // it's dropped.
                .abort_on_drop();

                // Wait on the sliding sync request success or failure early.
                let response = request.await?;

                // At this point, if `request` has been resolved successfully, we wait on
                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for
                // too long. Otherwise, if `request` has failed, `e2ee_uploads` has been
                // dropped, and thus aborted.
                e2ee_uploads.await.map_err(|error| Error::JoinError {
                    task_description: "e2ee_uploads".to_owned(),
                    error,
                })?;

                response
            } else {
                request.await?
            }
        };

        // Send the request and get a response _without_ end-to-end encryption support.
        #[cfg(not(feature = "e2e-encryption"))]
        let response = request.await?;

        debug!("Received response");

        // At this point, the request has been sent, and a response has been received.
        //
        // We must ensure the handling of the response cannot be stopped/cancelled. It
        // must be done entirely, otherwise we can have corrupted/incomplete states for
        // Sliding Sync and other parts of the code.
        //
        // That's why we are running the handling of the response in a spawned
        // future that cannot be cancelled by anything.
        let this = self.clone();

        // Spawn a new future to ensure that the code inside this future cannot be
        // cancelled if this method is cancelled.
        let future = async move {
            debug!("Start handling response");

            // In case the task running this future is detached, we must
            // ensure responses are handled one at a time. At this point we still own
            // `position_guard`, so we're fine.

            // Handle the response.
            let updates = this
                .handle_response(response, &mut position_guard, requested_required_states)
                .await?;

            this.cache_to_storage(&position_guard).await?;

            // Release the position guard lock.
            // It means that other responses can be generated and then handled later.
            drop(position_guard);

            debug!("Done handling response");

            Ok(updates)
        };

        spawn(future.instrument(Span::current())).await.map_err(|error| Error::JoinError {
            task_description: "handle_response".to_owned(),
            error,
        })?
    }

    /// Is the e2ee extension enabled for this sliding sync instance?
    #[cfg(feature = "e2e-encryption")]
    fn is_e2ee_enabled(&self) -> bool {
        self.inner.extensions.e2ee.enabled == Some(true)
    }

    #[cfg(not(feature = "e2e-encryption"))]
    fn is_e2ee_enabled(&self) -> bool {
        false
    }

    /// Is the thread subscriptions extension enabled for this sliding sync
    /// instance?
    fn is_thread_subscriptions_enabled(&self) -> bool {
        self.inner.extensions.thread_subscriptions.enabled == Some(true)
    }

    /// Should we process the rooms subpart of a response?
    async fn must_process_rooms_response(&self) -> bool {
        // We consider that we must, if there's any room subscription or there's any
        // list.
        !self.inner.room_subscriptions.read().unwrap().is_empty()
            || !self.inner.lists.read().await.is_empty()
    }

    /// Send a single sliding sync request, and return the response summary.
    ///
    /// Public for testing purposes only.
    #[doc(hidden)]
    #[instrument(skip_all, fields(conn_id = self.inner.id, pos, timeout))]
    pub async fn sync_once(&self) -> Result<UpdateSummary> {
        let (request, request_config, position_guard) = self.generate_sync_request().await?;

        // Send the request.
        let summaries = self.send_sync_request(request, request_config, position_guard).await?;

        // Notify that a new sync was received.
        self.inner.client.inner.sync_beat.notify(usize::MAX);

        Ok(summaries)
    }

    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and handle
    /// responses automatically. Lists and rooms are updated automatically.
    ///
    /// The stream yields `Ok(…)` as long as everything goes well; otherwise it
    /// yields `Err(…)`. An `Err` will _always_ lead to the `Stream`
    /// termination.
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // These are errors we **cannot** ignore, and that must stop the
                            // sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos`.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminate the loop, and terminate the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }

    /// Force the sync loop ([`Self::sync`]) to stop if it's running.
    ///
    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
    /// enough to “stop” it, but depending on how this `Stream` is used, it
    /// might not be obvious to drop it immediately (think of using this API
    /// over FFI; the foreign language might not be able to drop a value
    /// immediately). Thus, calling this method will ensure that the sync loop
    /// stops gracefully and as soon as it returns.
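    ///
    /// A short sketch of running the loop on a task and stopping it from the
    /// outside, assuming a Tokio runtime (`tokio::spawn` is not part of this
    /// API):
    ///
    /// ```no_run
    /// # async fn example(sliding_sync: matrix_sdk::SlidingSync) -> matrix_sdk::Result<()> {
    /// use futures_util::{StreamExt, pin_mut};
    ///
    /// let sync = sliding_sync.clone();
    /// tokio::spawn(async move {
    ///     let stream = sync.sync();
    ///     pin_mut!(stream);
    ///
    ///     while let Some(_update) = stream.next().await {
    ///         // React to updates here.
    ///     }
    /// });
    ///
    /// // Later, from any clone of the instance: stop the loop gracefully.
    /// sliding_sync.stop_sync()?;
    /// # Ok(())
    /// # }
    /// ```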
    pub fn stop_sync(&self) -> Result<()> {
        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
    }

    /// Expire the current Sliding Sync session on the client-side.
    ///
    /// Expiring a Sliding Sync session means: resetting `pos`.
    ///
    /// This should only be used when it's clear that this session was about to
    /// expire anyway, and should be used only in very specific cases (e.g.
    /// multiple sliding syncs being run in parallel, and one of them has
    /// expired).
    ///
    /// This method **MUST** be called when the sync loop is stopped.
    #[doc(hidden)]
    pub async fn expire_session(&self) {
        info!("Session expired; resetting `pos`");

        {
            let lists = self.inner.lists.read().await;

            for list in lists.values() {
                // Invalidate in-memory data that would be persisted on disk.
                list.set_maximum_number_of_rooms(None);
            }
        }

        // Remove the cached sliding sync state as well.
        {
            let mut position = self.inner.position.lock().await;

            // Invalidate in memory.
            position.pos = None;

            // Propagate to disk.
            // Note: this propagates both the sliding sync state and the cached lists'
            // state to disk.
            if let Err(err) = self.cache_to_storage(&position).await {
                warn!("Failed to invalidate cached sliding sync state: {err}");
            }
        }

        {
            // Clear all room subscriptions: we don't want to resend all room
            // subscriptions when the session restarts.
            self.inner.room_subscriptions.write().unwrap().clear();
        }
    }
}

/// Private implementation for [`SlidingSync::subscribe_to_rooms`] and
/// [`SlidingSync::clear_and_subscribe_to_rooms`].
fn subscribe_to_rooms(
    mut room_subscriptions: StdRwLockWriteGuard<
        '_,
        BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
    >,
    client: &Client,
    room_ids: &[&RoomId],
    settings: Option<http::request::RoomSubscription>,
    cancel_in_flight_request: bool,
) -> bool {
    let settings = settings.unwrap_or_default();
    let mut skip_over_current_sync_loop_iteration = false;

    for room_id in room_ids {
        if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
            if let Some(room) = client.get_room(room_id) {
                room.mark_members_missing();
            }

            entry.insert(settings.clone());

            skip_over_current_sync_loop_iteration = true;
        }
    }

    cancel_in_flight_request && skip_over_current_sync_loop_iteration
}

impl SlidingSyncInner {
    /// Send a message over the internal channel.
    #[instrument]
    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
    }

    /// Send a message over the internal channel if there is a receiver, i.e.
    /// if the sync loop is running.
    #[instrument]
    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
        // If there is no receiver, the send will fail, but that's OK here.
        let _ = self.internal_channel.send(message);
    }
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its
    /// iteration, and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}

#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Set a new value for `pos`.
    pub async fn set_pos(&self, new_pos: String) {
        let mut position_lock = self.inner.position.lock().await;
        position_lock.pos = Some(new_pos);
    }
}

#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    pos: Option<String>,
}

/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The rooms that have seen updates.
    pub rooms: Vec<OwnedRoomId>,
}

/// Define what kind of poll timeout [`SlidingSync`] must use.
///
/// [The spec says about `timeout`][spec]:
///
/// > How long to wait for new events […] If omitted the response is always
/// > returned immediately, even if there are no changes.
///
/// [spec]: https://github.com/matrix-org/matrix-spec-proposals/blob/erikj/sss/proposals/4186-simplified-sliding-sync.md#top-level
#[derive(Debug)]
pub enum PollTimeout {
    /// No `timeout` must be present.
    None,

    /// A `timeout=X` must be present, where `X` is in seconds and
    /// represents how long to wait for new events.
    Some(u32),

    /// A `timeout=X` must be present, where `X` is the default value passed to
    /// [`SlidingSyncBuilder::poll_timeout`].
    Default,
}

impl PollTimeout {
    /// Computes the smallest `PollTimeout` between two of them.
    ///
    /// The rules are the following:
    ///
    /// * `None` < `Some`,
    /// * `Some(x) < Some(y)` if and only if `x < y`,
    /// * `Some < Default`.
    ///
    /// The `Default` value is unknown at this step but is assumed to be the
    /// largest.
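    ///
    /// For instance, following these rules: `Some(3).min(Some(5))` is
    /// `Some(3)`, `Default.min(Some(5))` is `Some(5)`, and anything combined
    /// with `None` is `None`.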
    fn min(self, left: Self) -> Self {
        match (self, left) {
            (Self::None, _) => Self::None,

            (Self::Some(_), Self::None) => Self::None,
            (Self::Some(right), Self::Some(left)) => Self::Some(right.min(left)),
            (Self::Some(right), Self::Default) => Self::Some(right),

            (Self::Default, Self::None) => Self::None,
            (Self::Default, Self::Some(left)) => Self::Some(left),
            (Self::Default, Self::Default) => Self::Default,
        }
    }
}

#[cfg(all(test, not(target_family = "wasm")))]
#[allow(clippy::dbg_macro)]
mod tests {
    use std::{
        collections::BTreeMap,
        future::ready,
        ops::Not,
        sync::{Arc, Mutex},
        time::Duration,
    };

    use assert_matches::assert_matches;
    use event_listener::Listener;
    use futures_util::{StreamExt, future::join_all, pin_mut};
    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
    use matrix_sdk_common::executor::spawn;
    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
    use ruma::{
        OwnedRoomId, assign,
        events::{direct::DirectEvent, room::member::MembershipState},
        owned_room_id, room_id,
        serde::Raw,
        uint,
    };
    use serde::Deserialize;
    use serde_json::json;
    use wiremock::{
        Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
    };

    use super::{
        SlidingSync, SlidingSyncBuilder, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
        cache::restore_sliding_sync_state, http,
    };
    use crate::{
        Client, Result,
        test_utils::{logged_in_client, mocks::MatrixMockServer},
    };

    #[derive(Copy, Clone)]
    struct SlidingSyncMatcher;

    impl Match for SlidingSyncMatcher {
        fn matches(&self, request: &Request) -> bool {
            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
                && request.method == Method::POST
        }
    }

    async fn new_sliding_sync(
        lists: Vec<SlidingSyncListBuilder>,
    ) -> Result<(MockServer, SlidingSync)> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;

        for list in lists {
            sliding_sync_builder = sliding_sync_builder.add_list(list);
        }

        let sliding_sync = sliding_sync_builder.build().await?;

        Ok((server, sliding_sync))
    }

    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced.
        // We need to make them synced, so that we can test that subscribing to a room
        // marks the members as not synced. That's a desired feature.
        assert!(room0.are_members_synced().not());

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced! We can start subscribing and see how it goes.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
        // now marked as not synced.
        assert!(room0.are_members_synced().not());

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Subscribing to the same room doesn't reset the member sync state.

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced, good, good.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced: because we have already subscribed to the
        // room, the members aren't marked as unsynced.
        assert!(room0.are_members_synced());

        Ok(())
    }

    #[async_test]
    async fn test_subscribe_unsubscribe_and_clear_and_subscribe_to_rooms() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");
        let room_id_3 = room_id!("!r3:bar.org");

        // Initially empty.
        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Add 2 rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Remove 1 room.
        sliding_sync.unsubscribe_to_rooms(&[room_id_0], false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 1);
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Add 2 rooms, but one already exists.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], Default::default(), false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
        }

        // Replace all rooms with 2 other rooms.
        sliding_sync.clear_and_subscribe_to_rooms(
            &[room_id_2, room_id_3],
            Default::default(),
            false,
        );

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert_eq!(room_subscriptions.len(), 2);
            assert!(room_subscriptions.contains_key(room_id_2));
            assert!(room_subscriptions.contains_key(room_id_3));
        }

        Ok(())
    }

    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Suddenly, the session expires!
        sliding_sync.expire_session().await;

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        {
            let room_subscriptions = sliding_sync.inner.room_subscriptions.read().unwrap();

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }

    #[async_test]
    async fn test_add_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let _stream = sliding_sync.sync();
        pin_mut!(_stream);

        sliding_sync
            .add_list(
                SlidingSyncList::builder("bar")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
            )
            .await?;

        let lists = sliding_sync.inner.lists.read().await;

        assert!(lists.contains_key("foo"));
        assert!(lists.contains_key("bar"));

        // This test also ensures that Tokio does not panic when calling `add_list`.

        Ok(())
    }

    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_extensions_to_device_since_is_set() {
        use matrix_sdk_base::crypto::store::types::Changes;

        let client = logged_in_client(None).await;
        let sliding_sync = SlidingSyncBuilder::new("foo".to_owned(), client.clone())
            .unwrap()
            .with_to_device_extension(assign!(
                http::request::ToDevice::default(),
                {
                    enabled: Some(true),
                }
            ))
            .build()
            .await
            .unwrap();

        // Test `SlidingSyncInner::extensions`.
        {
            let to_device = &sliding_sync.inner.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Test `Request::extensions`.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert!(to_device.since.is_none());
        }

        // Define a `since` token.
        let since_token = "depuis".to_owned();

        {
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(since_token.clone()),
                        ..Default::default()
                    })
                    .await
                    .unwrap();
            } else {
                panic!("Where is the Olm machine?");
            }
        }

        // Test `Request::extensions` again.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await.unwrap();

            let to_device = &request.extensions.to_device;

            assert_eq!(to_device.enabled, Some(true));
            assert_eq!(to_device.since, Some(since_token));
        }
    }

1358    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1359    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1360    // `/key/query` requests must be sent. See the code to see the details.
1361    //
1362    // This test is asserting that.
1363    #[async_test]
1364    #[cfg(feature = "e2e-encryption")]
1365    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1366        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1367        use matrix_sdk_test::ruma_response_from_json;
1368        use ruma::user_id;
1369
1370        let server = MockServer::start().await;
1371        let client = logged_in_client(Some(server.uri())).await;
1372
1373        let alice = user_id!("@alice:localhost");
1374        let bob = user_id!("@bob:localhost");
1375        let me = user_id!("@example:localhost");
1376
1377        // Track and mark users are not dirty, so that we can check they are “dirty”
1378        // after that. Dirty here means that a `/key/query` must be sent.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            olm_machine.update_tracked_users([alice, bob]).await?;

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 2);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));

            // Fake responses.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
                        "one_time_key_counts": {}
                    }))),
                )
                .await?;

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[1].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                        }
                    }))),
                )
                .await?;

            // Once more.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));

            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            me: {},
                        }
                    }))),
                )
                .await?;

            // No more.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // First request: no `pos`.
        let (_request, _, _) = sync.generate_sync_request().await?;

        // Now, tracked users must be dirty.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert_eq!(outgoing_requests.len(), 1);
            assert_matches!(
                outgoing_requests[0].request(),
                AnyOutgoingRequest::KeysQuery(request) => {
                    assert!(request.device_keys.contains_key(alice));
                    assert!(request.device_keys.contains_key(bob));
                    assert!(request.device_keys.contains_key(me));
                }
            );

            // Fake responses.
            olm_machine
                .mark_request_as_sent(
                    outgoing_requests[0].request_id(),
                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
                        "device_keys": {
                            alice: {},
                            bob: {},
                            me: {},
                        }
                    }))),
                )
                .await?;
        }

        // Second request: with a `pos` this time.
        sync.set_pos("chocolat".to_owned()).await;

        let (_request, _, _) = sync.generate_sync_request().await?;

        // Tracked users are not marked as dirty.
        {
            let olm_machine = client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().unwrap();

            // Assert requests.
            let outgoing_requests = olm_machine.outgoing_requests().await?;

            assert!(outgoing_requests.is_empty());
        }

        Ok(())
    }
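
    // A hedged sketch of the rule the test above exercises, using illustrative
    // types rather than the SDK's actual crypto API: when the request carries no
    // `pos`, device-list updates may have been missed while the connection was
    // down, so every tracked user becomes dirty and the next `/keys/query` must
    // cover all of them.
    #[allow(dead_code)]
    fn sketch_users_to_requery(
        pos: Option<&str>,
        tracked_users: &[ruma::OwnedUserId],
        already_dirty: &[ruma::OwnedUserId],
    ) -> Vec<ruma::OwnedUserId> {
        if pos.is_none() {
            // Starting from scratch: re-query keys for everyone we track.
            tracked_users.to_vec()
        } else {
            // Established session: only users flagged by device-list updates.
            already_dirty.to_vec()
        }
    }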

    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set.
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;

        // `pos` is `None` to start with.
        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and the position is then saved both in memory and in the
        // database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // While it has been saved into the database, it's not necessarily going to be
        // used later!
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Now, even if we mess with the position stored in the database, the sliding
        // sync instance isn't configured to reload the stream position from the
        // database, so it won't be changed.
        {
            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("yolo".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's still 0, not "yolo".
        {
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("0"));
        }

        // Recreating a sliding sync with the same ID doesn't preload the pos, unless
        // asked to.
        {
            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        Ok(())
    }

    #[cfg(feature = "e2e-encryption")]
    #[async_test]
    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
        let server = MockServer::start().await;

        #[derive(Deserialize)]
        struct PartialRequest {
            txn_id: Option<String>,
        }

        let server_pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |request: &Request| {
                // Repeat the txn_id in the response, if set.
                let request: PartialRequest = request.body_json().unwrap();
                let pos = {
                    let mut pos = server_pos.lock().unwrap();
                    let prev = *pos;
                    *pos += 1;
                    prev
                };

                ResponseTemplate::new(200).set_body_json(json!({
                    "txn_id": request.txn_id,
                    "pos": pos.to_string(),
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;

        // `pos` is `None` to start with.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await?;

            assert!(request.pos.is_none());
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
        }

        let sync = sliding_sync.sync();
        pin_mut!(sync);

        // Sync goes well, and the position is then saved both in memory and in the
        // database.
        let next = sync.next().await;
        assert_matches!(next, Some(Ok(_update_summary)));

        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));

        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
            .await?
            .expect("must have restored fields");

        // While it has been saved into the database, it's not necessarily going to be
        // used later!
        assert_eq!(restored_fields.pos.as_deref(), Some("0"));

        // Another process modifies the stream position under our feet...
        {
            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;

            let mut position_guard = other_sync.inner.position.lock().await;
            position_guard.pos = Some("42".to_owned());

            other_sync.cache_to_storage(&position_guard).await?;
        }

        // It's alright, the next request will load it from the database.
        {
            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
        }

        // Recreating a sliding sync with the same ID will reload it too.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert_eq!(request.pos.as_deref(), Some("42"));
        }

        // Invalidating the session will remove the in-memory value AND the database
        // value.
        sliding_sync.expire_session().await;

        {
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        // And new sliding syncs with the same ID won't find it either.
        {
            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
            assert!(sliding_sync.inner.position.lock().await.pos.is_none());

            let (request, _, _) = sliding_sync.generate_sync_request().await?;
            assert!(request.pos.is_none());
        }

        Ok(())
    }
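
    // A hedged sketch (the connection id is illustrative) of the two behaviours
    // the previous tests contrast: by default a new `SlidingSync` starts with
    // `pos: None`, while `share_pos()` restores the stream position from the
    // store and keeps it up to date across instances.
    #[cfg(feature = "e2e-encryption")]
    #[allow(dead_code)]
    async fn example_share_pos(client: &Client) -> Result<()> {
        // Forgets `pos` across restarts: the first request starts from scratch.
        let _forgetful = client.sliding_sync("my-conn")?.build().await?;

        // Remembers `pos`: the first request resumes from the persisted position.
        let _elephant = client.sliding_sync("my-conn")?.share_pos().build().await?;

        Ok(())
    }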

    #[async_test]
    async fn test_stop_sync_loop() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        // Start the sync loop.
        let stream = sliding_sync.sync();
        pin_mut!(stream);

        // The sync loop is actually running.
        assert!(stream.next().await.is_some());

        // Stop the sync loop.
        sliding_sync.stop_sync()?;

        // The sync loop is actually stopped.
        assert!(stream.next().await.is_none());

        // Start a new sync loop.
        let stream = sliding_sync.sync();
        pin_mut!(stream);

        // The sync loop is actually running.
        assert!(stream.next().await.is_some());

        Ok(())
    }
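
    // A hedged sketch of the consumption pattern the test above exercises: the
    // sync loop is a `Stream`, it can be stopped from anywhere via `stop_sync`,
    // and calling `sync()` again later starts a fresh loop.
    #[allow(dead_code)]
    async fn example_sync_loop(sliding_sync: &SlidingSync) -> Result<()> {
        let stream = sliding_sync.sync();
        pin_mut!(stream);

        while let Some(update) = stream.next().await {
            // Each item summarises what changed during one iteration.
            let _update_summary = update?;

            // Stopping the loop makes the stream yield `None`, which ends the
            // `while let` (here, after the first update).
            sliding_sync.stop_sync()?;
        }

        Ok(())
    }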

    #[async_test]
    async fn test_process_read_receipts() -> Result<()> {
        let room = owned_room_id!("!pony:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;
        client.event_cache().subscribe().unwrap();

        let sliding_sync = client
            .sliding_sync("test")?
            .with_receipt_extension(
                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let server_response = assign!(http::Response::new("1".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                receipts: assign!(http::response::Receipts::default(), {
                    rooms: BTreeMap::from([
                        (
                            room.clone(),
                            Raw::from_json_string(
                                json!({
                                    "room_id": room,
                                    "type": "m.receipt",
                                    "content": {
                                        "$event:bar.org": {
                                            "m.read": {
                                                client.user_id().unwrap(): {
                                                    "ts": 1436451550,
                                                }
                                            }
                                        }
                                    }
                                })
                                .to_string(),
                            ).unwrap()
                        )
                    ])
                })
            })
        });

        let summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        assert!(summary.rooms.contains(&room));

        Ok(())
    }
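
    // A hedged sketch (connection and list names are illustrative) of enabling
    // the receipts extension, as exercised by the test above: `m.receipt`
    // ephemeral events are then delivered, and the rooms they touch show up in
    // the update summary.
    #[allow(dead_code)]
    async fn example_enable_receipts(client: &Client) -> Result<()> {
        let sliding_sync = client
            .sliding_sync("with-receipts")?
            .with_receipt_extension(
                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // A single iteration yields a summary naming the rooms whose data (for
        // instance their read receipts) changed.
        let summary = sliding_sync.sync_once().await?;
        for _room_id in &summary.rooms {
            // e.g. refresh read markers for this room.
        }

        Ok(())
    }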

    #[async_test]
    async fn test_process_marked_unread_room_account_data() -> Result<()> {
        let room_id = owned_room_id!("!unicorn:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        // Set up sliding sync with one room and one list.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room_id.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        // Simulate a response that only changes the marked unread state of the room to
        // true.

        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);

        let update_summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        // Check that the room list and entry received the update.

        assert!(update_summary.rooms.contains(&room_id));

        let room = client.get_room(&room_id).unwrap();

        // Check the actual room data; this powers `RoomInfo`.

        assert!(room.is_marked_unread());

        // Change it back to false and check that it updates.

        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);

        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
        sliding_sync
            .handle_response(
                server_response.clone(),
                &mut pos_guard,
                RequestedRequiredStates::default(),
            )
            .await?;

        let room = client.get_room(&room_id).unwrap();

        assert!(!room.is_marked_unread());

        Ok(())
    }

    fn make_mark_unread_response(
        response_number: &str,
        room_id: OwnedRoomId,
        unread: bool,
        add_rooms_section: bool,
    ) -> http::Response {
        let rooms = if add_rooms_section {
            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
        } else {
            BTreeMap::new()
        };

        let extensions = assign!(http::response::Extensions::default(), {
            account_data: assign!(http::response::AccountData::default(), {
                rooms: BTreeMap::from([
                    (
                        room_id,
                        vec![
                            Raw::from_json_string(
                                json!({
                                    "content": {
                                        "unread": unread
                                    },
                                    "type": "m.marked_unread"
                                })
                                .to_string(),
                            ).unwrap()
                        ]
                    )
                ])
            })
        });

        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
    }

    #[async_test]
    async fn test_process_rooms_account_data() -> Result<()> {
        let room = owned_room_id!("!pony:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .add_list(
                SlidingSyncList::builder("all")
                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
            )
            .build()
            .await?;

        // Initial state.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                rooms: BTreeMap::from([(
                    room.clone(),
                    http::response::Room::default(),
                )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let server_response = assign!(http::Response::new("1".to_owned()), {
            extensions: assign!(http::response::Extensions::default(), {
                account_data: assign!(http::response::AccountData::default(), {
                    rooms: BTreeMap::from([
                        (
                            room.clone(),
                            vec![
                                Raw::from_json_string(
                                    json!({
                                        "content": {
                                            "tags": {
                                                "u.work": {
                                                    "order": 0.9
                                                }
                                            }
                                        },
                                        "type": "m.tag"
                                    })
                                    .to_string(),
                                ).unwrap()
                            ]
                        )
                    ])
                })
            })
        });
        let summary = {
            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut pos_guard,
                    RequestedRequiredStates::default(),
                )
                .await?
        };

        assert!(summary.rooms.contains(&room));

        Ok(())
    }

    #[async_test]
    #[cfg(feature = "e2e-encryption")]
    async fn test_process_only_encryption_events() -> Result<()> {
        use ruma::OneTimeKeyAlgorithm;

        let room = owned_room_id!("!croissant:example.org");

        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let server_response = assign!(http::Response::new("0".to_owned()), {
            rooms: BTreeMap::from([(
                room.clone(),
                assign!(http::response::Room::default(), {
                    name: Some("Croissants lovers".to_owned()),
                    timeline: Vec::new(),
                }),
            )]),

            extensions: assign!(http::response::Extensions::default(), {
                e2ee: assign!(http::response::E2EE::default(), {
                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
                }),
                to_device: Some(assign!(http::response::ToDevice::default(), {
                    next_batch: "to-device-token".to_owned(),
                })),
            })
        });

        // Don't process non-encryption events if the sliding sync is configured for
        // encryption only.

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // Room events haven't.
        assert!(client.get_room(&room).is_none());

        // Conversely, only process room list events if the sliding sync was
        // configured to do so.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE response has been ignored.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 0);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                None
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        // And it's also possible to set up both.
        let client = logged_in_client(Some(server.uri())).await;

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        {
            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;

            sliding_sync
                .handle_response(
                    server_response.clone(),
                    &mut position_guard,
                    RequestedRequiredStates::default(),
                )
                .await?;
        }

        // E2EE has been properly handled.
        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
        assert_eq!(uploaded_key_count, 42);

        {
            let olm_machine = &*client.olm_machine_for_testing().await;
            assert_eq!(
                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
                Some("to-device-token")
            );
        }

        // The room is now known.
        assert!(client.get_room(&room).is_some());

        Ok(())
    }
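
    // A hedged sketch of the "encryption-only" configuration the test above
    // exercises: a dedicated connection (the id is illustrative) carrying only
    // the `e2ee` and `to-device` extensions, with no room lists, so it can run
    // alongside a separate room-list connection.
    #[cfg(feature = "e2e-encryption")]
    #[allow(dead_code)]
    async fn example_encryption_only_sync(client: &Client) -> Result<SlidingSync> {
        let sliding_sync = client
            .sliding_sync("encryption")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true) }))
            .build()
            .await?;

        Ok(sliding_sync)
    }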

    #[async_test]
    async fn test_lock_multiple_requests() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |_: &Request| {
                let mut pos = pos.lock().unwrap();
                *pos += 1;
                ResponseTemplate::new(200).set_body_json(json!({
                    "pos": pos.to_string(),
                    "lists": {},
                    "rooms": {}
                }))
            })
            .mount_as_scoped(&server)
            .await;

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Spawn two requests in parallel. Before #2430, this led to a deadlock and
        // the test would never terminate.
        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);

        for result in requests.await {
            result?;
        }

        Ok(())
    }
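
    // A hedged sketch (names are illustrative, not the SDK's internals) of the
    // locking pattern the test above relies on: each request holds an owned
    // guard over the shared position for its whole lifecycle, so concurrent
    // `sync_once` calls queue up instead of racing on `pos`.
    #[allow(dead_code)]
    async fn sketch_serialized_requests(position: Arc<tokio::sync::Mutex<Option<String>>>) {
        // `lock_owned` ties the guard to the `Arc` rather than to a borrow, so
        // it can be held across the `.await` points of a full request/response
        // round-trip.
        let mut guard = position.lock_owned().await;

        // ... send the request, wait for the response ...

        // Only then is the position advanced and the guard released.
        *guard = Some("next-pos".to_owned());
    }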

    #[async_test]
    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let pos = Arc::new(Mutex::new(0));
        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(move |_: &Request| {
                let mut pos = pos.lock().unwrap();
                *pos += 1;
                // Respond slowly enough that we can skip one iteration.
                ResponseTemplate::new(200)
                    .set_body_json(json!({
                        "pos": pos.to_string(),
                        "lists": {},
                        "rooms": {}
                    }))
                    .set_delay(Duration::from_secs(2))
            })
            .mount_as_scoped(&server)
            .await;

        let sliding_sync =
            client
                .sliding_sync("test")?
                .add_list(SlidingSyncList::builder("room-list").sync_mode(
                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
                ))
                .add_list(
                    SlidingSyncList::builder("another-list")
                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
                )
                .build()
                .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let cloned_sync = sliding_sync.clone();
        spawn(async move {
            tokio::time::sleep(Duration::from_millis(100)).await;

            cloned_sync
                .on_list("another-list", |list| {
                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
                    ready(())
                })
                .await;
        });

        assert_matches!(stream.next().await, Some(Ok(_)));

        sliding_sync.stop_sync().unwrap();

        assert_matches!(stream.next().await, None);

        let mut num_requests = 0;

        for request in server.received_requests().await.unwrap() {
            if !SlidingSyncMatcher.matches(&request) {
                continue;
            }

            let another_list_ranges = if num_requests == 0 {
                // First request
                json!([[0, 10]])
            } else {
                // Second request
                json!([[10, 20]])
            };

            num_requests += 1;
            assert!(num_requests <= 2, "more than two requests hit the server");

            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();

            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
                &json_value,
                &json!({
                    "conn_id": "test",
                    "lists": {
                        "room-list": {
                            "ranges": [[0, 9]],
                            "required_state": [
                                ["m.room.encryption", ""],
                                ["m.room.tombstone", ""]
                            ],
                        },
                        "another-list": {
                            "ranges": another_list_ranges,
                            "required_state": [
                                ["m.room.encryption", ""],
                                ["m.room.tombstone", ""]
                            ],
                        },
                    }
                }),
                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
            ) {
                dbg!(json_value);
                panic!("json differs: {err}");
            }
        }

        assert_eq!(num_requests, 2);

        Ok(())
    }
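
    // A hedged sketch of the runtime reconfiguration the test above performs:
    // `on_list` looks a list up by name and lets the caller mutate it; here the
    // sync mode (and thus the requested ranges) is swapped while the sync loop
    // keeps running. The list name is illustrative.
    #[allow(dead_code)]
    async fn example_change_ranges(sliding_sync: &SlidingSync) {
        sliding_sync
            .on_list("another-list", |list| {
                // The next generated request will use the new range; an aborted
                // in-flight request must not leak the old one.
                list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
                ready(())
            })
            .await;
    }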

    #[async_test]
    async fn test_timeout_zero_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // Zero lists means sliding sync is fully loaded, so there is a timeout to
        // wait for new updates.
        assert!(request.timeout.is_some());

        Ok(())
    }

    #[async_test]
    async fn test_timeout_one_list() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
        ])
        .await?;

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // The list is not fully loaded yet, so it does not require a timeout.
        assert!(request.timeout.is_none());

        // Simulate a response.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                lists: BTreeMap::from([(
                    "foo".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // The list is now fully loaded, so it requires a timeout.
        assert!(request.timeout.is_some());

        Ok(())
    }

    #[async_test]
    async fn test_timeout_three_lists() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![
            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
            SlidingSyncList::builder("baz")
                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
        ])
        .await?;

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // Two lists aren't fully loaded yet, so the request doesn't require a timeout.
        assert!(request.timeout.is_none());

        // Simulate a response.
        {
            let server_response = assign!(http::Response::new("0".to_owned()), {
                lists: BTreeMap::from([(
                    "foo".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // One list is still not fully loaded, so the request doesn't require a timeout.
        assert!(request.timeout.is_none());

        // Simulate a response.
        {
            let server_response = assign!(http::Response::new("1".to_owned()), {
                lists: BTreeMap::from([(
                    "bar".to_owned(),
                    assign!(http::response::List::default(), {
                        count: uint!(7),
                    })
                 )])
            });

            let _summary = {
                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
                sliding_sync
                    .handle_response(
                        server_response.clone(),
                        &mut pos_guard,
                        RequestedRequiredStates::default(),
                    )
                    .await?
            };
        }

        let (request, _, _) = sliding_sync.generate_sync_request().await?;

        // All lists are now fully loaded, so the request requires a timeout.
        assert!(request.timeout.is_some());

        Ok(())
    }
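
    // A hedged sketch of the rule the three tests above exercise (the function
    // is illustrative, not the SDK's internals): the long-polling timeout is
    // only attached once every list is fully loaded; otherwise the request
    // should return immediately so the lists keep paginating.
    #[allow(dead_code)]
    fn sketch_timeout_decision(
        all_lists_fully_loaded: bool,
        poll_timeout: Duration,
    ) -> Option<Duration> {
        all_lists_fully_loaded.then_some(poll_timeout)
    }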

    #[async_test]
    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "pos": "0",
                "lists": {},
                "rooms": {}
            })))
            .mount_as_scoped(&server)
            .await;

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        let sliding_sync = Arc::new(sliding_sync);

        // Create the listener and perform a sync request
        let sync_beat_listener = client.inner.sync_beat.listen();
        sliding_sync.sync_once().await?;

        // The sync beat listener should be notified shortly after
        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
        Ok(())
    }

    #[async_test]
    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let _mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(404))
            .mount_as_scoped(&server)
            .await;

        let sliding_sync = client
            .sliding_sync("test")?
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        let sliding_sync = Arc::new(sliding_sync);

        // Create the listener and perform a sync request
        let sync_beat_listener = client.inner.sync_beat.listen();
        let sync_result = sliding_sync.sync_once().await;
        assert!(sync_result.is_err());

        // The sync beat listener won't be notified in this case
        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());

        Ok(())
    }

    #[async_test]
    async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let room_id = room_id!("!mu5hr00m:example.org");

        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "pos": "0",
                "lists": {},
                "extensions": {
                    "account_data": {
                        "global": [
                            {
                                "type": "m.direct",
                                "content": {
                                    "@de4dlockh0lmes:example.org": [
                                        "!mu5hr00m:example.org"
                                    ]
                                }
                            }
                        ]
                    }
                },
                "rooms": {
                    room_id: {
                        "name": "Mario Bros Fanbase Room",
                        "initial": true,
                    },
                }
            })))
            .mount_as_scoped(server.server())
            .await;

        let f = EventFactory::new().room(room_id);

        Mock::given(method("GET"))
            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "chunk": [
                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
                ]
            })))
            .mount(server.server())
            .await;

        let (tx, rx) = tokio::sync::oneshot::channel();

        let tx = Arc::new(Mutex::new(Some(tx)));
        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
            // Try to run a /members query while in an event handler.
            let members =
                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
            assert_eq!(members.len(), 1);
            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
        });

        let sliding_sync = client
            .sliding_sync("test")?
            .add_list(SlidingSyncList::builder("thelist"))
            .with_account_data_extension(
                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
            )
            .build()
            .await?;

        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
            .await
            .expect("Sync did not complete in time")
            .expect("Sync failed");

        // Wait for the event handler to complete.
        tokio::time::timeout(Duration::from_secs(5), rx)
            .await
            .expect("Event handler did not complete in time")
            .expect("Event handler failed");

        Ok(())
    }
}