matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28    collections::{btree_map::Entry, BTreeMap},
29    fmt::Debug,
30    future::Future,
31    sync::{Arc, RwLock as StdRwLock},
32    time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
39use ruma::{
40    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
41    assign, OwnedRoomId, RoomId,
42};
43use serde::{Deserialize, Serialize};
44use tokio::{
45    select,
46    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
47};
48use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
49
50#[cfg(feature = "e2e-encryption")]
51use self::utils::JoinHandleExt as _;
52pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
53use self::{
54    cache::restore_sliding_sync_state,
55    client::SlidingSyncResponseProcessor,
56    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
57};
58use crate::{config::RequestConfig, Client, Result};
59
/// The Sliding Sync instance.
///
/// This is a thin handle around a reference-counted [`SlidingSyncInner`]:
/// every clone shares the same underlying state.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data, shared between all the clones of this handle.
    inner: Arc<SlidingSyncInner>,
}
68
/// The shared state behind a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`] to compute the timeout of the HTTP request.
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two non-necessarily
    /// consecutive requests with the same `pos`, the server has to reply with
    /// the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance, keyed by list name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// All the rooms synced with Sliding Sync.
    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,

    /// Request parameters that are sticky.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
124
125impl SlidingSync {
126    pub(super) fn new(inner: SlidingSyncInner) -> Self {
127        Self { inner: Arc::new(inner) }
128    }
129
130    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
131        cache::store_sliding_sync_state(self, position).await
132    }
133
134    /// Create a new [`SlidingSyncBuilder`].
135    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
136        SlidingSyncBuilder::new(id, client)
137    }
138
139    /// Subscribe to many rooms.
140    ///
141    /// If the associated `Room`s exist, it will be marked as
142    /// members are missing, so that it ensures to re-fetch all members.
143    ///
144    /// A subscription to an already subscribed room is ignored.
145    pub fn subscribe_to_rooms(
146        &self,
147        room_ids: &[&RoomId],
148        settings: Option<http::request::RoomSubscription>,
149        cancel_in_flight_request: bool,
150    ) {
151        let settings = settings.unwrap_or_default();
152        let mut sticky = self.inner.sticky.write().unwrap();
153        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
154
155        let mut skip_over_current_sync_loop_iteration = false;
156
157        for room_id in room_ids {
158            // If the room subscription already exists, let's not
159            // override it with a new one. First, it would reset its
160            // state (`RoomSubscriptionState`), and second it would try to
161            // re-subscribe with the next request. We don't want that. A room
162            // subscription should happen once, and next subscriptions should
163            // be ignored.
164            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
165                if let Some(room) = self.inner.client.get_room(room_id) {
166                    room.mark_members_missing();
167                }
168
169                entry.insert((RoomSubscriptionState::default(), settings.clone()));
170
171                skip_over_current_sync_loop_iteration = true;
172            }
173        }
174
175        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
176            self.inner.internal_channel_send_if_possible(
177                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
178            );
179        }
180    }
181
182    /// Lookup a specific room
183    pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
184        self.inner.rooms.read().await.get(room_id).cloned()
185    }
186
187    /// Check the number of rooms.
188    pub fn get_number_of_rooms(&self) -> usize {
189        self.inner.rooms.blocking_read().len()
190    }
191
192    /// Find a list by its name, and do something on it if it exists.
193    pub async fn on_list<Function, FunctionOutput, R>(
194        &self,
195        list_name: &str,
196        function: Function,
197    ) -> Option<R>
198    where
199        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
200        FunctionOutput: Future<Output = R>,
201    {
202        let lists = self.inner.lists.read().await;
203
204        match lists.get(list_name) {
205            Some(list) => Some(function(list).await),
206            None => None,
207        }
208    }
209
210    /// Add the list to the list of lists.
211    ///
212    /// As lists need to have a unique `.name`, if a list with the same name
213    /// is found the new list will replace the old one and the return it or
214    /// `None`.
215    pub async fn add_list(
216        &self,
217        list_builder: SlidingSyncListBuilder,
218    ) -> Result<Option<SlidingSyncList>> {
219        let list = list_builder.build(self.inner.internal_channel.clone());
220
221        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
222
223        self.inner.internal_channel_send_if_possible(
224            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
225        );
226
227        Ok(old_list)
228    }
229
230    /// Add a list that will be cached and reloaded from the cache.
231    ///
232    /// This will raise an error if a storage key was not set, or if there
233    /// was a I/O error reading from the cache.
234    ///
235    /// The rest of the semantics is the same as [`Self::add_list`].
236    pub async fn add_cached_list(
237        &self,
238        mut list_builder: SlidingSyncListBuilder,
239    ) -> Result<Option<SlidingSyncList>> {
240        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
241
242        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
243
244        self.add_list(list_builder).await
245    }
246
247    /// Lookup a set of rooms
248    pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
249        &self,
250        room_ids: I,
251    ) -> Vec<Option<SlidingSyncRoom>> {
252        let rooms = self.inner.rooms.read().await;
253
254        room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
255    }
256
257    /// Get all rooms.
258    pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
259        self.inner.rooms.read().await.values().cloned().collect()
260    }
261
262    /// Handle the HTTP response.
263    #[instrument(skip_all)]
264    async fn handle_response(
265        &self,
266        sliding_sync_response: http::Response,
267        position: &mut SlidingSyncPositionMarkers,
268    ) -> Result<UpdateSummary, crate::Error> {
269        let pos = Some(sliding_sync_response.pos.clone());
270
271        let must_process_rooms_response = self.must_process_rooms_response().await;
272
273        trace!(yes = must_process_rooms_response, "Must process rooms response?");
274
275        // Transform a Sliding Sync Response to a `SyncResponse`.
276        //
277        // We may not need the `sync_response` in the future (once `SyncResponse` will
278        // move to Sliding Sync, i.e. to `http::Response`), but processing the
279        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
280        // happens here.
281
282        let mut sync_response = {
283            // Take the lock to avoid concurrent sliding syncs overwriting each other's room
284            // infos.
285            let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
286
287            let rooms = &*self.inner.rooms.read().await;
288            let mut response_processor =
289                SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
290
291            #[cfg(feature = "e2e-encryption")]
292            if self.is_e2ee_enabled() {
293                response_processor.handle_encryption(&sliding_sync_response.extensions).await?
294            }
295
296            // Only handle the room's subsection of the response, if this sliding sync was
297            // configured to do so.
298            if must_process_rooms_response {
299                response_processor.handle_room_response(&sliding_sync_response).await?;
300            }
301
302            response_processor.process_and_take_response().await?
303        };
304
305        debug!(?sync_response, "Sliding Sync response has been handled by the client");
306
307        // Commit sticky parameters, if needed.
308        if let Some(ref txn_id) = sliding_sync_response.txn_id {
309            let txn_id = txn_id.as_str().into();
310            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
311            let mut lists = self.inner.lists.write().await;
312            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
313        }
314
315        let update_summary = {
316            // Update the rooms.
317            let updated_rooms = {
318                let mut rooms_map = self.inner.rooms.write().await;
319
320                let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
321
322                for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
323                    // `sync_response` contains the rooms with decrypted events if any, so look at
324                    // the timeline events here first if the room exists.
325                    // Otherwise, let's look at the timeline inside the `sliding_sync_response`.
326                    let timeline =
327                        if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
328                            joined_room.timeline.events
329                        } else {
330                            room_data.timeline.drain(..).map(TimelineEvent::new).collect()
331                        };
332
333                    match rooms_map.get_mut(&room_id) {
334                        // The room existed before, let's update it.
335                        Some(room) => {
336                            room.update(room_data, timeline);
337                        }
338
339                        // First time we need this room, let's create it.
340                        None => {
341                            rooms_map.insert(
342                                room_id.clone(),
343                                SlidingSyncRoom::new(
344                                    room_id.clone(),
345                                    room_data.prev_batch,
346                                    timeline,
347                                ),
348                            );
349                        }
350                    }
351
352                    updated_rooms.push(room_id);
353                }
354
355                // There might be other rooms that were only mentioned in the sliding sync
356                // extensions part of the response, and thus would result in rooms present in
357                // the `sync_response.join`. Mark them as updated too.
358                //
359                // Since we've removed rooms that were in the room subsection from
360                // `sync_response.rooms.join`, the remaining ones aren't already present in
361                // `updated_rooms` and wouldn't cause any duplicates.
362                updated_rooms.extend(sync_response.rooms.join.keys().cloned());
363
364                updated_rooms
365            };
366
367            // Update the lists.
368            let updated_lists = {
369                debug!(
370                    lists = ?sliding_sync_response.lists,
371                    "Update lists"
372                );
373
374                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
375                let mut lists = self.inner.lists.write().await;
376
377                // Iterate on known lists, not on lists in the response. Rooms may have been
378                // updated that were not involved in any list update.
379                for (name, list) in lists.iter_mut() {
380                    if let Some(updates) = sliding_sync_response.lists.get(name) {
381                        let maximum_number_of_rooms: u32 =
382                            updates.count.try_into().expect("failed to convert `count` to `u32`");
383
384                        if list.update(Some(maximum_number_of_rooms))? {
385                            updated_lists.push(name.clone());
386                        }
387                    } else if list.update(None)? {
388                        updated_lists.push(name.clone());
389                    }
390                }
391
392                // Report about unknown lists.
393                for name in sliding_sync_response.lists.keys() {
394                    if !lists.contains_key(name) {
395                        error!("Response for list `{name}` - unknown to us; skipping");
396                    }
397                }
398
399                updated_lists
400            };
401
402            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
403        };
404
405        // Everything went well, we can update the position markers.
406        //
407        // Save the new position markers.
408        position.pos = pos;
409
410        Ok(update_summary)
411    }
412
413    async fn generate_sync_request(
414        &self,
415        txn_id: &mut LazyTransactionId,
416    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
417        // Collect requests for lists.
418        let mut requests_lists = BTreeMap::new();
419
420        let require_timeout = {
421            let lists = self.inner.lists.read().await;
422
423            // Start at `true` in case there is zero list.
424            let mut require_timeout = true;
425
426            for (name, list) in lists.iter() {
427                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
428                require_timeout = require_timeout && list.requires_timeout();
429            }
430
431            require_timeout
432        };
433
434        // Collect the `pos`.
435        //
436        // Wait on the `position` mutex to be available. It means no request nor
437        // response is running. The `position` mutex is released whether the response
438        // has been fully handled successfully, in this case the `pos` is updated, or
439        // the response handling has failed, in this case the `pos` hasn't been updated
440        // and the same `pos` will be used for this new request.
441        let mut position_guard = self.inner.position.clone().lock_owned().await;
442
443        let to_device_enabled =
444            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
445
446        let restored_fields = if self.inner.share_pos || to_device_enabled {
447            let lists = self.inner.lists.read().await;
448            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
449        } else {
450            None
451        };
452
453        // Update pos: either the one restored from the database, if any and the sliding
454        // sync was configured so, or read it from the memory cache.
455        let pos = if self.inner.share_pos {
456            if let Some(fields) = &restored_fields {
457                // Override the memory one with the database one, for consistency.
458                if fields.pos != position_guard.pos {
459                    info!(
460                        "Pos from previous request ('{:?}') was different from \
461                         pos in database ('{:?}').",
462                        position_guard.pos, fields.pos
463                    );
464                    position_guard.pos = fields.pos.clone();
465                }
466                fields.pos.clone()
467            } else {
468                position_guard.pos.clone()
469            }
470        } else {
471            position_guard.pos.clone()
472        };
473
474        Span::current().record("pos", &pos);
475
476        // When the client sends a request with no `pos`, MSC4186 returns no device
477        // lists updates, as it only returns changes since the provided `pos`
478        // (which is `null` in this case); this is in line with sync v2.
479        //
480        // Therefore, with MSC4186, the device list cache must be marked as to be
481        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
482        // device lists updates that happened between the previous request and the new
483        // “initial” request.
484        #[cfg(feature = "e2e-encryption")]
485        if pos.is_none() && self.is_e2ee_enabled() {
486            info!("Marking all tracked users as dirty");
487
488            let olm_machine = self.inner.client.olm_machine().await;
489            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
490            olm_machine.mark_all_tracked_users_as_dirty().await?;
491        }
492
493        // Configure the timeout.
494        //
495        // The `timeout` query is necessary when all lists require it. Please see
496        // [`SlidingSyncList::requires_timeout`].
497        let timeout = require_timeout.then(|| self.inner.poll_timeout);
498
499        let mut request = assign!(http::Request::new(), {
500            conn_id: Some(self.inner.id.clone()),
501            pos,
502            timeout,
503            lists: requests_lists,
504        });
505
506        // Apply sticky parameters, if needs be.
507        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
508
509        // Extensions are now applied (via sticky parameters).
510        //
511        // Override the to-device token if the extension is enabled.
512        if to_device_enabled {
513            request.extensions.to_device.since =
514                restored_fields.and_then(|fields| fields.to_device_token);
515        }
516
517        // Apply the transaction id if one was generated.
518        if let Some(txn_id) = txn_id.get() {
519            request.txn_id = Some(txn_id.to_string());
520        }
521
522        Ok((
523            // The request itself.
524            request,
525            // Configure long-polling. We need some time for the long-poll itself,
526            // and extra time for the network delays.
527            RequestConfig::default()
528                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
529                .retry_limit(3),
530            position_guard,
531        ))
532    }
533
534    /// Send a sliding sync request.
535    ///
536    /// This method contains the sending logic.
537    async fn send_sync_request(
538        &self,
539        request: http::Request,
540        request_config: RequestConfig,
541        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
542    ) -> Result<UpdateSummary> {
543        debug!("Sending request");
544
545        // Prepare the request.
546        let request = self.inner.client.send(request).with_request_config(request_config);
547
548        // Send the request and get a response with end-to-end encryption support.
549        //
550        // Sending the `/sync` request out when end-to-end encryption is enabled means
551        // that we need to also send out any outgoing e2ee related request out
552        // coming from the `OlmMachine::outgoing_requests()` method.
553
554        #[cfg(feature = "e2e-encryption")]
555        let response = {
556            if self.is_e2ee_enabled() {
557                // Here, we need to run 2 things:
558                //
559                // 1. Send the sliding sync request and get a response,
560                // 2. Send the E2EE requests.
561                //
562                // We don't want to use a `join` or `try_join` because we want to fail if and
563                // only if sending the sliding sync request fails. Failing to send the E2EE
564                // requests should just result in a log.
565                //
566                // We also want to give the priority to sliding sync request. E2EE requests are
567                // sent concurrently to the sliding sync request, but the priority is on waiting
568                // a sliding sync response.
569                //
570                // If sending sliding sync request fails, the sending of E2EE requests must be
571                // aborted as soon as possible.
572
573                let client = self.inner.client.clone();
574                let e2ee_uploads = spawn(async move {
575                    if let Err(error) = client.send_outgoing_requests().await {
576                        error!(?error, "Error while sending outgoing E2EE requests");
577                    }
578                })
579                // Ensure that the task is not running in detached mode. It is aborted when it's
580                // dropped.
581                .abort_on_drop();
582
583                // Wait on the sliding sync request success or failure early.
584                let response = request.await?;
585
586                // At this point, if `request` has been resolved successfully, we wait on
587                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
588                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
589                // been dropped, so aborted.
590                e2ee_uploads.await.map_err(|error| Error::JoinError {
591                    task_description: "e2ee_uploads".to_owned(),
592                    error,
593                })?;
594
595                response
596            } else {
597                request.await?
598            }
599        };
600
601        // Send the request and get a response _without_ end-to-end encryption support.
602        #[cfg(not(feature = "e2e-encryption"))]
603        let response = request.await?;
604
605        debug!("Received response");
606
607        // At this point, the request has been sent, and a response has been received.
608        //
609        // We must ensure the handling of the response cannot be stopped/
610        // cancelled. It must be done entirely, otherwise we can have
611        // corrupted/incomplete states for Sliding Sync and other parts of
612        // the code.
613        //
614        // That's why we are running the handling of the response in a spawned
615        // future that cannot be cancelled by anything.
616        let this = self.clone();
617
618        // Spawn a new future to ensure that the code inside this future cannot be
619        // cancelled if this method is cancelled.
620        let future = async move {
621            debug!("Start handling response");
622
623            // In case the task running this future is detached, we must
624            // ensure responses are handled one at a time. At this point we still own
625            // `position_guard`, so we're fine.
626
627            // Handle the response.
628            let updates = this.handle_response(response, &mut position_guard).await?;
629
630            this.cache_to_storage(&position_guard).await?;
631
632            // Release the position guard lock.
633            // It means that other responses can be generated and then handled later.
634            drop(position_guard);
635
636            debug!("Done handling response");
637
638            Ok(updates)
639        };
640
641        spawn(future.instrument(Span::current())).await.unwrap()
642    }
643
644    /// Is the e2ee extension enabled for this sliding sync instance?
645    #[cfg(feature = "e2e-encryption")]
646    fn is_e2ee_enabled(&self) -> bool {
647        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
648    }
649
650    #[cfg(not(feature = "e2e-encryption"))]
651    fn is_e2ee_enabled(&self) -> bool {
652        false
653    }
654
655    /// Should we process the room's subpart of a response?
656    async fn must_process_rooms_response(&self) -> bool {
657        // We consider that we must, if there's any room subscription or there's any
658        // list.
659        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
660            || !self.inner.lists.read().await.is_empty()
661    }
662
663    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
664    async fn sync_once(&self) -> Result<UpdateSummary> {
665        let (request, request_config, position_guard) =
666            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
667
668        // Send the request, kaboom.
669        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
670
671        // Notify a new sync was received
672        self.inner.client.inner.sync_beat.notify(usize::MAX);
673
674        Ok(summaries)
675    }
676
677    /// Create a _new_ Sliding Sync sync loop.
678    ///
679    /// This method returns a `Stream`, which will send requests and will handle
680    /// responses automatically. Lists and rooms are updated automatically.
681    ///
682    /// This function returns `Ok(…)` if everything went well, otherwise it will
683    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
684    /// termination.
685    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
686    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
687    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
688        debug!("Starting sync stream");
689
690        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
691
692        stream! {
693            loop {
694                debug!("Sync stream is running");
695
696                select! {
697                    biased;
698
699                    internal_message = internal_channel_receiver.recv() => {
700                        use SlidingSyncInternalMessage::*;
701
702                        debug!(?internal_message, "Sync stream has received an internal message");
703
704                        match internal_message {
705                            Err(_) | Ok(SyncLoopStop) => {
706                                break;
707                            }
708
709                            Ok(SyncLoopSkipOverCurrentIteration) => {
710                                continue;
711                            }
712                        }
713                    }
714
715                    update_summary = self.sync_once() => {
716                        match update_summary {
717                            Ok(updates) => {
718                                yield Ok(updates);
719                            }
720
721                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
722                            Err(error) => {
723                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
724                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
725                                    self.expire_session().await;
726                                }
727
728                                yield Err(error);
729
730                                // Terminates the loop, and terminates the stream.
731                                break;
732                            }
733                        }
734                    }
735                }
736            }
737
738            debug!("Sync stream has exited.");
739        }
740    }
741
742    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
743    ///
744    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
745    /// enough to “stop” it, but depending of how this `Stream` is used, it
746    /// might not be obvious to drop it immediately (thinking of using this API
747    /// over FFI; the foreign-language might not be able to drop a value
748    /// immediately). Thus, calling this method will ensure that the sync loop
749    /// stops gracefully and as soon as it returns.
750    pub fn stop_sync(&self) -> Result<()> {
751        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
752    }
753
754    /// Expire the current Sliding Sync session on the client-side.
755    ///
756    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
757    /// sticky parameters.
758    ///
759    /// This should only be used when it's clear that this session was about to
760    /// expire anyways, and should be used only in very specific cases (e.g.
761    /// multiple sliding syncs being run in parallel, and one of them has
762    /// expired).
763    ///
764    /// This method **MUST** be called when the sync loop is stopped.
765    #[doc(hidden)]
766    pub async fn expire_session(&self) {
767        info!("Session expired; resetting `pos` and sticky parameters");
768
769        {
770            let mut position = self.inner.position.lock().await;
771            position.pos = None;
772
773            if let Err(err) = self.cache_to_storage(&position).await {
774                error!(
775                    "couldn't invalidate sliding sync frozen state when expiring session: {err}"
776                );
777            }
778        }
779
780        {
781            let mut sticky = self.inner.sticky.write().unwrap();
782
783            // Clear all room subscriptions: we don't want to resend all room subscriptions
784            // when the session will restart.
785            sticky.data_mut().room_subscriptions.clear();
786        }
787
788        self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
789    }
790}
791
792impl SlidingSyncInner {
793    /// Send a message over the internal channel.
794    #[instrument]
795    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
796        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
797    }
798
799    /// Send a message over the internal channel if there is a receiver, i.e. if
800    /// the sync loop is running.
801    #[instrument]
802    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
803        // If there is no receiver, the send will fail, but that's OK here.
804        let _ = self.internal_channel.send(message);
805    }
806}
807
/// Messages sent over the internal channel to drive the sync loop.
///
/// The enum has no payload, so equality is total: `Eq` is derived alongside
/// `PartialEq`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
817
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the current `pos` with `new_pos`.
    ///
    /// Only the in-memory position is updated.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Return a copy of the extension configuration held by the sticky
    /// parameters of this Sliding Sync.
    ///
    /// Note: this is not the next content of the sticky parameters, but rightly
    /// the static configuration that was set during creation of this
    /// Sliding Sync.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
836
837#[derive(Clone, Debug)]
838pub(super) struct SlidingSyncPositionMarkers {
839    /// An ephemeral position in the current stream, as received from the
840    /// previous `/sync` response, or `None` for the first request.
841    ///
842    /// Should not be persisted.
843    pos: Option<String>,
844}
845
846/// Frozen bits of a Sliding Sync that are stored in the *state* store.
847#[derive(Debug, Serialize, Deserialize)]
848struct FrozenSlidingSync {
849    /// Deprecated: prefer storing in the crypto store.
850    #[serde(skip_serializing_if = "Option::is_none")]
851    to_device_since: Option<String>,
852    #[serde(default, skip_serializing_if = "Vec::is_empty")]
853    rooms: Vec<FrozenSlidingSyncRoom>,
854}
855
856impl FrozenSlidingSync {
857    fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
858        // The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
859        Self {
860            to_device_since: None,
861            rooms: rooms
862                .iter()
863                .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
864                .collect::<Vec<_>>(),
865        }
866    }
867}
868
869#[derive(Serialize, Deserialize)]
870struct FrozenSlidingSyncPos {
871    #[serde(skip_serializing_if = "Option::is_none")]
872    pos: Option<String>,
873}
874
875/// A summary of the updates received after a sync (like in
876/// [`SlidingSync::sync`]).
877#[derive(Debug, Clone)]
878pub struct UpdateSummary {
879    /// The names of the lists that have seen an update.
880    pub lists: Vec<String>,
881    /// The rooms that have seen updates
882    pub rooms: Vec<OwnedRoomId>,
883}
884
/// A very basic bool-ish enum to represent the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Default, Debug)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent to, or not been received
    /// correctly by, the server, i.e. the `RoomSubscription` —which is part of
    /// the sticky parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent to, and received correctly by, the
    /// server.
    Applied,
}
900
901/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
902/// sent in the request.
903#[derive(Debug)]
904pub(super) struct SlidingSyncStickyParameters {
905    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
906    /// but one wants to receive updates.
907    room_subscriptions:
908        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
909
910    /// The intended state of the extensions being supplied to sliding /sync
911    /// calls.
912    extensions: http::request::Extensions,
913}
914
915impl SlidingSyncStickyParameters {
916    /// Create a new set of sticky parameters.
917    pub fn new(
918        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
919        extensions: http::request::Extensions,
920    ) -> Self {
921        Self {
922            room_subscriptions: room_subscriptions
923                .into_iter()
924                .map(|(room_id, room_subscription)| {
925                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
926                })
927                .collect(),
928            extensions,
929        }
930    }
931}
932
933impl StickyData for SlidingSyncStickyParameters {
934    type Request = http::Request;
935
936    fn apply(&self, request: &mut Self::Request) {
937        request.room_subscriptions = self
938            .room_subscriptions
939            .iter()
940            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
941            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
942            .collect();
943        request.extensions = self.extensions.clone();
944    }
945
946    fn on_commit(&mut self) {
947        // All room subscriptions are marked as `Applied`.
948        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
949            if matches!(state, RoomSubscriptionState::Pending) {
950                *state = RoomSubscriptionState::Applied;
951            }
952        }
953    }
954}
955
956#[cfg(all(test, not(target_family = "wasm")))]
957#[allow(clippy::dbg_macro)]
958mod tests {
959    use std::{
960        collections::BTreeMap,
961        future::ready,
962        ops::Not,
963        sync::{Arc, Mutex},
964        time::Duration,
965    };
966
967    use assert_matches::assert_matches;
968    use event_listener::Listener;
969    use futures_util::{future::join_all, pin_mut, StreamExt};
970    use matrix_sdk_test::async_test;
971    use ruma::{
972        api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
973        OwnedRoomId, TransactionId,
974    };
975    use serde::Deserialize;
976    use serde_json::json;
977    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
978
979    use super::{
980        http,
981        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
982        FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
983        SlidingSyncStickyParameters,
984    };
985    use crate::{
986        sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
987    };
988
989    #[derive(Copy, Clone)]
990    struct SlidingSyncMatcher;
991
992    impl Match for SlidingSyncMatcher {
993        fn matches(&self, request: &Request) -> bool {
994            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
995                && request.method == Method::POST
996        }
997    }
998
999    async fn new_sliding_sync(
1000        lists: Vec<SlidingSyncListBuilder>,
1001    ) -> Result<(MockServer, SlidingSync)> {
1002        let server = MockServer::start().await;
1003        let client = logged_in_client(Some(server.uri())).await;
1004
1005        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1006
1007        for list in lists {
1008            sliding_sync_builder = sliding_sync_builder.add_list(list);
1009        }
1010
1011        let sliding_sync = sliding_sync_builder.build().await?;
1012
1013        Ok((server, sliding_sync))
1014    }
1015
1016    #[async_test]
1017    async fn test_subscribe_to_rooms() -> Result<()> {
1018        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1019            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1020        .await?;
1021
1022        let stream = sliding_sync.sync();
1023        pin_mut!(stream);
1024
1025        let room_id_0 = room_id!("!r0:bar.org");
1026        let room_id_1 = room_id!("!r1:bar.org");
1027        let room_id_2 = room_id!("!r2:bar.org");
1028
1029        {
1030            let _mock_guard = Mock::given(SlidingSyncMatcher)
1031                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1032                    "pos": "1",
1033                    "lists": {},
1034                    "rooms": {
1035                        room_id_0: {
1036                            "name": "Room #0",
1037                            "initial": true,
1038                        },
1039                        room_id_1: {
1040                            "name": "Room #1",
1041                            "initial": true,
1042                        },
1043                        room_id_2: {
1044                            "name": "Room #2",
1045                            "initial": true,
1046                        },
1047                    }
1048                })))
1049                .mount_as_scoped(&server)
1050                .await;
1051
1052            let _ = stream.next().await.unwrap()?;
1053        }
1054
1055        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1056
1057        // Members aren't synced.
1058        // We need to make them synced, so that we can test that subscribing to a room
1059        // make members not synced. That's a desired feature.
1060        assert!(room0.are_members_synced().not());
1061
1062        {
1063            struct MemberMatcher(OwnedRoomId);
1064
1065            impl Match for MemberMatcher {
1066                fn matches(&self, request: &Request) -> bool {
1067                    request.url.path()
1068                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1069                        && request.method == Method::GET
1070                }
1071            }
1072
1073            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1074                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1075                    "chunk": [],
1076                })))
1077                .mount_as_scoped(&server)
1078                .await;
1079
1080            assert_matches!(room0.request_members().await, Ok(()));
1081        }
1082
1083        // Members are now synced! We can start subscribing and see how it goes.
1084        assert!(room0.are_members_synced());
1085
1086        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1087
1088        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1089        // now marked as not synced.
1090        assert!(room0.are_members_synced().not());
1091
1092        {
1093            let sticky = sliding_sync.inner.sticky.read().unwrap();
1094            let room_subscriptions = &sticky.data().room_subscriptions;
1095
1096            assert!(room_subscriptions.contains_key(room_id_0));
1097            assert!(room_subscriptions.contains_key(room_id_1));
1098            assert!(!room_subscriptions.contains_key(room_id_2));
1099        }
1100
1101        // Subscribing to the same room doesn't reset the member sync state.
1102
1103        {
1104            struct MemberMatcher(OwnedRoomId);
1105
1106            impl Match for MemberMatcher {
1107                fn matches(&self, request: &Request) -> bool {
1108                    request.url.path()
1109                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1110                        && request.method == Method::GET
1111                }
1112            }
1113
1114            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1115                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1116                    "chunk": [],
1117                })))
1118                .mount_as_scoped(&server)
1119                .await;
1120
1121            assert_matches!(room0.request_members().await, Ok(()));
1122        }
1123
1124        // Members are synced, good, good.
1125        assert!(room0.are_members_synced());
1126
1127        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1128
1129        // Members are still synced: because we have already subscribed to the
1130        // room, the members aren't marked as unsynced.
1131        assert!(room0.are_members_synced());
1132
1133        Ok(())
1134    }
1135
1136    #[async_test]
1137    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1138        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1139            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1140        .await?;
1141
1142        let room_id_0 = room_id!("!r0:bar.org");
1143        let room_id_1 = room_id!("!r1:bar.org");
1144        let room_id_2 = room_id!("!r2:bar.org");
1145
1146        // Subscribe to two rooms.
1147        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1148
1149        {
1150            let sticky = sliding_sync.inner.sticky.read().unwrap();
1151            let room_subscriptions = &sticky.data().room_subscriptions;
1152
1153            assert!(room_subscriptions.contains_key(room_id_0));
1154            assert!(room_subscriptions.contains_key(room_id_1));
1155            assert!(room_subscriptions.contains_key(room_id_2).not());
1156        }
1157
1158        // Subscribe to one more room.
1159        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1160
1161        {
1162            let sticky = sliding_sync.inner.sticky.read().unwrap();
1163            let room_subscriptions = &sticky.data().room_subscriptions;
1164
1165            assert!(room_subscriptions.contains_key(room_id_0));
1166            assert!(room_subscriptions.contains_key(room_id_1));
1167            assert!(room_subscriptions.contains_key(room_id_2));
1168        }
1169
1170        // Suddenly, the session expires!
1171        sliding_sync.expire_session().await;
1172
1173        {
1174            let sticky = sliding_sync.inner.sticky.read().unwrap();
1175            let room_subscriptions = &sticky.data().room_subscriptions;
1176
1177            assert!(room_subscriptions.is_empty());
1178        }
1179
1180        // Subscribe to one room again.
1181        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1182
1183        {
1184            let sticky = sliding_sync.inner.sticky.read().unwrap();
1185            let room_subscriptions = &sticky.data().room_subscriptions;
1186
1187            assert!(room_subscriptions.contains_key(room_id_0).not());
1188            assert!(room_subscriptions.contains_key(room_id_1).not());
1189            assert!(room_subscriptions.contains_key(room_id_2));
1190        }
1191
1192        Ok(())
1193    }
1194
1195    #[async_test]
1196    async fn test_to_device_token_properly_cached() -> Result<()> {
1197        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1198            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1199        .await?;
1200
1201        // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
1202        // in the crypto store since PR #2323.
1203        let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1204        assert!(frozen.to_device_since.is_none());
1205
1206        Ok(())
1207    }
1208
1209    #[async_test]
1210    async fn test_add_list() -> Result<()> {
1211        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1212            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1213        .await?;
1214
1215        let _stream = sliding_sync.sync();
1216        pin_mut!(_stream);
1217
1218        sliding_sync
1219            .add_list(
1220                SlidingSyncList::builder("bar")
1221                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1222            )
1223            .await?;
1224
1225        let lists = sliding_sync.inner.lists.read().await;
1226
1227        assert!(lists.contains_key("foo"));
1228        assert!(lists.contains_key("bar"));
1229
1230        // this test also ensures that Tokio is not panicking when calling `add_list`.
1231
1232        Ok(())
1233    }
1234
1235    #[test]
1236    fn test_sticky_parameters_api_invalidated_flow() {
1237        let r0 = room_id!("!r0.matrix.org");
1238        let r1 = room_id!("!r1:matrix.org");
1239
1240        let mut room_subscriptions = BTreeMap::new();
1241        room_subscriptions.insert(r0.to_owned(), Default::default());
1242
1243        // At first it's invalidated.
1244        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1245            room_subscriptions,
1246            Default::default(),
1247        ));
1248        assert!(sticky.is_invalidated());
1249
1250        // Then when we create a request, the sticky parameters are applied.
1251        let txn_id: &TransactionId = "tid123".into();
1252
1253        let mut request = http::Request::default();
1254        request.txn_id = Some(txn_id.to_string());
1255
1256        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1257
1258        assert!(request.txn_id.is_some());
1259        assert_eq!(request.room_subscriptions.len(), 1);
1260        assert!(request.room_subscriptions.contains_key(r0));
1261
1262        let tid = request.txn_id.unwrap();
1263
1264        sticky.maybe_commit(tid.as_str().into());
1265        assert!(!sticky.is_invalidated());
1266
1267        // Applying new parameters will invalidate again.
1268        sticky
1269            .data_mut()
1270            .room_subscriptions
1271            .insert(r1.to_owned(), (Default::default(), Default::default()));
1272        assert!(sticky.is_invalidated());
1273
1274        // Committing with the wrong transaction id will keep it invalidated.
1275        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1276        assert!(sticky.is_invalidated());
1277
1278        // Restarting a request will only remember the last generated transaction id.
1279        let txn_id1: &TransactionId = "tid456".into();
1280        let mut request1 = http::Request::default();
1281        request1.txn_id = Some(txn_id1.to_string());
1282        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1283
1284        assert!(sticky.is_invalidated());
1285        // The first room subscription has been applied to `request`, so it's not
1286        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1287        // related to the sticky design.
1288        assert_eq!(request1.room_subscriptions.len(), 1);
1289        assert!(request1.room_subscriptions.contains_key(r1));
1290
1291        let txn_id2: &TransactionId = "tid789".into();
1292        let mut request2 = http::Request::default();
1293        request2.txn_id = Some(txn_id2.to_string());
1294
1295        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1296        assert!(sticky.is_invalidated());
1297        // `request2` contains `r1` because the sticky parameters have not been
1298        // committed, so it's still marked as pending.
1299        assert_eq!(request2.room_subscriptions.len(), 1);
1300        assert!(request2.room_subscriptions.contains_key(r1));
1301
1302        // Here we commit with the not most-recent TID, so it keeps the invalidated
1303        // status.
1304        sticky.maybe_commit(txn_id1);
1305        assert!(sticky.is_invalidated());
1306
1307        // But here we use the latest TID, so the commit is effective.
1308        sticky.maybe_commit(txn_id2);
1309        assert!(!sticky.is_invalidated());
1310    }
1311
1312    #[test]
1313    fn test_room_subscriptions_are_sticky() {
1314        let r0 = room_id!("!r0.matrix.org");
1315        let r1 = room_id!("!r1:matrix.org");
1316
1317        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1318            BTreeMap::new(),
1319            Default::default(),
1320        ));
1321
1322        // A room subscription is added, applied, and committed.
1323        {
1324            // Insert `r0`.
1325            sticky
1326                .data_mut()
1327                .room_subscriptions
1328                .insert(r0.to_owned(), (Default::default(), Default::default()));
1329
1330            // Then the sticky parameters are applied.
1331            let txn_id: &TransactionId = "tid0".into();
1332            let mut request = http::Request::default();
1333            request.txn_id = Some(txn_id.to_string());
1334
1335            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1336
1337            assert!(request.txn_id.is_some());
1338            assert_eq!(request.room_subscriptions.len(), 1);
1339            assert!(request.room_subscriptions.contains_key(r0));
1340
1341            // Then the sticky parameters are committed.
1342            let tid = request.txn_id.unwrap();
1343
1344            sticky.maybe_commit(tid.as_str().into());
1345        }
1346
1347        // A room subscription is added, applied, but NOT committed.
1348        {
1349            // Insert `r1`.
1350            sticky
1351                .data_mut()
1352                .room_subscriptions
1353                .insert(r1.to_owned(), (Default::default(), Default::default()));
1354
1355            // Then the sticky parameters are applied.
1356            let txn_id: &TransactionId = "tid1".into();
1357            let mut request = http::Request::default();
1358            request.txn_id = Some(txn_id.to_string());
1359
1360            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1361
1362            assert!(request.txn_id.is_some());
1363            assert_eq!(request.room_subscriptions.len(), 1);
1364            // `r0` is not present, it's only `r1`.
1365            assert!(request.room_subscriptions.contains_key(r1));
1366
1367            // Then the sticky parameters are NOT committed.
1368            // It can happen if the request has failed to be sent for example,
1369            // or if the response didn't match.
1370        }
1371
1372        // A previously added room subscription is re-added, applied, and committed.
1373        {
1374            // Then the sticky parameters are applied.
1375            let txn_id: &TransactionId = "tid2".into();
1376            let mut request = http::Request::default();
1377            request.txn_id = Some(txn_id.to_string());
1378
1379            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1380
1381            assert!(request.txn_id.is_some());
1382            assert_eq!(request.room_subscriptions.len(), 1);
1383            // `r0` is not present, it's only `r1`.
1384            assert!(request.room_subscriptions.contains_key(r1));
1385
1386            // Then the sticky parameters are committed.
1387            let tid = request.txn_id.unwrap();
1388
1389            sticky.maybe_commit(tid.as_str().into());
1390        }
1391
1392        // All room subscriptions have been committed.
1393        {
1394            // Then the sticky parameters are applied.
1395            let txn_id: &TransactionId = "tid3".into();
1396            let mut request = http::Request::default();
1397            request.txn_id = Some(txn_id.to_string());
1398
1399            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1400
1401            assert!(request.txn_id.is_some());
1402            // All room subscriptions have been sent.
1403            assert!(request.room_subscriptions.is_empty());
1404        }
1405    }
1406
1407    #[test]
1408    fn test_extensions_are_sticky() {
1409        let mut extensions = http::request::Extensions::default();
1410        extensions.account_data.enabled = Some(true);
1411
1412        // At first it's invalidated.
1413        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1414            Default::default(),
1415            extensions,
1416        ));
1417
1418        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1419
1420        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1421        // to-device.
1422        let extensions = &sticky.data().extensions;
1423        assert_eq!(extensions.e2ee.enabled, None);
1424        assert_eq!(extensions.to_device.enabled, None);
1425        assert_eq!(extensions.to_device.since, None);
1426
1427        // What the user explicitly enabled is… enabled.
1428        assert_eq!(extensions.account_data.enabled, Some(true));
1429
1430        let txn_id: &TransactionId = "tid123".into();
1431        let mut request = http::Request::default();
1432        request.txn_id = Some(txn_id.to_string());
1433        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1434        assert!(sticky.is_invalidated());
1435        assert_eq!(request.extensions.to_device.enabled, None);
1436        assert_eq!(request.extensions.to_device.since, None);
1437        assert_eq!(request.extensions.e2ee.enabled, None);
1438        assert_eq!(request.extensions.account_data.enabled, Some(true));
1439    }
1440
1441    #[async_test]
1442    async fn test_sticky_extensions_plus_since() -> Result<()> {
1443        let server = MockServer::start().await;
1444        let client = logged_in_client(Some(server.uri())).await;
1445
1446        let sync = client
1447            .sliding_sync("test-slidingsync")?
1448            .add_list(SlidingSyncList::builder("new_list"))
1449            .build()
1450            .await?;
1451
1452        // No extensions have been explicitly enabled here.
1453        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1454        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1455        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1456
1457        // Now enable e2ee and to-device.
1458        let sync = client
1459            .sliding_sync("test-slidingsync")?
1460            .add_list(SlidingSyncList::builder("new_list"))
1461            .with_to_device_extension(
1462                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1463            )
1464            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1465            .build()
1466            .await?;
1467
1468        // Even without a since token, the first request will contain the extensions
1469        // configuration, at least.
1470        let txn_id = TransactionId::new();
1471        let (request, _, _) = sync
1472            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1473            .await?;
1474
1475        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1476        assert_eq!(request.extensions.to_device.enabled, Some(true));
1477        assert!(request.extensions.to_device.since.is_none());
1478
1479        {
1480            // Committing with another transaction id doesn't validate anything.
1481            let mut sticky = sync.inner.sticky.write().unwrap();
1482            assert!(sticky.is_invalidated());
1483            sticky.maybe_commit(
1484                "hopefully the rng won't generate this very specific transaction id".into(),
1485            );
1486            assert!(sticky.is_invalidated());
1487        }
1488
1489        // Regenerating a request will yield the same one.
1490        let txn_id2 = TransactionId::new();
1491        let (request, _, _) = sync
1492            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1493            .await?;
1494
1495        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1496        assert_eq!(request.extensions.to_device.enabled, Some(true));
1497        assert!(request.extensions.to_device.since.is_none());
1498
1499        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1500
1501        {
1502            // Committing with the expected transaction id will validate it.
1503            let mut sticky = sync.inner.sticky.write().unwrap();
1504            assert!(sticky.is_invalidated());
1505            sticky.maybe_commit(txn_id2.as_str().into());
1506            assert!(!sticky.is_invalidated());
1507        }
1508
1509        // The next request should contain no sticky parameters.
1510        let txn_id = TransactionId::new();
1511        let (request, _, _) = sync
1512            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1513            .await?;
1514        assert!(request.extensions.e2ee.enabled.is_none());
1515        assert!(request.extensions.to_device.enabled.is_none());
1516        assert!(request.extensions.to_device.since.is_none());
1517
1518        // If there's a to-device `since` token, we make sure we put the token
1519        // into the extension config. The rest doesn't need to be re-enabled due to
1520        // stickiness.
1521        let _since_token = "since";
1522
1523        #[cfg(feature = "e2e-encryption")]
1524        {
1525            use matrix_sdk_base::crypto::store::Changes;
1526            if let Some(olm_machine) = &*client.olm_machine().await {
1527                olm_machine
1528                    .store()
1529                    .save_changes(Changes {
1530                        next_batch_token: Some(_since_token.to_owned()),
1531                        ..Default::default()
1532                    })
1533                    .await?;
1534            }
1535        }
1536
1537        let txn_id = TransactionId::new();
1538        let (request, _, _) = sync
1539            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1540            .await?;
1541
1542        assert!(request.extensions.e2ee.enabled.is_none());
1543        assert!(request.extensions.to_device.enabled.is_none());
1544
1545        #[cfg(feature = "e2e-encryption")]
1546        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1547
1548        Ok(())
1549    }
1550
1551    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1552    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1553    // `/key/query` requests must be sent. See the code to see the details.
1554    //
1555    // This test is asserting that.
1556    #[async_test]
1557    #[cfg(feature = "e2e-encryption")]
1558    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1559        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1560        use matrix_sdk_test::ruma_response_from_json;
1561        use ruma::user_id;
1562
1563        let server = MockServer::start().await;
1564        let client = logged_in_client(Some(server.uri())).await;
1565
1566        let alice = user_id!("@alice:localhost");
1567        let bob = user_id!("@bob:localhost");
1568        let me = user_id!("@example:localhost");
1569
1570        // Track and mark users are not dirty, so that we can check they are “dirty”
1571        // after that. Dirty here means that a `/key/query` must be sent.
1572        {
1573            let olm_machine = client.olm_machine().await;
1574            let olm_machine = olm_machine.as_ref().unwrap();
1575
1576            olm_machine.update_tracked_users([alice, bob]).await?;
1577
1578            // Assert requests.
1579            let outgoing_requests = olm_machine.outgoing_requests().await?;
1580
1581            assert_eq!(outgoing_requests.len(), 2);
1582            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1583            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1584
1585            // Fake responses.
1586            olm_machine
1587                .mark_request_as_sent(
1588                    outgoing_requests[0].request_id(),
1589                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1590                        "one_time_key_counts": {}
1591                    }))),
1592                )
1593                .await?;
1594
1595            olm_machine
1596                .mark_request_as_sent(
1597                    outgoing_requests[1].request_id(),
1598                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1599                        "device_keys": {
1600                            alice: {},
1601                            bob: {},
1602                        }
1603                    }))),
1604                )
1605                .await?;
1606
1607            // Once more.
1608            let outgoing_requests = olm_machine.outgoing_requests().await?;
1609
1610            assert_eq!(outgoing_requests.len(), 1);
1611            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1612
1613            olm_machine
1614                .mark_request_as_sent(
1615                    outgoing_requests[0].request_id(),
1616                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1617                        "device_keys": {
1618                            me: {},
1619                        }
1620                    }))),
1621                )
1622                .await?;
1623
1624            // No more.
1625            let outgoing_requests = olm_machine.outgoing_requests().await?;
1626
1627            assert!(outgoing_requests.is_empty());
1628        }
1629
1630        let sync = client
1631            .sliding_sync("test-slidingsync")?
1632            .add_list(SlidingSyncList::builder("new_list"))
1633            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1634            .build()
1635            .await?;
1636
1637        // First request: no `pos`.
1638        let txn_id = TransactionId::new();
1639        let (_request, _, _) = sync
1640            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1641            .await?;
1642
1643        // Now, tracked users must be dirty.
1644        {
1645            let olm_machine = client.olm_machine().await;
1646            let olm_machine = olm_machine.as_ref().unwrap();
1647
1648            // Assert requests.
1649            let outgoing_requests = olm_machine.outgoing_requests().await?;
1650
1651            assert_eq!(outgoing_requests.len(), 1);
1652            assert_matches!(
1653                outgoing_requests[0].request(),
1654                AnyOutgoingRequest::KeysQuery(request) => {
1655                    assert!(request.device_keys.contains_key(alice));
1656                    assert!(request.device_keys.contains_key(bob));
1657                    assert!(request.device_keys.contains_key(me));
1658                }
1659            );
1660
1661            // Fake responses.
1662            olm_machine
1663                .mark_request_as_sent(
1664                    outgoing_requests[0].request_id(),
1665                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1666                        "device_keys": {
1667                            alice: {},
1668                            bob: {},
1669                            me: {},
1670                        }
1671                    }))),
1672                )
1673                .await?;
1674        }
1675
1676        // Second request: with a `pos` this time.
1677        sync.set_pos("chocolat".to_owned()).await;
1678
1679        let txn_id = TransactionId::new();
1680        let (_request, _, _) = sync
1681            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1682            .await?;
1683
1684        // Tracked users are not marked as dirty.
1685        {
1686            let olm_machine = client.olm_machine().await;
1687            let olm_machine = olm_machine.as_ref().unwrap();
1688
1689            // Assert requests.
1690            let outgoing_requests = olm_machine.outgoing_requests().await?;
1691
1692            assert!(outgoing_requests.is_empty());
1693        }
1694
1695        Ok(())
1696    }
1697
1698    #[async_test]
1699    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1700        let server = MockServer::start().await;
1701        let client = logged_in_client(Some(server.uri())).await;
1702
1703        let sliding_sync = client
1704            .sliding_sync("test-slidingsync")?
1705            .with_to_device_extension(
1706                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1707            )
1708            .build()
1709            .await?;
1710
1711        // First request asks to enable the extension.
1712        let (request, _, _) =
1713            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1714        assert!(request.extensions.to_device.enabled.is_some());
1715
1716        let sync = sliding_sync.sync();
1717        pin_mut!(sync);
1718
1719        // `pos` is `None` to start with.
1720        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1721
1722        #[derive(Deserialize)]
1723        struct PartialRequest {
1724            txn_id: Option<String>,
1725        }
1726
1727        {
1728            let _mock_guard = Mock::given(SlidingSyncMatcher)
1729                .respond_with(|request: &Request| {
1730                    // Repeat the txn_id in the response, if set.
1731                    let request: PartialRequest = request.body_json().unwrap();
1732
1733                    ResponseTemplate::new(200).set_body_json(json!({
1734                        "txn_id": request.txn_id,
1735                        "pos": "0",
1736                    }))
1737                })
1738                .mount_as_scoped(&server)
1739                .await;
1740
1741            let next = sync.next().await;
1742            assert_matches!(next, Some(Ok(_update_summary)));
1743
1744            // `pos` has been updated.
1745            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1746        }
1747
1748        // Next request doesn't ask to enable the extension.
1749        let (request, _, _) =
1750            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1751        assert!(request.extensions.to_device.enabled.is_none());
1752
1753        // Next request is successful.
1754        {
1755            let _mock_guard = Mock::given(SlidingSyncMatcher)
1756                .respond_with(|request: &Request| {
1757                    // Repeat the txn_id in the response, if set.
1758                    let request: PartialRequest = request.body_json().unwrap();
1759
1760                    ResponseTemplate::new(200).set_body_json(json!({
1761                        "txn_id": request.txn_id,
1762                        "pos": "1",
1763                    }))
1764                })
1765                .mount_as_scoped(&server)
1766                .await;
1767
1768            let next = sync.next().await;
1769            assert_matches!(next, Some(Ok(_update_summary)));
1770
1771            // `pos` has been updated.
1772            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1773        }
1774
1775        // Next request is successful despite it receives an already
1776        // received `pos` from the server.
1777        {
1778            let _mock_guard = Mock::given(SlidingSyncMatcher)
1779                .respond_with(|request: &Request| {
1780                    // Repeat the txn_id in the response, if set.
1781                    let request: PartialRequest = request.body_json().unwrap();
1782
1783                    ResponseTemplate::new(200).set_body_json(json!({
1784                        "txn_id": request.txn_id,
1785                        "pos": "0", // <- already received!
1786                    }))
1787                })
1788                .up_to_n_times(1) // run this mock only once.
1789                .mount_as_scoped(&server)
1790                .await;
1791
1792            let next = sync.next().await;
1793            assert_matches!(next, Some(Ok(_update_summary)));
1794
1795            // `pos` has been updated.
1796            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1797        }
1798
1799        // Stop responding with successful requests!
1800        //
1801        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1802        // so they're reset. It also resets the `pos`.
1803        {
1804            let _mock_guard = Mock::given(SlidingSyncMatcher)
1805                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1806                    "error": "foo",
1807                    "errcode": "M_UNKNOWN_POS",
1808                })))
1809                .mount_as_scoped(&server)
1810                .await;
1811
1812            let next = sync.next().await;
1813
1814            // The expected error is returned.
1815            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1816
1817            // `pos` has been reset.
1818            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1819
1820            // Next request asks to enable the extension again.
1821            let (request, _, _) =
1822                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1823
1824            assert!(request.extensions.to_device.enabled.is_some());
1825
1826            // `sync` has been stopped.
1827            assert!(sync.next().await.is_none());
1828        }
1829
1830        Ok(())
1831    }
1832
1833    #[cfg(feature = "e2e-encryption")]
1834    #[async_test]
1835    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1836        let server = MockServer::start().await;
1837
1838        #[derive(Deserialize)]
1839        struct PartialRequest {
1840            txn_id: Option<String>,
1841        }
1842
1843        let server_pos = Arc::new(Mutex::new(0));
1844        let _mock_guard = Mock::given(SlidingSyncMatcher)
1845            .respond_with(move |request: &Request| {
1846                // Repeat the txn_id in the response, if set.
1847                let request: PartialRequest = request.body_json().unwrap();
1848                let pos = {
1849                    let mut pos = server_pos.lock().unwrap();
1850                    let prev = *pos;
1851                    *pos += 1;
1852                    prev
1853                };
1854
1855                ResponseTemplate::new(200).set_body_json(json!({
1856                    "txn_id": request.txn_id,
1857                    "pos": pos.to_string(),
1858                }))
1859            })
1860            .mount_as_scoped(&server)
1861            .await;
1862
1863        let client = logged_in_client(Some(server.uri())).await;
1864
1865        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1866
1867        // `pos` is `None` to start with.
1868        {
1869            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1870
1871            let (request, _, _) =
1872                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1873            assert!(request.pos.is_none());
1874        }
1875
1876        let sync = sliding_sync.sync();
1877        pin_mut!(sync);
1878
1879        // Sync goes well, and then the position is saved both into the internal memory
1880        // and the database.
1881        let next = sync.next().await;
1882        assert_matches!(next, Some(Ok(_update_summary)));
1883
1884        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1885
1886        let restored_fields = restore_sliding_sync_state(
1887            &client,
1888            &sliding_sync.inner.storage_key,
1889            &*sliding_sync.inner.lists.read().await,
1890        )
1891        .await?
1892        .expect("must have restored fields");
1893
1894        // While it has been saved into the database, it's not necessarily going to be
1895        // used later!
1896        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1897
1898        // Now, even if we mess with the position stored in the database, the sliding
1899        // sync instance isn't configured to reload the stream position from the
1900        // database, so it won't be changed.
1901        {
1902            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1903
1904            let mut position_guard = other_sync.inner.position.lock().await;
1905            position_guard.pos = Some("yolo".to_owned());
1906
1907            other_sync.cache_to_storage(&position_guard).await?;
1908        }
1909
1910        // It's still 0, not "yolo".
1911        {
1912            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1913            let (request, _, _) =
1914                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1915            assert_eq!(request.pos.as_deref(), Some("0"));
1916        }
1917
1918        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1919        // asked to.
1920        {
1921            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1922            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1923        }
1924
1925        Ok(())
1926    }
1927
1928    #[cfg(feature = "e2e-encryption")]
1929    #[async_test]
1930    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1931        let server = MockServer::start().await;
1932
1933        #[derive(Deserialize)]
1934        struct PartialRequest {
1935            txn_id: Option<String>,
1936        }
1937
1938        let server_pos = Arc::new(Mutex::new(0));
1939        let _mock_guard = Mock::given(SlidingSyncMatcher)
1940            .respond_with(move |request: &Request| {
1941                // Repeat the txn_id in the response, if set.
1942                let request: PartialRequest = request.body_json().unwrap();
1943                let pos = {
1944                    let mut pos = server_pos.lock().unwrap();
1945                    let prev = *pos;
1946                    *pos += 1;
1947                    prev
1948                };
1949
1950                ResponseTemplate::new(200).set_body_json(json!({
1951                    "txn_id": request.txn_id,
1952                    "pos": pos.to_string(),
1953                }))
1954            })
1955            .mount_as_scoped(&server)
1956            .await;
1957
1958        let client = logged_in_client(Some(server.uri())).await;
1959
1960        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1961
1962        // `pos` is `None` to start with.
1963        {
1964            let (request, _, _) =
1965                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1966
1967            assert!(request.pos.is_none());
1968            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1969        }
1970
1971        let sync = sliding_sync.sync();
1972        pin_mut!(sync);
1973
1974        // Sync goes well, and then the position is saved both into the internal memory
1975        // and the database.
1976        let next = sync.next().await;
1977        assert_matches!(next, Some(Ok(_update_summary)));
1978
1979        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1980
1981        let restored_fields = restore_sliding_sync_state(
1982            &client,
1983            &sliding_sync.inner.storage_key,
1984            &*sliding_sync.inner.lists.read().await,
1985        )
1986        .await?
1987        .expect("must have restored fields");
1988
1989        // While it has been saved into the database, it's not necessarily going to be
1990        // used later!
1991        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1992
1993        // Another process modifies the stream position under our feet...
1994        {
1995            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1996
1997            let mut position_guard = other_sync.inner.position.lock().await;
1998            position_guard.pos = Some("42".to_owned());
1999
2000            other_sync.cache_to_storage(&position_guard).await?;
2001        }
2002
2003        // It's alright, the next request will load it from the database.
2004        {
2005            let (request, _, _) =
2006                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2007            assert_eq!(request.pos.as_deref(), Some("42"));
2008            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2009        }
2010
2011        // Recreating a sliding sync with the same ID will reload it too.
2012        {
2013            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2014            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2015
2016            let (request, _, _) =
2017                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2018            assert_eq!(request.pos.as_deref(), Some("42"));
2019        }
2020
2021        // Invalidating the session will remove the in-memory value AND the database
2022        // value.
2023        sliding_sync.expire_session().await;
2024
2025        {
2026            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2027
2028            let (request, _, _) =
2029                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2030            assert!(request.pos.is_none());
2031        }
2032
2033        // And new sliding syncs with the same ID won't find it either.
2034        {
2035            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2036            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2037
2038            let (request, _, _) =
2039                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2040            assert!(request.pos.is_none());
2041        }
2042
2043        Ok(())
2044    }
2045
2046    #[async_test]
2047    async fn test_stop_sync_loop() -> Result<()> {
2048        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2049            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2050        .await?;
2051
2052        // Start the sync loop.
2053        let stream = sliding_sync.sync();
2054        pin_mut!(stream);
2055
2056        // The sync loop is actually running.
2057        assert!(stream.next().await.is_some());
2058
2059        // Stop the sync loop.
2060        sliding_sync.stop_sync()?;
2061
2062        // The sync loop is actually stopped.
2063        assert!(stream.next().await.is_none());
2064
2065        // Start a new sync loop.
2066        let stream = sliding_sync.sync();
2067        pin_mut!(stream);
2068
2069        // The sync loop is actually running.
2070        assert!(stream.next().await.is_some());
2071
2072        Ok(())
2073    }
2074
2075    #[async_test]
2076    async fn test_process_read_receipts() -> Result<()> {
2077        let room = owned_room_id!("!pony:example.org");
2078
2079        let server = MockServer::start().await;
2080        let client = logged_in_client(Some(server.uri())).await;
2081
2082        let sliding_sync = client
2083            .sliding_sync("test")?
2084            .with_receipt_extension(
2085                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2086            )
2087            .add_list(
2088                SlidingSyncList::builder("all")
2089                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2090            )
2091            .build()
2092            .await?;
2093
2094        // Initial state.
2095        {
2096            let server_response = assign!(http::Response::new("0".to_owned()), {
2097                rooms: BTreeMap::from([(
2098                    room.clone(),
2099                    http::response::Room::default(),
2100                )])
2101            });
2102
2103            let _summary = {
2104                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2105                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2106            };
2107        }
2108
2109        let server_response = assign!(http::Response::new("1".to_owned()), {
2110            extensions: assign!(http::response::Extensions::default(), {
2111                receipts: assign!(http::response::Receipts::default(), {
2112                    rooms: BTreeMap::from([
2113                        (
2114                            room.clone(),
2115                            Raw::from_json_string(
2116                                json!({
2117                                    "room_id": room,
2118                                    "type": "m.receipt",
2119                                    "content": {
2120                                        "$event:bar.org": {
2121                                            "m.read": {
2122                                                client.user_id().unwrap(): {
2123                                                    "ts": 1436451550,
2124                                                }
2125                                            }
2126                                        }
2127                                    }
2128                                })
2129                                .to_string(),
2130                            ).unwrap()
2131                        )
2132                    ])
2133                })
2134            })
2135        });
2136
2137        let summary = {
2138            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2139            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2140        };
2141
2142        assert!(summary.rooms.contains(&room));
2143
2144        Ok(())
2145    }
2146
2147    #[async_test]
2148    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2149        let room_id = owned_room_id!("!unicorn:example.org");
2150
2151        let server = MockServer::start().await;
2152        let client = logged_in_client(Some(server.uri())).await;
2153
2154        // Setup sliding sync with with one room and one list
2155
2156        let sliding_sync = client
2157            .sliding_sync("test")?
2158            .with_account_data_extension(
2159                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2160            )
2161            .add_list(
2162                SlidingSyncList::builder("all")
2163                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2164            )
2165            .build()
2166            .await?;
2167
2168        // Initial state.
2169        {
2170            let server_response = assign!(http::Response::new("0".to_owned()), {
2171                rooms: BTreeMap::from([(
2172                    room_id.clone(),
2173                    http::response::Room::default(),
2174                )])
2175            });
2176
2177            let _summary = {
2178                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2179                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2180            };
2181        }
2182
2183        // Simulate a response that only changes the marked unread state of the room to
2184        // true
2185
2186        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2187
2188        let update_summary = {
2189            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2190            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2191        };
2192
2193        // Check that the list list and entry received the update
2194
2195        assert!(update_summary.rooms.contains(&room_id));
2196
2197        let room = client.get_room(&room_id).unwrap();
2198
2199        // Check the actual room data, this powers RoomInfo
2200
2201        assert!(room.is_marked_unread());
2202
2203        // Change it back to false and check if it updates
2204
2205        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2206
2207        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2208        sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?;
2209
2210        let room = client.get_room(&room_id).unwrap();
2211
2212        assert!(!room.is_marked_unread());
2213
2214        Ok(())
2215    }
2216
2217    fn make_mark_unread_response(
2218        response_number: &str,
2219        room_id: OwnedRoomId,
2220        unread: bool,
2221        add_rooms_section: bool,
2222    ) -> http::Response {
2223        let rooms = if add_rooms_section {
2224            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2225        } else {
2226            BTreeMap::new()
2227        };
2228
2229        let extensions = assign!(http::response::Extensions::default(), {
2230            account_data: assign!(http::response::AccountData::default(), {
2231                rooms: BTreeMap::from([
2232                    (
2233                        room_id,
2234                        vec![
2235                            Raw::from_json_string(
2236                                json!({
2237                                    "content": {
2238                                        "unread": unread
2239                                    },
2240                                    "type": "com.famedly.marked_unread"
2241                                })
2242                                .to_string(),
2243                            ).unwrap()
2244                        ]
2245                    )
2246                ])
2247            })
2248        });
2249
2250        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2251    }
2252
2253    #[async_test]
2254    async fn test_process_rooms_account_data() -> Result<()> {
2255        let room = owned_room_id!("!pony:example.org");
2256
2257        let server = MockServer::start().await;
2258        let client = logged_in_client(Some(server.uri())).await;
2259
2260        let sliding_sync = client
2261            .sliding_sync("test")?
2262            .with_account_data_extension(
2263                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2264            )
2265            .add_list(
2266                SlidingSyncList::builder("all")
2267                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2268            )
2269            .build()
2270            .await?;
2271
2272        // Initial state.
2273        {
2274            let server_response = assign!(http::Response::new("0".to_owned()), {
2275                rooms: BTreeMap::from([(
2276                    room.clone(),
2277                    http::response::Room::default(),
2278                )])
2279            });
2280
2281            let _summary = {
2282                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2283                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2284            };
2285        }
2286
2287        let server_response = assign!(http::Response::new("1".to_owned()), {
2288            extensions: assign!(http::response::Extensions::default(), {
2289                account_data: assign!(http::response::AccountData::default(), {
2290                    rooms: BTreeMap::from([
2291                        (
2292                            room.clone(),
2293                            vec![
2294                                Raw::from_json_string(
2295                                    json!({
2296                                        "content": {
2297                                            "tags": {
2298                                                "u.work": {
2299                                                    "order": 0.9
2300                                                }
2301                                            }
2302                                        },
2303                                        "type": "m.tag"
2304                                    })
2305                                    .to_string(),
2306                                ).unwrap()
2307                            ]
2308                        )
2309                    ])
2310                })
2311            })
2312        });
2313        let summary = {
2314            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2315            sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2316        };
2317
2318        assert!(summary.rooms.contains(&room));
2319
2320        Ok(())
2321    }
2322
2323    #[async_test]
2324    #[cfg(feature = "e2e-encryption")]
2325    async fn test_process_only_encryption_events() -> Result<()> {
2326        use ruma::OneTimeKeyAlgorithm;
2327
2328        let room = owned_room_id!("!croissant:example.org");
2329
2330        let server = MockServer::start().await;
2331        let client = logged_in_client(Some(server.uri())).await;
2332
2333        let server_response = assign!(http::Response::new("0".to_owned()), {
2334            rooms: BTreeMap::from([(
2335                room.clone(),
2336                assign!(http::response::Room::default(), {
2337                    name: Some("Croissants lovers".to_owned()),
2338                    timeline: Vec::new(),
2339                }),
2340            )]),
2341
2342            extensions: assign!(http::response::Extensions::default(), {
2343                e2ee: assign!(http::response::E2EE::default(), {
2344                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2345                }),
2346                to_device: Some(assign!(http::response::ToDevice::default(), {
2347                    next_batch: "to-device-token".to_owned(),
2348                })),
2349            })
2350        });
2351
2352        // Don't process non-encryption events if the sliding sync is configured for
2353        // encryption only.
2354
2355        let sliding_sync = client
2356            .sliding_sync("test")?
2357            .with_to_device_extension(
2358                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2359            )
2360            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2361            .build()
2362            .await?;
2363
2364        {
2365            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2366
2367            sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2368        }
2369
2370        // E2EE has been properly handled.
2371        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2372        assert_eq!(uploaded_key_count, 42);
2373
2374        {
2375            let olm_machine = &*client.olm_machine_for_testing().await;
2376            assert_eq!(
2377                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2378                Some("to-device-token")
2379            );
2380        }
2381
2382        // Room events haven't.
2383        assert!(client.get_room(&room).is_none());
2384
2385        // Conversely, only process room lists events if the sliding sync was configured
2386        // as so.
2387        let client = logged_in_client(Some(server.uri())).await;
2388
2389        let sliding_sync = client
2390            .sliding_sync("test")?
2391            .add_list(SlidingSyncList::builder("thelist"))
2392            .build()
2393            .await?;
2394
2395        {
2396            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2397
2398            sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2399        }
2400
2401        // E2EE response has been ignored.
2402        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2403        assert_eq!(uploaded_key_count, 0);
2404
2405        {
2406            let olm_machine = &*client.olm_machine_for_testing().await;
2407            assert_eq!(
2408                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2409                None
2410            );
2411        }
2412
2413        // The room is now known.
2414        assert!(client.get_room(&room).is_some());
2415
2416        // And it's also possible to set up both.
2417        let client = logged_in_client(Some(server.uri())).await;
2418
2419        let sliding_sync = client
2420            .sliding_sync("test")?
2421            .add_list(SlidingSyncList::builder("thelist"))
2422            .with_to_device_extension(
2423                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2424            )
2425            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2426            .build()
2427            .await?;
2428
2429        {
2430            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2431
2432            sliding_sync.handle_response(server_response.clone(), &mut position_guard).await?;
2433        }
2434
2435        // E2EE has been properly handled.
2436        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2437        assert_eq!(uploaded_key_count, 42);
2438
2439        {
2440            let olm_machine = &*client.olm_machine_for_testing().await;
2441            assert_eq!(
2442                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2443                Some("to-device-token")
2444            );
2445        }
2446
2447        // The room is now known.
2448        assert!(client.get_room(&room).is_some());
2449
2450        Ok(())
2451    }
2452
2453    #[async_test]
2454    async fn test_lock_multiple_requests() -> Result<()> {
2455        let server = MockServer::start().await;
2456        let client = logged_in_client(Some(server.uri())).await;
2457
2458        let pos = Arc::new(Mutex::new(0));
2459        let _mock_guard = Mock::given(SlidingSyncMatcher)
2460            .respond_with(move |_: &Request| {
2461                let mut pos = pos.lock().unwrap();
2462                *pos += 1;
2463                ResponseTemplate::new(200).set_body_json(json!({
2464                    "pos": pos.to_string(),
2465                    "lists": {},
2466                    "rooms": {}
2467                }))
2468            })
2469            .mount_as_scoped(&server)
2470            .await;
2471
2472        let sliding_sync = client
2473            .sliding_sync("test")?
2474            .with_to_device_extension(
2475                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2476            )
2477            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2478            .build()
2479            .await?;
2480
2481        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2482        // test would never terminate.
2483        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2484
2485        for result in requests.await {
2486            result?;
2487        }
2488
2489        Ok(())
2490    }
2491
2492    #[async_test]
2493    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2494        let server = MockServer::start().await;
2495        let client = logged_in_client(Some(server.uri())).await;
2496
2497        let pos = Arc::new(Mutex::new(0));
2498        let _mock_guard = Mock::given(SlidingSyncMatcher)
2499            .respond_with(move |_: &Request| {
2500                let mut pos = pos.lock().unwrap();
2501                *pos += 1;
2502                // Respond slowly enough that we can skip one iteration.
2503                ResponseTemplate::new(200)
2504                    .set_body_json(json!({
2505                        "pos": pos.to_string(),
2506                        "lists": {},
2507                        "rooms": {}
2508                    }))
2509                    .set_delay(Duration::from_secs(2))
2510            })
2511            .mount_as_scoped(&server)
2512            .await;
2513
2514        let sliding_sync =
2515            client
2516                .sliding_sync("test")?
2517                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2518                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2519                ))
2520                .add_list(
2521                    SlidingSyncList::builder("another-list")
2522                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2523                )
2524                .build()
2525                .await?;
2526
2527        let stream = sliding_sync.sync();
2528        pin_mut!(stream);
2529
2530        let cloned_sync = sliding_sync.clone();
2531        tokio::spawn(async move {
2532            tokio::time::sleep(Duration::from_millis(100)).await;
2533
2534            cloned_sync
2535                .on_list("another-list", |list| {
2536                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2537                    ready(())
2538                })
2539                .await;
2540        });
2541
2542        assert_matches!(stream.next().await, Some(Ok(_)));
2543
2544        sliding_sync.stop_sync().unwrap();
2545
2546        assert_matches!(stream.next().await, None);
2547
2548        let mut num_requests = 0;
2549
2550        for request in server.received_requests().await.unwrap() {
2551            if !SlidingSyncMatcher.matches(&request) {
2552                continue;
2553            }
2554
2555            let another_list_ranges = if num_requests == 0 {
2556                // First request
2557                json!([[0, 10]])
2558            } else {
2559                // Second request
2560                json!([[10, 20]])
2561            };
2562
2563            num_requests += 1;
2564            assert!(num_requests <= 2, "more than one request hit the server");
2565
2566            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2567
2568            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2569                &json_value,
2570                &json!({
2571                    "conn_id": "test",
2572                    "lists": {
2573                        "room-list": {
2574                            "ranges": [[0, 9]],
2575                            "required_state": [
2576                                ["m.room.encryption", ""],
2577                                ["m.room.tombstone", ""]
2578                            ],
2579                        },
2580                        "another-list": {
2581                            "ranges": another_list_ranges,
2582                            "required_state": [
2583                                ["m.room.encryption", ""],
2584                                ["m.room.tombstone", ""]
2585                            ],
2586                        },
2587                    }
2588                }),
2589                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2590            ) {
2591                dbg!(json_value);
2592                panic!("json differ: {err}");
2593            }
2594        }
2595
2596        assert_eq!(num_requests, 2);
2597
2598        Ok(())
2599    }
2600
2601    #[async_test]
2602    async fn test_timeout_zero_list() -> Result<()> {
2603        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2604
2605        let (request, _, _) =
2606            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2607
2608        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2609        // on new update to pop.
2610        assert!(request.timeout.is_some());
2611
2612        Ok(())
2613    }
2614
2615    #[async_test]
2616    async fn test_timeout_one_list() -> Result<()> {
2617        let (_server, sliding_sync) = new_sliding_sync(vec![
2618            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2619        ])
2620        .await?;
2621
2622        let (request, _, _) =
2623            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2624
2625        // The list does not require a timeout.
2626        assert!(request.timeout.is_none());
2627
2628        // Simulate a response.
2629        {
2630            let server_response = assign!(http::Response::new("0".to_owned()), {
2631                lists: BTreeMap::from([(
2632                    "foo".to_owned(),
2633                    assign!(http::response::List::default(), {
2634                        count: uint!(7),
2635                    })
2636                 )])
2637            });
2638
2639            let _summary = {
2640                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2641                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2642            };
2643        }
2644
2645        let (request, _, _) =
2646            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2647
2648        // The list is now fully loaded, so it requires a timeout.
2649        assert!(request.timeout.is_some());
2650
2651        Ok(())
2652    }
2653
2654    #[async_test]
2655    async fn test_timeout_three_lists() -> Result<()> {
2656        let (_server, sliding_sync) = new_sliding_sync(vec![
2657            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2658            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2659            SlidingSyncList::builder("baz")
2660                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2661        ])
2662        .await?;
2663
2664        let (request, _, _) =
2665            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667        // Two lists don't require a timeout.
2668        assert!(request.timeout.is_none());
2669
2670        // Simulate a response.
2671        {
2672            let server_response = assign!(http::Response::new("0".to_owned()), {
2673                lists: BTreeMap::from([(
2674                    "foo".to_owned(),
2675                    assign!(http::response::List::default(), {
2676                        count: uint!(7),
2677                    })
2678                 )])
2679            });
2680
2681            let _summary = {
2682                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2683                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2684            };
2685        }
2686
2687        let (request, _, _) =
2688            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2689
2690        // One don't require a timeout.
2691        assert!(request.timeout.is_none());
2692
2693        // Simulate a response.
2694        {
2695            let server_response = assign!(http::Response::new("1".to_owned()), {
2696                lists: BTreeMap::from([(
2697                    "bar".to_owned(),
2698                    assign!(http::response::List::default(), {
2699                        count: uint!(7),
2700                    })
2701                 )])
2702            });
2703
2704            let _summary = {
2705                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2706                sliding_sync.handle_response(server_response.clone(), &mut pos_guard).await?
2707            };
2708        }
2709
2710        let (request, _, _) =
2711            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2712
2713        // All lists require a timeout.
2714        assert!(request.timeout.is_some());
2715
2716        Ok(())
2717    }
2718
2719    #[async_test]
2720    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2721        let server = MockServer::start().await;
2722        let client = logged_in_client(Some(server.uri())).await;
2723
2724        let _mock_guard = Mock::given(SlidingSyncMatcher)
2725            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2726                "pos": "0",
2727                "lists": {},
2728                "rooms": {}
2729            })))
2730            .mount_as_scoped(&server)
2731            .await;
2732
2733        let sliding_sync = client
2734            .sliding_sync("test")?
2735            .with_to_device_extension(
2736                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2737            )
2738            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2739            .build()
2740            .await?;
2741
2742        let sliding_sync = Arc::new(sliding_sync);
2743
2744        // Create the listener and perform a sync request
2745        let sync_beat_listener = client.inner.sync_beat.listen();
2746        sliding_sync.sync_once().await?;
2747
2748        // The sync beat listener should be notified shortly after
2749        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2750        Ok(())
2751    }
2752
2753    #[async_test]
2754    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2755        let server = MockServer::start().await;
2756        let client = logged_in_client(Some(server.uri())).await;
2757
2758        let _mock_guard = Mock::given(SlidingSyncMatcher)
2759            .respond_with(ResponseTemplate::new(404))
2760            .mount_as_scoped(&server)
2761            .await;
2762
2763        let sliding_sync = client
2764            .sliding_sync("test")?
2765            .with_to_device_extension(
2766                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2767            )
2768            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2769            .build()
2770            .await?;
2771
2772        let sliding_sync = Arc::new(sliding_sync);
2773
2774        // Create the listener and perform a sync request
2775        let sync_beat_listener = client.inner.sync_beat.listen();
2776        let sync_result = sliding_sync.sync_once().await;
2777        assert!(sync_result.is_err());
2778
2779        // The sync beat listener won't be notified in this case
2780        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2781
2782        Ok(())
2783    }
2784}