matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26    collections::{btree_map::Entry, BTreeMap},
27    fmt::Debug,
28    future::Future,
29    sync::{Arc, RwLock as StdRwLock},
30    time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42    assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46    select,
47    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53    cache::restore_sliding_sync_state,
54    client::SlidingSyncResponseProcessor,
55    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{config::RequestConfig, Client, Result};
58
/// The Sliding Sync instance.
///
/// This type is a thin handle around shared, reference-counted data: cloning
/// it only clones the inner `Arc`, so it is OK to clone this type as much as
/// you need.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data, shared by all the clones of this handle.
    inner: Arc<SlidingSyncInner>,
}
67
/// The actual state of a Sliding Sync instance, shared behind an `Arc` by
/// every `SlidingSync` handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync: it is sent
    /// as the `conn_id` of every request.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`] to compute the timeout of the HTTP request.
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two non-necessarily
    /// consecutive requests with the same `pos`, the server has to reply with
    /// the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance, indexed by their name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// Request parameters that are sticky.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    ///
    /// The sync loop subscribes to it to learn when it must stop or skip over
    /// its current iteration.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
120
121impl SlidingSync {
122    pub(super) fn new(inner: SlidingSyncInner) -> Self {
123        Self { inner: Arc::new(inner) }
124    }
125
126    /// Whether the current sliding sync instance has set a sync position
127    /// marker.
128    pub async fn has_pos(&self) -> bool {
129        self.inner.position.lock().await.pos.is_some()
130    }
131
    /// Persist the state of this sliding sync instance, along with the given
    /// `position` markers, by delegating to `cache::store_sliding_sync_state`.
    ///
    /// Any error reported by the cache layer is returned as is.
    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
        cache::store_sliding_sync_state(self, position).await
    }
135
    /// Create a new [`SlidingSyncBuilder`].
    ///
    /// `id` identifies the sliding sync connection, and `client` is the
    /// client the requests will be sent with. This is a shortcut for
    /// `SlidingSyncBuilder::new`; it fails when that constructor fails.
    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
        SlidingSyncBuilder::new(id, client)
    }
140
141    /// Subscribe to many rooms.
142    ///
143    /// If the associated `Room`s exist, it will be marked as
144    /// members are missing, so that it ensures to re-fetch all members.
145    ///
146    /// A subscription to an already subscribed room is ignored.
147    pub fn subscribe_to_rooms(
148        &self,
149        room_ids: &[&RoomId],
150        settings: Option<http::request::RoomSubscription>,
151        cancel_in_flight_request: bool,
152    ) {
153        let settings = settings.unwrap_or_default();
154        let mut sticky = self.inner.sticky.write().unwrap();
155        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
156
157        let mut skip_over_current_sync_loop_iteration = false;
158
159        for room_id in room_ids {
160            // If the room subscription already exists, let's not
161            // override it with a new one. First, it would reset its
162            // state (`RoomSubscriptionState`), and second it would try to
163            // re-subscribe with the next request. We don't want that. A room
164            // subscription should happen once, and next subscriptions should
165            // be ignored.
166            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
167                if let Some(room) = self.inner.client.get_room(room_id) {
168                    room.mark_members_missing();
169                }
170
171                entry.insert((RoomSubscriptionState::default(), settings.clone()));
172
173                skip_over_current_sync_loop_iteration = true;
174            }
175        }
176
177        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
178            self.inner.internal_channel_send_if_possible(
179                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
180            );
181        }
182    }
183
184    /// Find a list by its name, and do something on it if it exists.
185    pub async fn on_list<Function, FunctionOutput, R>(
186        &self,
187        list_name: &str,
188        function: Function,
189    ) -> Option<R>
190    where
191        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
192        FunctionOutput: Future<Output = R>,
193    {
194        let lists = self.inner.lists.read().await;
195
196        match lists.get(list_name) {
197            Some(list) => Some(function(list).await),
198            None => None,
199        }
200    }
201
202    /// Add the list to the list of lists.
203    ///
204    /// As lists need to have a unique `.name`, if a list with the same name
205    /// is found the new list will replace the old one and the return it or
206    /// `None`.
207    pub async fn add_list(
208        &self,
209        list_builder: SlidingSyncListBuilder,
210    ) -> Result<Option<SlidingSyncList>> {
211        let list = list_builder.build(self.inner.internal_channel.clone());
212
213        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
214
215        self.inner.internal_channel_send_if_possible(
216            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
217        );
218
219        Ok(old_list)
220    }
221
    /// Add a list that will be cached and reloaded from the cache.
    ///
    /// The list builder is first asked to reload its state from the cache,
    /// using the client and the storage key of this sliding sync instance;
    /// any error raised by this step is returned (e.g. an I/O error when
    /// reading from the cache).
    ///
    /// NOTE(review): this documentation used to mention an error when no
    /// storage key was set; the storage key is a plain `String` here, so this
    /// probably no longer applies. To be confirmed.
    ///
    /// The rest of the semantics is the same as [`Self::add_list`].
    pub async fn add_cached_list(
        &self,
        mut list_builder: SlidingSyncListBuilder,
    ) -> Result<Option<SlidingSyncList>> {
        // Measure the whole restoration (loading + processing) of the list.
        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));

        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;

        self.add_list(list_builder).await
    }
238
    /// Handle the HTTP response.
    ///
    /// The steps are, in order:
    ///
    /// 1. the response is processed by a `SlidingSyncResponseProcessor` (the
    ///    end-to-end encryption part if enabled, the rooms part if there is
    ///    any list or room subscription),
    /// 2. the sticky parameters are committed if the response carries a
    ///    transaction ID,
    /// 3. the lists are updated, and the summary of updated rooms and lists is
    ///    built,
    /// 4. `position.pos` is set to the `pos` of the response.
    ///
    /// `position.pos` is written last, after every fallible step: if an error
    /// is returned, the `pos` is left untouched.
    #[instrument(skip_all)]
    async fn handle_response(
        &self,
        sliding_sync_response: http::Response,
        position: &mut SlidingSyncPositionMarkers,
        requested_required_states: RequestedRequiredStates,
    ) -> Result<UpdateSummary, crate::Error> {
        // Keep the new `pos` aside; it is saved only once everything else has
        // succeeded.
        let pos = Some(sliding_sync_response.pos.clone());

        let must_process_rooms_response = self.must_process_rooms_response().await;

        trace!(yes = must_process_rooms_response, "Must process rooms response?");

        // Transform a Sliding Sync Response to a `SyncResponse`.
        //
        // We may not need the `sync_response` in the future (once `SyncResponse` will
        // move to Sliding Sync, i.e. to `http::Response`), but processing the
        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
        // happens here.

        let sync_response = {
            let _timer = timer!("response processor");

            let response_processor = {
                // Take the lock to avoid concurrent sliding syncs overwriting each other's room
                // infos.
                let _sync_lock = {
                    let _timer = timer!("acquiring the `sync_lock`");

                    self.inner.client.base_client().sync_lock().lock().await
                };

                let mut response_processor =
                    SlidingSyncResponseProcessor::new(self.inner.client.clone());

                #[cfg(feature = "e2e-encryption")]
                if self.is_e2ee_enabled() {
                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
                }

                // Only handle the room's subsection of the response, if this sliding sync was
                // configured to do so.
                if must_process_rooms_response {
                    response_processor
                        .handle_room_response(&sliding_sync_response, &requested_required_states)
                        .await?;
                }

                response_processor
            };

            // Release the lock before calling event handlers
            response_processor.process_and_take_response().await?
        };

        debug!("Sliding Sync response has been handled by the client");
        trace!(?sync_response);

        // Commit sticky parameters, if needed.
        //
        // The transaction ID of the response is handed to the sticky parameters of
        // the instance and of every list, which decide whether to commit.
        if let Some(ref txn_id) = sliding_sync_response.txn_id {
            let txn_id = txn_id.as_str().into();
            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
            let mut lists = self.inner.lists.write().await;
            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
        }

        let update_summary = {
            // Update the rooms.
            let updated_rooms = {
                let mut updated_rooms = Vec::with_capacity(
                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
                );

                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());

                // There might be other rooms that were only mentioned in the sliding sync
                // extensions part of the response, and thus would result in rooms present in
                // the `sync_response.joined`. Mark them as updated too.
                //
                // Since we've removed rooms that were in the room subsection from
                // `sync_response.rooms.joined`, the remaining ones aren't already present in
                // `updated_rooms` and wouldn't cause any duplicates.
                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());

                updated_rooms
            };

            // Update the lists.
            let updated_lists = {
                debug!(
                    lists = ?sliding_sync_response.lists,
                    "Update lists"
                );

                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
                let mut lists = self.inner.lists.write().await;

                // Iterate on known lists, not on lists in the response. Rooms may have been
                // updated that were not involved in any list update.
                for (name, list) in lists.iter_mut() {
                    if let Some(updates) = sliding_sync_response.lists.get(name) {
                        // NOTE(review): this panics if the `count` sent by the server
                        // doesn't fit in a `u32`.
                        let maximum_number_of_rooms: u32 =
                            updates.count.try_into().expect("failed to convert `count` to `u32`");

                        if list.update(Some(maximum_number_of_rooms))? {
                            updated_lists.push(name.clone());
                        }
                    } else if list.update(None)? {
                        // The list isn't part of the response, but it has still reported
                        // an update.
                        updated_lists.push(name.clone());
                    }
                }

                // Report about unknown lists.
                for name in sliding_sync_response.lists.keys() {
                    if !lists.contains_key(name) {
                        error!("Response for list `{name}` - unknown to us; skipping");
                    }
                }

                updated_lists
            };

            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
        };

        // Everything went well, we can update the position markers.
        //
        // Save the new position markers.
        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");

        position.pos = pos;

        Ok(update_summary)
    }
374
    /// Build the next sliding sync request.
    ///
    /// Returns a tuple made of:
    ///
    /// * the request itself, filled with the lists' requests, the `pos`, the
    ///   timeout and the sticky parameters,
    /// * the request configuration to send it with (timeout and retry limit),
    /// * the owned guard of the `position` lock. It is acquired here, and
    ///   returned to the caller so that the lock stays held for as long as
    ///   the caller keeps the guard.
    ///
    /// `txn_id` is a lazily generated transaction ID, shared between the lists
    /// and the sticky parameters; it is attached to the request only if it
    /// has been generated.
    #[instrument(skip_all)]
    async fn generate_sync_request(
        &self,
        txn_id: &mut LazyTransactionId,
    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
        // Collect requests for lists.
        let mut requests_lists = BTreeMap::new();

        let require_timeout = {
            let lists = self.inner.lists.read().await;

            // Start at `true` in case there is zero list.
            let mut require_timeout = true;

            for (name, list) in lists.iter() {
                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
                // The timeout is required only if *all* the lists require it.
                require_timeout = require_timeout && list.requires_timeout();
            }

            require_timeout
        };

        // Collect the `pos`.
        //
        // Wait on the `position` mutex to be available. It means no request nor
        // response is running. The `position` mutex is released whether the response
        // has been fully handled successfully, in this case the `pos` is updated, or
        // the response handling has failed, in this case the `pos` hasn't been updated
        // and the same `pos` will be used for this new request.
        let mut position_guard = {
            debug!("Waiting to acquire the `position` lock");

            let _timer = timer!("acquiring the `position` lock");

            self.inner.position.clone().lock_owned().await
        };

        debug!(pos = ?position_guard.pos, "Got a position");

        let to_device_enabled =
            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);

        // The persisted state is needed either to restore the `pos`, or to restore
        // the to-device token; otherwise, don't load it.
        let restored_fields = if self.inner.share_pos || to_device_enabled {
            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
        } else {
            None
        };

        // Update pos: either the one restored from the database, if any and the sliding
        // sync was configured so, or read it from the memory cache.
        let pos = if self.inner.share_pos {
            if let Some(fields) = &restored_fields {
                // Override the memory one with the database one, for consistency.
                if fields.pos != position_guard.pos {
                    info!(
                        "Pos from previous request ('{:?}') was different from \
                         pos in database ('{:?}').",
                        position_guard.pos, fields.pos
                    );
                    position_guard.pos = fields.pos.clone();
                }
                fields.pos.clone()
            } else {
                position_guard.pos.clone()
            }
        } else {
            position_guard.pos.clone()
        };

        // Fill in the `pos` field of the current tracing span.
        Span::current().record("pos", &pos);

        // When the client sends a request with no `pos`, MSC4186 returns no device
        // lists updates, as it only returns changes since the provided `pos`
        // (which is `null` in this case); this is in line with sync v2.
        //
        // Therefore, with MSC4186, the device list cache must be marked as to be
        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
        // device lists updates that happened between the previous request and the new
        // “initial” request.
        #[cfg(feature = "e2e-encryption")]
        if pos.is_none() && self.is_e2ee_enabled() {
            info!("Marking all tracked users as dirty");

            let olm_machine = self.inner.client.olm_machine().await;
            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
            olm_machine.mark_all_tracked_users_as_dirty().await?;
        }

        // Configure the timeout.
        //
        // The `timeout` query is necessary when all lists require it. Please see
        // [`SlidingSyncList::requires_timeout`].
        let timeout = require_timeout.then(|| self.inner.poll_timeout);

        let mut request = assign!(http::Request::new(), {
            conn_id: Some(self.inner.id.clone()),
            pos,
            timeout,
            lists: requests_lists,
        });

        // Apply sticky parameters, if needs be.
        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);

        // Extensions are now applied (via sticky parameters).
        //
        // Override the to-device token if the extension is enabled.
        if to_device_enabled {
            request.extensions.to_device.since =
                restored_fields.and_then(|fields| fields.to_device_token);
        }

        // Apply the transaction id if one was generated.
        if let Some(txn_id) = txn_id.get() {
            request.txn_id = Some(txn_id.to_string());
        }

        Ok((
            // The request itself.
            request,
            // Configure long-polling. We need some time for the long-poll itself,
            // and extra time for the network delays.
            RequestConfig::default()
                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
                .retry_limit(3),
            // The `position` lock guard, to keep the lock held by the caller.
            position_guard,
        ))
    }
503
504    /// Send a sliding sync request.
505    ///
506    /// This method contains the sending logic.
507    async fn send_sync_request(
508        &self,
509        request: http::Request,
510        request_config: RequestConfig,
511        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
512    ) -> Result<UpdateSummary> {
513        debug!("Sending request");
514
515        // Prepare the request.
516        let requested_required_states = RequestedRequiredStates::from(&request);
517        let request = self.inner.client.send(request).with_request_config(request_config);
518
519        // Send the request and get a response with end-to-end encryption support.
520        //
521        // Sending the `/sync` request out when end-to-end encryption is enabled means
522        // that we need to also send out any outgoing e2ee related request out
523        // coming from the `OlmMachine::outgoing_requests()` method.
524
525        #[cfg(feature = "e2e-encryption")]
526        let response = {
527            if self.is_e2ee_enabled() {
528                // Here, we need to run 2 things:
529                //
530                // 1. Send the sliding sync request and get a response,
531                // 2. Send the E2EE requests.
532                //
533                // We don't want to use a `join` or `try_join` because we want to fail if and
534                // only if sending the sliding sync request fails. Failing to send the E2EE
535                // requests should just result in a log.
536                //
537                // We also want to give the priority to sliding sync request. E2EE requests are
538                // sent concurrently to the sliding sync request, but the priority is on waiting
539                // a sliding sync response.
540                //
541                // If sending sliding sync request fails, the sending of E2EE requests must be
542                // aborted as soon as possible.
543
544                let client = self.inner.client.clone();
545                let e2ee_uploads = spawn(
546                    async move {
547                        if let Err(error) = client.send_outgoing_requests().await {
548                            error!(?error, "Error while sending outgoing E2EE requests");
549                        }
550                    }
551                    .instrument(Span::current()),
552                )
553                // Ensure that the task is not running in detached mode. It is aborted when it's
554                // dropped.
555                .abort_on_drop();
556
557                // Wait on the sliding sync request success or failure early.
558                let response = request.await?;
559
560                // At this point, if `request` has been resolved successfully, we wait on
561                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
562                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
563                // been dropped, so aborted.
564                e2ee_uploads.await.map_err(|error| Error::JoinError {
565                    task_description: "e2ee_uploads".to_owned(),
566                    error,
567                })?;
568
569                response
570            } else {
571                request.await?
572            }
573        };
574
575        // Send the request and get a response _without_ end-to-end encryption support.
576        #[cfg(not(feature = "e2e-encryption"))]
577        let response = request.await?;
578
579        debug!("Received response");
580
581        // At this point, the request has been sent, and a response has been received.
582        //
583        // We must ensure the handling of the response cannot be stopped/
584        // cancelled. It must be done entirely, otherwise we can have
585        // corrupted/incomplete states for Sliding Sync and other parts of
586        // the code.
587        //
588        // That's why we are running the handling of the response in a spawned
589        // future that cannot be cancelled by anything.
590        let this = self.clone();
591
592        // Spawn a new future to ensure that the code inside this future cannot be
593        // cancelled if this method is cancelled.
594        let future = async move {
595            debug!("Start handling response");
596
597            // In case the task running this future is detached, we must
598            // ensure responses are handled one at a time. At this point we still own
599            // `position_guard`, so we're fine.
600
601            // Handle the response.
602            let updates = this
603                .handle_response(response, &mut position_guard, requested_required_states)
604                .await?;
605
606            this.cache_to_storage(&position_guard).await?;
607
608            // Release the position guard lock.
609            // It means that other responses can be generated and then handled later.
610            drop(position_guard);
611
612            debug!("Done handling response");
613
614            Ok(updates)
615        };
616
617        spawn(future.instrument(Span::current())).await.unwrap()
618    }
619
620    /// Is the e2ee extension enabled for this sliding sync instance?
621    #[cfg(feature = "e2e-encryption")]
622    fn is_e2ee_enabled(&self) -> bool {
623        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
624    }
625
626    #[cfg(not(feature = "e2e-encryption"))]
627    fn is_e2ee_enabled(&self) -> bool {
628        false
629    }
630
631    /// Should we process the room's subpart of a response?
632    async fn must_process_rooms_response(&self) -> bool {
633        // We consider that we must, if there's any room subscription or there's any
634        // list.
635        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
636            || !self.inner.lists.read().await.is_empty()
637    }
638
639    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
640    async fn sync_once(&self) -> Result<UpdateSummary> {
641        let (request, request_config, position_guard) =
642            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
643
644        // Send the request, kaboom.
645        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
646
647        // Notify a new sync was received
648        self.inner.client.inner.sync_beat.notify(usize::MAX);
649
650        Ok(summaries)
651    }
652
    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and will handle
    /// responses automatically. Lists and rooms are updated automatically.
    ///
    /// The `Stream` yields `Ok(…)` with the summary of the updates every time a
    /// sync iteration went well, otherwise it yields `Err(…)`. An `Err` will
    /// _always_ lead to the `Stream` termination.
    ///
    /// The loop also listens to the internal channel: it stops when it is
    /// asked to or when the channel reports an error, and it abandons the
    /// current iteration to start a new one when asked to skip over it.
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is polled, when this method is called.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Poll the branches in order: internal messages are checked before
                    // the sync itself.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // Stop on request, or when receiving from the channel fails.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // The in-flight `sync_once` future is dropped by `select!`; a
                            // new iteration starts.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminates the loop, and terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
717
718    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
719    ///
720    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
721    /// enough to “stop” it, but depending of how this `Stream` is used, it
722    /// might not be obvious to drop it immediately (thinking of using this API
723    /// over FFI; the foreign-language might not be able to drop a value
724    /// immediately). Thus, calling this method will ensure that the sync loop
725    /// stops gracefully and as soon as it returns.
726    pub fn stop_sync(&self) -> Result<()> {
727        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
728    }
729
730    /// Expire the current Sliding Sync session on the client-side.
731    ///
732    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
733    /// sticky parameters.
734    ///
735    /// This should only be used when it's clear that this session was about to
736    /// expire anyways, and should be used only in very specific cases (e.g.
737    /// multiple sliding syncs being run in parallel, and one of them has
738    /// expired).
739    ///
740    /// This method **MUST** be called when the sync loop is stopped.
741    #[doc(hidden)]
742    pub async fn expire_session(&self) {
743        info!("Session expired; resetting `pos` and sticky parameters");
744
745        {
746            let lists = self.inner.lists.read().await;
747            for list in lists.values() {
748                // Invalidate in-memory data that would be persisted on disk.
749                list.set_maximum_number_of_rooms(None);
750
751                // Invalidate the sticky data for this list.
752                list.invalidate_sticky_data();
753            }
754        }
755
756        // Remove the cached sliding sync state as well.
757        {
758            let mut position = self.inner.position.lock().await;
759
760            // Invalidate in memory.
761            position.pos = None;
762
763            // Propagate to disk.
764            // Note: this propagates both the sliding sync state and the cached lists'
765            // state to disk.
766            if let Err(err) = self.cache_to_storage(&position).await {
767                warn!("Failed to invalidate cached sliding sync state: {err}");
768            }
769        }
770
771        {
772            let mut sticky = self.inner.sticky.write().unwrap();
773
774            // Clear all room subscriptions: we don't want to resend all room subscriptions
775            // when the session will restart.
776            sticky.data_mut().room_subscriptions.clear();
777        }
778    }
779}
780
781impl SlidingSyncInner {
782    /// Send a message over the internal channel.
783    #[instrument]
784    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
785        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
786    }
787
788    /// Send a message over the internal channel if there is a receiver, i.e. if
789    /// the sync loop is running.
790    #[instrument]
791    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
792        // If there is no receiver, the send will fail, but that's OK here.
793        let _ = self.internal_channel.send(message);
794    }
795}
796
/// Messages sent over the internal channel to instruct the sync loop.
///
/// The enum has no payload, so equality is total: `Eq` is derived along with
/// `PartialEq`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
806
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        // The lock guard is a temporary: it is released as soon as the
        // assignment is done.
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Get a copy of the static extension configuration of this Sliding
    /// Sync.
    ///
    /// Note: this is not what the sticky parameters will send next; it is the
    /// static configuration that was set when this Sliding Sync was created.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
825
/// The position markers of a Sliding Sync session.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    pos: Option<String>,
}
832
/// A serializable and deserializable holder for a `pos` value.
///
/// NOTE(review): presumably the persisted counterpart of
/// [`SlidingSyncPositionMarkers`] used by the cache; confirm against the
/// `cache` module.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The position; it is left out of the serialized output when it is
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
838
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The IDs of the rooms that have seen an update.
    pub rooms: Vec<OwnedRoomId>,
}
848
/// A very basic bool-ish enum to represent the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent or received correctly from the
    /// server, i.e. the `RoomSubscription` —which is part of the sticky
    /// parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent and received correctly by the
    /// server.
    Applied,
}
864
/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
/// sent in the request.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but one wants to receive updates.
    ///
    /// Each subscription is stored next to its [`RoomSubscriptionState`].
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The intended state of the extensions being supplied to sliding /sync
    /// calls.
    extensions: http::request::Extensions,
}
878
879impl SlidingSyncStickyParameters {
880    /// Create a new set of sticky parameters.
881    pub fn new(
882        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
883        extensions: http::request::Extensions,
884    ) -> Self {
885        Self {
886            room_subscriptions: room_subscriptions
887                .into_iter()
888                .map(|(room_id, room_subscription)| {
889                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
890                })
891                .collect(),
892            extensions,
893        }
894    }
895}
896
897impl StickyData for SlidingSyncStickyParameters {
898    type Request = http::Request;
899
900    fn apply(&self, request: &mut Self::Request) {
901        request.room_subscriptions = self
902            .room_subscriptions
903            .iter()
904            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
905            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
906            .collect();
907        request.extensions = self.extensions.clone();
908    }
909
910    fn on_commit(&mut self) {
911        // All room subscriptions are marked as `Applied`.
912        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
913            if matches!(state, RoomSubscriptionState::Pending) {
914                *state = RoomSubscriptionState::Applied;
915            }
916        }
917    }
918}
919
920#[cfg(all(test, not(target_family = "wasm")))]
921#[allow(clippy::dbg_macro)]
922mod tests {
923    use std::{
924        collections::BTreeMap,
925        future::ready,
926        ops::Not,
927        sync::{Arc, Mutex},
928        time::Duration,
929    };
930
931    use assert_matches::assert_matches;
932    use event_listener::Listener;
933    use futures_util::{future::join_all, pin_mut, StreamExt};
934    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
935    use matrix_sdk_common::executor::spawn;
936    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE};
937    use ruma::{
938        api::client::error::ErrorKind,
939        assign,
940        events::{direct::DirectEvent, room::member::MembershipState},
941        owned_room_id, room_id,
942        serde::Raw,
943        uint, OwnedRoomId, TransactionId,
944    };
945    use serde::Deserialize;
946    use serde_json::json;
947    use wiremock::{
948        http::Method, matchers::method, Match, Mock, MockServer, Request, ResponseTemplate,
949    };
950
951    use super::{
952        http,
953        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
954        SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
955        SlidingSyncStickyParameters,
956    };
957    use crate::{
958        sliding_sync::cache::restore_sliding_sync_state,
959        test_utils::{logged_in_client, mocks::MatrixMockServer},
960        Client, Result,
961    };
962
963    #[derive(Copy, Clone)]
964    struct SlidingSyncMatcher;
965
966    impl Match for SlidingSyncMatcher {
967        fn matches(&self, request: &Request) -> bool {
968            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
969                && request.method == Method::POST
970        }
971    }
972
973    async fn new_sliding_sync(
974        lists: Vec<SlidingSyncListBuilder>,
975    ) -> Result<(MockServer, SlidingSync)> {
976        let server = MockServer::start().await;
977        let client = logged_in_client(Some(server.uri())).await;
978
979        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
980
981        for list in lists {
982            sliding_sync_builder = sliding_sync_builder.add_list(list);
983        }
984
985        let sliding_sync = sliding_sync_builder.build().await?;
986
987        Ok((server, sliding_sync))
988    }
989
990    #[async_test]
991    async fn test_subscribe_to_rooms() -> Result<()> {
992        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
993            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
994        .await?;
995
996        let stream = sliding_sync.sync();
997        pin_mut!(stream);
998
999        let room_id_0 = room_id!("!r0:bar.org");
1000        let room_id_1 = room_id!("!r1:bar.org");
1001        let room_id_2 = room_id!("!r2:bar.org");
1002
1003        {
1004            let _mock_guard = Mock::given(SlidingSyncMatcher)
1005                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1006                    "pos": "1",
1007                    "lists": {},
1008                    "rooms": {
1009                        room_id_0: {
1010                            "name": "Room #0",
1011                            "initial": true,
1012                        },
1013                        room_id_1: {
1014                            "name": "Room #1",
1015                            "initial": true,
1016                        },
1017                        room_id_2: {
1018                            "name": "Room #2",
1019                            "initial": true,
1020                        },
1021                    }
1022                })))
1023                .mount_as_scoped(&server)
1024                .await;
1025
1026            let _ = stream.next().await.unwrap()?;
1027        }
1028
1029        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1030
1031        // Members aren't synced.
1032        // We need to make them synced, so that we can test that subscribing to a room
1033        // make members not synced. That's a desired feature.
1034        assert!(room0.are_members_synced().not());
1035
1036        {
1037            struct MemberMatcher(OwnedRoomId);
1038
1039            impl Match for MemberMatcher {
1040                fn matches(&self, request: &Request) -> bool {
1041                    request.url.path()
1042                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1043                        && request.method == Method::GET
1044                }
1045            }
1046
1047            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1048                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1049                    "chunk": [],
1050                })))
1051                .mount_as_scoped(&server)
1052                .await;
1053
1054            assert_matches!(room0.request_members().await, Ok(()));
1055        }
1056
1057        // Members are now synced! We can start subscribing and see how it goes.
1058        assert!(room0.are_members_synced());
1059
1060        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1061
1062        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1063        // now marked as not synced.
1064        assert!(room0.are_members_synced().not());
1065
1066        {
1067            let sticky = sliding_sync.inner.sticky.read().unwrap();
1068            let room_subscriptions = &sticky.data().room_subscriptions;
1069
1070            assert!(room_subscriptions.contains_key(room_id_0));
1071            assert!(room_subscriptions.contains_key(room_id_1));
1072            assert!(!room_subscriptions.contains_key(room_id_2));
1073        }
1074
1075        // Subscribing to the same room doesn't reset the member sync state.
1076
1077        {
1078            struct MemberMatcher(OwnedRoomId);
1079
1080            impl Match for MemberMatcher {
1081                fn matches(&self, request: &Request) -> bool {
1082                    request.url.path()
1083                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1084                        && request.method == Method::GET
1085                }
1086            }
1087
1088            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1089                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1090                    "chunk": [],
1091                })))
1092                .mount_as_scoped(&server)
1093                .await;
1094
1095            assert_matches!(room0.request_members().await, Ok(()));
1096        }
1097
1098        // Members are synced, good, good.
1099        assert!(room0.are_members_synced());
1100
1101        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1102
1103        // Members are still synced: because we have already subscribed to the
1104        // room, the members aren't marked as unsynced.
1105        assert!(room0.are_members_synced());
1106
1107        Ok(())
1108    }
1109
1110    #[async_test]
1111    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1112        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1113            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1114        .await?;
1115
1116        let room_id_0 = room_id!("!r0:bar.org");
1117        let room_id_1 = room_id!("!r1:bar.org");
1118        let room_id_2 = room_id!("!r2:bar.org");
1119
1120        // Subscribe to two rooms.
1121        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1122
1123        {
1124            let sticky = sliding_sync.inner.sticky.read().unwrap();
1125            let room_subscriptions = &sticky.data().room_subscriptions;
1126
1127            assert!(room_subscriptions.contains_key(room_id_0));
1128            assert!(room_subscriptions.contains_key(room_id_1));
1129            assert!(room_subscriptions.contains_key(room_id_2).not());
1130        }
1131
1132        // Subscribe to one more room.
1133        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1134
1135        {
1136            let sticky = sliding_sync.inner.sticky.read().unwrap();
1137            let room_subscriptions = &sticky.data().room_subscriptions;
1138
1139            assert!(room_subscriptions.contains_key(room_id_0));
1140            assert!(room_subscriptions.contains_key(room_id_1));
1141            assert!(room_subscriptions.contains_key(room_id_2));
1142        }
1143
1144        // Suddenly, the session expires!
1145        sliding_sync.expire_session().await;
1146
1147        {
1148            let sticky = sliding_sync.inner.sticky.read().unwrap();
1149            let room_subscriptions = &sticky.data().room_subscriptions;
1150
1151            assert!(room_subscriptions.is_empty());
1152        }
1153
1154        // Subscribe to one room again.
1155        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1156
1157        {
1158            let sticky = sliding_sync.inner.sticky.read().unwrap();
1159            let room_subscriptions = &sticky.data().room_subscriptions;
1160
1161            assert!(room_subscriptions.contains_key(room_id_0).not());
1162            assert!(room_subscriptions.contains_key(room_id_1).not());
1163            assert!(room_subscriptions.contains_key(room_id_2));
1164        }
1165
1166        Ok(())
1167    }
1168
1169    #[async_test]
1170    async fn test_add_list() -> Result<()> {
1171        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1172            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1173        .await?;
1174
1175        let _stream = sliding_sync.sync();
1176        pin_mut!(_stream);
1177
1178        sliding_sync
1179            .add_list(
1180                SlidingSyncList::builder("bar")
1181                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1182            )
1183            .await?;
1184
1185        let lists = sliding_sync.inner.lists.read().await;
1186
1187        assert!(lists.contains_key("foo"));
1188        assert!(lists.contains_key("bar"));
1189
1190        // this test also ensures that Tokio is not panicking when calling `add_list`.
1191
1192        Ok(())
1193    }
1194
1195    #[test]
1196    fn test_sticky_parameters_api_invalidated_flow() {
1197        let r0 = room_id!("!r0.matrix.org");
1198        let r1 = room_id!("!r1:matrix.org");
1199
1200        let mut room_subscriptions = BTreeMap::new();
1201        room_subscriptions.insert(r0.to_owned(), Default::default());
1202
1203        // At first it's invalidated.
1204        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1205            room_subscriptions,
1206            Default::default(),
1207        ));
1208        assert!(sticky.is_invalidated());
1209
1210        // Then when we create a request, the sticky parameters are applied.
1211        let txn_id: &TransactionId = "tid123".into();
1212
1213        let mut request = http::Request::default();
1214        request.txn_id = Some(txn_id.to_string());
1215
1216        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1217
1218        assert!(request.txn_id.is_some());
1219        assert_eq!(request.room_subscriptions.len(), 1);
1220        assert!(request.room_subscriptions.contains_key(r0));
1221
1222        let tid = request.txn_id.unwrap();
1223
1224        sticky.maybe_commit(tid.as_str().into());
1225        assert!(!sticky.is_invalidated());
1226
1227        // Applying new parameters will invalidate again.
1228        sticky
1229            .data_mut()
1230            .room_subscriptions
1231            .insert(r1.to_owned(), (Default::default(), Default::default()));
1232        assert!(sticky.is_invalidated());
1233
1234        // Committing with the wrong transaction id will keep it invalidated.
1235        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1236        assert!(sticky.is_invalidated());
1237
1238        // Restarting a request will only remember the last generated transaction id.
1239        let txn_id1: &TransactionId = "tid456".into();
1240        let mut request1 = http::Request::default();
1241        request1.txn_id = Some(txn_id1.to_string());
1242        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1243
1244        assert!(sticky.is_invalidated());
1245        // The first room subscription has been applied to `request`, so it's not
1246        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1247        // related to the sticky design.
1248        assert_eq!(request1.room_subscriptions.len(), 1);
1249        assert!(request1.room_subscriptions.contains_key(r1));
1250
1251        let txn_id2: &TransactionId = "tid789".into();
1252        let mut request2 = http::Request::default();
1253        request2.txn_id = Some(txn_id2.to_string());
1254
1255        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1256        assert!(sticky.is_invalidated());
1257        // `request2` contains `r1` because the sticky parameters have not been
1258        // committed, so it's still marked as pending.
1259        assert_eq!(request2.room_subscriptions.len(), 1);
1260        assert!(request2.room_subscriptions.contains_key(r1));
1261
1262        // Here we commit with the not most-recent TID, so it keeps the invalidated
1263        // status.
1264        sticky.maybe_commit(txn_id1);
1265        assert!(sticky.is_invalidated());
1266
1267        // But here we use the latest TID, so the commit is effective.
1268        sticky.maybe_commit(txn_id2);
1269        assert!(!sticky.is_invalidated());
1270    }
1271
1272    #[test]
1273    fn test_room_subscriptions_are_sticky() {
1274        let r0 = room_id!("!r0.matrix.org");
1275        let r1 = room_id!("!r1:matrix.org");
1276
1277        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1278            BTreeMap::new(),
1279            Default::default(),
1280        ));
1281
1282        // A room subscription is added, applied, and committed.
1283        {
1284            // Insert `r0`.
1285            sticky
1286                .data_mut()
1287                .room_subscriptions
1288                .insert(r0.to_owned(), (Default::default(), Default::default()));
1289
1290            // Then the sticky parameters are applied.
1291            let txn_id: &TransactionId = "tid0".into();
1292            let mut request = http::Request::default();
1293            request.txn_id = Some(txn_id.to_string());
1294
1295            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1296
1297            assert!(request.txn_id.is_some());
1298            assert_eq!(request.room_subscriptions.len(), 1);
1299            assert!(request.room_subscriptions.contains_key(r0));
1300
1301            // Then the sticky parameters are committed.
1302            let tid = request.txn_id.unwrap();
1303
1304            sticky.maybe_commit(tid.as_str().into());
1305        }
1306
1307        // A room subscription is added, applied, but NOT committed.
1308        {
1309            // Insert `r1`.
1310            sticky
1311                .data_mut()
1312                .room_subscriptions
1313                .insert(r1.to_owned(), (Default::default(), Default::default()));
1314
1315            // Then the sticky parameters are applied.
1316            let txn_id: &TransactionId = "tid1".into();
1317            let mut request = http::Request::default();
1318            request.txn_id = Some(txn_id.to_string());
1319
1320            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1321
1322            assert!(request.txn_id.is_some());
1323            assert_eq!(request.room_subscriptions.len(), 1);
1324            // `r0` is not present, it's only `r1`.
1325            assert!(request.room_subscriptions.contains_key(r1));
1326
1327            // Then the sticky parameters are NOT committed.
1328            // It can happen if the request has failed to be sent for example,
1329            // or if the response didn't match.
1330        }
1331
1332        // A previously added room subscription is re-added, applied, and committed.
1333        {
1334            // Then the sticky parameters are applied.
1335            let txn_id: &TransactionId = "tid2".into();
1336            let mut request = http::Request::default();
1337            request.txn_id = Some(txn_id.to_string());
1338
1339            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1340
1341            assert!(request.txn_id.is_some());
1342            assert_eq!(request.room_subscriptions.len(), 1);
1343            // `r0` is not present, it's only `r1`.
1344            assert!(request.room_subscriptions.contains_key(r1));
1345
1346            // Then the sticky parameters are committed.
1347            let tid = request.txn_id.unwrap();
1348
1349            sticky.maybe_commit(tid.as_str().into());
1350        }
1351
1352        // All room subscriptions have been committed.
1353        {
1354            // Then the sticky parameters are applied.
1355            let txn_id: &TransactionId = "tid3".into();
1356            let mut request = http::Request::default();
1357            request.txn_id = Some(txn_id.to_string());
1358
1359            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1360
1361            assert!(request.txn_id.is_some());
1362            // All room subscriptions have been sent.
1363            assert!(request.room_subscriptions.is_empty());
1364        }
1365    }
1366
1367    #[test]
1368    fn test_extensions_are_sticky() {
1369        let mut extensions = http::request::Extensions::default();
1370        extensions.account_data.enabled = Some(true);
1371
1372        // At first it's invalidated.
1373        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1374            Default::default(),
1375            extensions,
1376        ));
1377
1378        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1379
1380        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1381        // to-device.
1382        let extensions = &sticky.data().extensions;
1383        assert_eq!(extensions.e2ee.enabled, None);
1384        assert_eq!(extensions.to_device.enabled, None);
1385        assert_eq!(extensions.to_device.since, None);
1386
1387        // What the user explicitly enabled is… enabled.
1388        assert_eq!(extensions.account_data.enabled, Some(true));
1389
1390        let txn_id: &TransactionId = "tid123".into();
1391        let mut request = http::Request::default();
1392        request.txn_id = Some(txn_id.to_string());
1393        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1394        assert!(sticky.is_invalidated());
1395        assert_eq!(request.extensions.to_device.enabled, None);
1396        assert_eq!(request.extensions.to_device.since, None);
1397        assert_eq!(request.extensions.e2ee.enabled, None);
1398        assert_eq!(request.extensions.account_data.enabled, Some(true));
1399    }
1400
    /// Checks how the extensions configuration is put in generated requests:
    /// it is present until the sticky parameters are committed with the
    /// matching transaction id, and absent afterwards. With the
    /// `e2e-encryption` feature, it also checks the to-device `since` token of
    /// the last generated request.
    #[async_test]
    async fn test_sticky_extensions_plus_since() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .build()
            .await?;

        // No extensions have been explicitly enabled here.
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);

        // Now enable e2ee and to-device, on a new sliding sync that shadows the
        // previous one.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Even without a since token, the first request will contain the extensions
        // configuration, at least.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        {
            // Committing with another transaction id doesn't validate anything.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(
                "hopefully the rng won't generate this very specific transaction id".into(),
            );
            assert!(sticky.is_invalidated());
        }

        // Regenerating a request will yield the same extensions configuration.
        let txn_id2 = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");

        {
            // Committing with the expected transaction id will validate it.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(txn_id2.as_str().into());
            assert!(!sticky.is_invalidated());
        }

        // The next request should contain no sticky parameters.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());
        assert!(request.extensions.to_device.since.is_none());

        // If there's a to-device `since` token, we make sure we put the token
        // into the extension config. The rest doesn't need to be re-enabled due to
        // stickiness.
        //
        // The leading underscore keeps the binding warning-free when the
        // `e2e-encryption` feature is disabled, as it is only read in the
        // feature-gated code below.
        let _since_token = "since";

        #[cfg(feature = "e2e-encryption")]
        {
            // Save the token as the `next_batch_token` in the store of the
            // `OlmMachine`, if there is one.
            use matrix_sdk_base::crypto::store::types::Changes;
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(_since_token.to_owned()),
                        ..Default::default()
                    })
                    .await?;
            }
        }

        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        // The extensions are still not re-sent…
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());

        // … but the `since` token is part of the request.
        #[cfg(feature = "e2e-encryption")]
        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));

        Ok(())
    }
1510
1511    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1512    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1513    // `/key/query` requests must be sent. See the code to see the details.
1514    //
1515    // This test is asserting that.
1516    #[async_test]
1517    #[cfg(feature = "e2e-encryption")]
1518    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1519        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1520        use matrix_sdk_test::ruma_response_from_json;
1521        use ruma::user_id;
1522
1523        let server = MockServer::start().await;
1524        let client = logged_in_client(Some(server.uri())).await;
1525
1526        let alice = user_id!("@alice:localhost");
1527        let bob = user_id!("@bob:localhost");
1528        let me = user_id!("@example:localhost");
1529
1530        // Track and mark users are not dirty, so that we can check they are “dirty”
1531        // after that. Dirty here means that a `/key/query` must be sent.
1532        {
1533            let olm_machine = client.olm_machine().await;
1534            let olm_machine = olm_machine.as_ref().unwrap();
1535
1536            olm_machine.update_tracked_users([alice, bob]).await?;
1537
1538            // Assert requests.
1539            let outgoing_requests = olm_machine.outgoing_requests().await?;
1540
1541            assert_eq!(outgoing_requests.len(), 2);
1542            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1543            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1544
1545            // Fake responses.
1546            olm_machine
1547                .mark_request_as_sent(
1548                    outgoing_requests[0].request_id(),
1549                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1550                        "one_time_key_counts": {}
1551                    }))),
1552                )
1553                .await?;
1554
1555            olm_machine
1556                .mark_request_as_sent(
1557                    outgoing_requests[1].request_id(),
1558                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1559                        "device_keys": {
1560                            alice: {},
1561                            bob: {},
1562                        }
1563                    }))),
1564                )
1565                .await?;
1566
1567            // Once more.
1568            let outgoing_requests = olm_machine.outgoing_requests().await?;
1569
1570            assert_eq!(outgoing_requests.len(), 1);
1571            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1572
1573            olm_machine
1574                .mark_request_as_sent(
1575                    outgoing_requests[0].request_id(),
1576                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1577                        "device_keys": {
1578                            me: {},
1579                        }
1580                    }))),
1581                )
1582                .await?;
1583
1584            // No more.
1585            let outgoing_requests = olm_machine.outgoing_requests().await?;
1586
1587            assert!(outgoing_requests.is_empty());
1588        }
1589
1590        let sync = client
1591            .sliding_sync("test-slidingsync")?
1592            .add_list(SlidingSyncList::builder("new_list"))
1593            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1594            .build()
1595            .await?;
1596
1597        // First request: no `pos`.
1598        let txn_id = TransactionId::new();
1599        let (_request, _, _) = sync
1600            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1601            .await?;
1602
1603        // Now, tracked users must be dirty.
1604        {
1605            let olm_machine = client.olm_machine().await;
1606            let olm_machine = olm_machine.as_ref().unwrap();
1607
1608            // Assert requests.
1609            let outgoing_requests = olm_machine.outgoing_requests().await?;
1610
1611            assert_eq!(outgoing_requests.len(), 1);
1612            assert_matches!(
1613                outgoing_requests[0].request(),
1614                AnyOutgoingRequest::KeysQuery(request) => {
1615                    assert!(request.device_keys.contains_key(alice));
1616                    assert!(request.device_keys.contains_key(bob));
1617                    assert!(request.device_keys.contains_key(me));
1618                }
1619            );
1620
1621            // Fake responses.
1622            olm_machine
1623                .mark_request_as_sent(
1624                    outgoing_requests[0].request_id(),
1625                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1626                        "device_keys": {
1627                            alice: {},
1628                            bob: {},
1629                            me: {},
1630                        }
1631                    }))),
1632                )
1633                .await?;
1634        }
1635
1636        // Second request: with a `pos` this time.
1637        sync.set_pos("chocolat".to_owned()).await;
1638
1639        let txn_id = TransactionId::new();
1640        let (_request, _, _) = sync
1641            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1642            .await?;
1643
1644        // Tracked users are not marked as dirty.
1645        {
1646            let olm_machine = client.olm_machine().await;
1647            let olm_machine = olm_machine.as_ref().unwrap();
1648
1649            // Assert requests.
1650            let outgoing_requests = olm_machine.outgoing_requests().await?;
1651
1652            assert!(outgoing_requests.is_empty());
1653        }
1654
1655        Ok(())
1656    }
1657
1658    #[async_test]
1659    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1660        let server = MockServer::start().await;
1661        let client = logged_in_client(Some(server.uri())).await;
1662
1663        let sliding_sync = client
1664            .sliding_sync("test-slidingsync")?
1665            .with_to_device_extension(
1666                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1667            )
1668            .build()
1669            .await?;
1670
1671        // First request asks to enable the extension.
1672        let (request, _, _) =
1673            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1674        assert!(request.extensions.to_device.enabled.is_some());
1675
1676        let sync = sliding_sync.sync();
1677        pin_mut!(sync);
1678
1679        // `pos` is `None` to start with.
1680        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1681
1682        #[derive(Deserialize)]
1683        struct PartialRequest {
1684            txn_id: Option<String>,
1685        }
1686
1687        {
1688            let _mock_guard = Mock::given(SlidingSyncMatcher)
1689                .respond_with(|request: &Request| {
1690                    // Repeat the txn_id in the response, if set.
1691                    let request: PartialRequest = request.body_json().unwrap();
1692
1693                    ResponseTemplate::new(200).set_body_json(json!({
1694                        "txn_id": request.txn_id,
1695                        "pos": "0",
1696                    }))
1697                })
1698                .mount_as_scoped(&server)
1699                .await;
1700
1701            let next = sync.next().await;
1702            assert_matches!(next, Some(Ok(_update_summary)));
1703
1704            // `pos` has been updated.
1705            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1706        }
1707
1708        // Next request doesn't ask to enable the extension.
1709        let (request, _, _) =
1710            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1711        assert!(request.extensions.to_device.enabled.is_none());
1712
1713        // Next request is successful.
1714        {
1715            let _mock_guard = Mock::given(SlidingSyncMatcher)
1716                .respond_with(|request: &Request| {
1717                    // Repeat the txn_id in the response, if set.
1718                    let request: PartialRequest = request.body_json().unwrap();
1719
1720                    ResponseTemplate::new(200).set_body_json(json!({
1721                        "txn_id": request.txn_id,
1722                        "pos": "1",
1723                    }))
1724                })
1725                .mount_as_scoped(&server)
1726                .await;
1727
1728            let next = sync.next().await;
1729            assert_matches!(next, Some(Ok(_update_summary)));
1730
1731            // `pos` has been updated.
1732            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1733        }
1734
1735        // Next request is successful despite it receives an already
1736        // received `pos` from the server.
1737        {
1738            let _mock_guard = Mock::given(SlidingSyncMatcher)
1739                .respond_with(|request: &Request| {
1740                    // Repeat the txn_id in the response, if set.
1741                    let request: PartialRequest = request.body_json().unwrap();
1742
1743                    ResponseTemplate::new(200).set_body_json(json!({
1744                        "txn_id": request.txn_id,
1745                        "pos": "0", // <- already received!
1746                    }))
1747                })
1748                .up_to_n_times(1) // run this mock only once.
1749                .mount_as_scoped(&server)
1750                .await;
1751
1752            let next = sync.next().await;
1753            assert_matches!(next, Some(Ok(_update_summary)));
1754
1755            // `pos` has been updated.
1756            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1757        }
1758
1759        // Stop responding with successful requests!
1760        //
1761        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1762        // so they're reset. It also resets the `pos`.
1763        {
1764            let _mock_guard = Mock::given(SlidingSyncMatcher)
1765                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1766                    "error": "foo",
1767                    "errcode": "M_UNKNOWN_POS",
1768                })))
1769                .mount_as_scoped(&server)
1770                .await;
1771
1772            let next = sync.next().await;
1773
1774            // The expected error is returned.
1775            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1776
1777            // `pos` has been reset.
1778            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1779
1780            // Next request asks to enable the extension again.
1781            let (request, _, _) =
1782                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1783
1784            assert!(request.extensions.to_device.enabled.is_some());
1785
1786            // `sync` has been stopped.
1787            assert!(sync.next().await.is_none());
1788        }
1789
1790        Ok(())
1791    }
1792
1793    #[cfg(feature = "e2e-encryption")]
1794    #[async_test]
1795    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1796        let server = MockServer::start().await;
1797
1798        #[derive(Deserialize)]
1799        struct PartialRequest {
1800            txn_id: Option<String>,
1801        }
1802
1803        let server_pos = Arc::new(Mutex::new(0));
1804        let _mock_guard = Mock::given(SlidingSyncMatcher)
1805            .respond_with(move |request: &Request| {
1806                // Repeat the txn_id in the response, if set.
1807                let request: PartialRequest = request.body_json().unwrap();
1808                let pos = {
1809                    let mut pos = server_pos.lock().unwrap();
1810                    let prev = *pos;
1811                    *pos += 1;
1812                    prev
1813                };
1814
1815                ResponseTemplate::new(200).set_body_json(json!({
1816                    "txn_id": request.txn_id,
1817                    "pos": pos.to_string(),
1818                }))
1819            })
1820            .mount_as_scoped(&server)
1821            .await;
1822
1823        let client = logged_in_client(Some(server.uri())).await;
1824
1825        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1826
1827        // `pos` is `None` to start with.
1828        {
1829            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1830
1831            let (request, _, _) =
1832                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1833            assert!(request.pos.is_none());
1834        }
1835
1836        let sync = sliding_sync.sync();
1837        pin_mut!(sync);
1838
1839        // Sync goes well, and then the position is saved both into the internal memory
1840        // and the database.
1841        let next = sync.next().await;
1842        assert_matches!(next, Some(Ok(_update_summary)));
1843
1844        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1845
1846        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1847            .await?
1848            .expect("must have restored fields");
1849
1850        // While it has been saved into the database, it's not necessarily going to be
1851        // used later!
1852        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1853
1854        // Now, even if we mess with the position stored in the database, the sliding
1855        // sync instance isn't configured to reload the stream position from the
1856        // database, so it won't be changed.
1857        {
1858            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1859
1860            let mut position_guard = other_sync.inner.position.lock().await;
1861            position_guard.pos = Some("yolo".to_owned());
1862
1863            other_sync.cache_to_storage(&position_guard).await?;
1864        }
1865
1866        // It's still 0, not "yolo".
1867        {
1868            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1869            let (request, _, _) =
1870                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1871            assert_eq!(request.pos.as_deref(), Some("0"));
1872        }
1873
1874        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1875        // asked to.
1876        {
1877            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1878            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1879        }
1880
1881        Ok(())
1882    }
1883
1884    #[cfg(feature = "e2e-encryption")]
1885    #[async_test]
1886    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1887        let server = MockServer::start().await;
1888
1889        #[derive(Deserialize)]
1890        struct PartialRequest {
1891            txn_id: Option<String>,
1892        }
1893
1894        let server_pos = Arc::new(Mutex::new(0));
1895        let _mock_guard = Mock::given(SlidingSyncMatcher)
1896            .respond_with(move |request: &Request| {
1897                // Repeat the txn_id in the response, if set.
1898                let request: PartialRequest = request.body_json().unwrap();
1899                let pos = {
1900                    let mut pos = server_pos.lock().unwrap();
1901                    let prev = *pos;
1902                    *pos += 1;
1903                    prev
1904                };
1905
1906                ResponseTemplate::new(200).set_body_json(json!({
1907                    "txn_id": request.txn_id,
1908                    "pos": pos.to_string(),
1909                }))
1910            })
1911            .mount_as_scoped(&server)
1912            .await;
1913
1914        let client = logged_in_client(Some(server.uri())).await;
1915
1916        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1917
1918        // `pos` is `None` to start with.
1919        {
1920            let (request, _, _) =
1921                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1922
1923            assert!(request.pos.is_none());
1924            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1925        }
1926
1927        let sync = sliding_sync.sync();
1928        pin_mut!(sync);
1929
1930        // Sync goes well, and then the position is saved both into the internal memory
1931        // and the database.
1932        let next = sync.next().await;
1933        assert_matches!(next, Some(Ok(_update_summary)));
1934
1935        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1936
1937        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1938            .await?
1939            .expect("must have restored fields");
1940
1941        // While it has been saved into the database, it's not necessarily going to be
1942        // used later!
1943        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1944
1945        // Another process modifies the stream position under our feet...
1946        {
1947            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1948
1949            let mut position_guard = other_sync.inner.position.lock().await;
1950            position_guard.pos = Some("42".to_owned());
1951
1952            other_sync.cache_to_storage(&position_guard).await?;
1953        }
1954
1955        // It's alright, the next request will load it from the database.
1956        {
1957            let (request, _, _) =
1958                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1959            assert_eq!(request.pos.as_deref(), Some("42"));
1960            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1961        }
1962
1963        // Recreating a sliding sync with the same ID will reload it too.
1964        {
1965            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1966            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1967
1968            let (request, _, _) =
1969                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1970            assert_eq!(request.pos.as_deref(), Some("42"));
1971        }
1972
1973        // Invalidating the session will remove the in-memory value AND the database
1974        // value.
1975        sliding_sync.expire_session().await;
1976
1977        {
1978            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1979
1980            let (request, _, _) =
1981                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1982            assert!(request.pos.is_none());
1983        }
1984
1985        // And new sliding syncs with the same ID won't find it either.
1986        {
1987            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1988            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1989
1990            let (request, _, _) =
1991                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1992            assert!(request.pos.is_none());
1993        }
1994
1995        Ok(())
1996    }
1997
1998    #[async_test]
1999    async fn test_stop_sync_loop() -> Result<()> {
2000        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2001            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2002        .await?;
2003
2004        // Start the sync loop.
2005        let stream = sliding_sync.sync();
2006        pin_mut!(stream);
2007
2008        // The sync loop is actually running.
2009        assert!(stream.next().await.is_some());
2010
2011        // Stop the sync loop.
2012        sliding_sync.stop_sync()?;
2013
2014        // The sync loop is actually stopped.
2015        assert!(stream.next().await.is_none());
2016
2017        // Start a new sync loop.
2018        let stream = sliding_sync.sync();
2019        pin_mut!(stream);
2020
2021        // The sync loop is actually running.
2022        assert!(stream.next().await.is_some());
2023
2024        Ok(())
2025    }
2026
2027    #[async_test]
2028    async fn test_process_read_receipts() -> Result<()> {
2029        let room = owned_room_id!("!pony:example.org");
2030
2031        let server = MockServer::start().await;
2032        let client = logged_in_client(Some(server.uri())).await;
2033        client.event_cache().subscribe().unwrap();
2034
2035        let sliding_sync = client
2036            .sliding_sync("test")?
2037            .with_receipt_extension(
2038                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2039            )
2040            .add_list(
2041                SlidingSyncList::builder("all")
2042                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2043            )
2044            .build()
2045            .await?;
2046
2047        // Initial state.
2048        {
2049            let server_response = assign!(http::Response::new("0".to_owned()), {
2050                rooms: BTreeMap::from([(
2051                    room.clone(),
2052                    http::response::Room::default(),
2053                )])
2054            });
2055
2056            let _summary = {
2057                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2058                sliding_sync
2059                    .handle_response(
2060                        server_response.clone(),
2061                        &mut pos_guard,
2062                        RequestedRequiredStates::default(),
2063                    )
2064                    .await?
2065            };
2066        }
2067
2068        let server_response = assign!(http::Response::new("1".to_owned()), {
2069            extensions: assign!(http::response::Extensions::default(), {
2070                receipts: assign!(http::response::Receipts::default(), {
2071                    rooms: BTreeMap::from([
2072                        (
2073                            room.clone(),
2074                            Raw::from_json_string(
2075                                json!({
2076                                    "room_id": room,
2077                                    "type": "m.receipt",
2078                                    "content": {
2079                                        "$event:bar.org": {
2080                                            "m.read": {
2081                                                client.user_id().unwrap(): {
2082                                                    "ts": 1436451550,
2083                                                }
2084                                            }
2085                                        }
2086                                    }
2087                                })
2088                                .to_string(),
2089                            ).unwrap()
2090                        )
2091                    ])
2092                })
2093            })
2094        });
2095
2096        let summary = {
2097            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2098            sliding_sync
2099                .handle_response(
2100                    server_response.clone(),
2101                    &mut pos_guard,
2102                    RequestedRequiredStates::default(),
2103                )
2104                .await?
2105        };
2106
2107        assert!(summary.rooms.contains(&room));
2108
2109        Ok(())
2110    }
2111
2112    #[async_test]
2113    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2114        let room_id = owned_room_id!("!unicorn:example.org");
2115
2116        let server = MockServer::start().await;
2117        let client = logged_in_client(Some(server.uri())).await;
2118
2119        // Setup sliding sync with with one room and one list
2120
2121        let sliding_sync = client
2122            .sliding_sync("test")?
2123            .with_account_data_extension(
2124                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2125            )
2126            .add_list(
2127                SlidingSyncList::builder("all")
2128                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2129            )
2130            .build()
2131            .await?;
2132
2133        // Initial state.
2134        {
2135            let server_response = assign!(http::Response::new("0".to_owned()), {
2136                rooms: BTreeMap::from([(
2137                    room_id.clone(),
2138                    http::response::Room::default(),
2139                )])
2140            });
2141
2142            let _summary = {
2143                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2144                sliding_sync
2145                    .handle_response(
2146                        server_response.clone(),
2147                        &mut pos_guard,
2148                        RequestedRequiredStates::default(),
2149                    )
2150                    .await?
2151            };
2152        }
2153
2154        // Simulate a response that only changes the marked unread state of the room to
2155        // true
2156
2157        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2158
2159        let update_summary = {
2160            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2161            sliding_sync
2162                .handle_response(
2163                    server_response.clone(),
2164                    &mut pos_guard,
2165                    RequestedRequiredStates::default(),
2166                )
2167                .await?
2168        };
2169
2170        // Check that the list list and entry received the update
2171
2172        assert!(update_summary.rooms.contains(&room_id));
2173
2174        let room = client.get_room(&room_id).unwrap();
2175
2176        // Check the actual room data, this powers RoomInfo
2177
2178        assert!(room.is_marked_unread());
2179
2180        // Change it back to false and check if it updates
2181
2182        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2183
2184        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2185        sliding_sync
2186            .handle_response(
2187                server_response.clone(),
2188                &mut pos_guard,
2189                RequestedRequiredStates::default(),
2190            )
2191            .await?;
2192
2193        let room = client.get_room(&room_id).unwrap();
2194
2195        assert!(!room.is_marked_unread());
2196
2197        Ok(())
2198    }
2199
2200    fn make_mark_unread_response(
2201        response_number: &str,
2202        room_id: OwnedRoomId,
2203        unread: bool,
2204        add_rooms_section: bool,
2205    ) -> http::Response {
2206        let rooms = if add_rooms_section {
2207            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2208        } else {
2209            BTreeMap::new()
2210        };
2211
2212        let extensions = assign!(http::response::Extensions::default(), {
2213            account_data: assign!(http::response::AccountData::default(), {
2214                rooms: BTreeMap::from([
2215                    (
2216                        room_id,
2217                        vec![
2218                            Raw::from_json_string(
2219                                json!({
2220                                    "content": {
2221                                        "unread": unread
2222                                    },
2223                                    "type": "m.marked_unread"
2224                                })
2225                                .to_string(),
2226                            ).unwrap()
2227                        ]
2228                    )
2229                ])
2230            })
2231        });
2232
2233        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2234    }
2235
2236    #[async_test]
2237    async fn test_process_rooms_account_data() -> Result<()> {
2238        let room = owned_room_id!("!pony:example.org");
2239
2240        let server = MockServer::start().await;
2241        let client = logged_in_client(Some(server.uri())).await;
2242
2243        let sliding_sync = client
2244            .sliding_sync("test")?
2245            .with_account_data_extension(
2246                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2247            )
2248            .add_list(
2249                SlidingSyncList::builder("all")
2250                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2251            )
2252            .build()
2253            .await?;
2254
2255        // Initial state.
2256        {
2257            let server_response = assign!(http::Response::new("0".to_owned()), {
2258                rooms: BTreeMap::from([(
2259                    room.clone(),
2260                    http::response::Room::default(),
2261                )])
2262            });
2263
2264            let _summary = {
2265                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2266                sliding_sync
2267                    .handle_response(
2268                        server_response.clone(),
2269                        &mut pos_guard,
2270                        RequestedRequiredStates::default(),
2271                    )
2272                    .await?
2273            };
2274        }
2275
2276        let server_response = assign!(http::Response::new("1".to_owned()), {
2277            extensions: assign!(http::response::Extensions::default(), {
2278                account_data: assign!(http::response::AccountData::default(), {
2279                    rooms: BTreeMap::from([
2280                        (
2281                            room.clone(),
2282                            vec![
2283                                Raw::from_json_string(
2284                                    json!({
2285                                        "content": {
2286                                            "tags": {
2287                                                "u.work": {
2288                                                    "order": 0.9
2289                                                }
2290                                            }
2291                                        },
2292                                        "type": "m.tag"
2293                                    })
2294                                    .to_string(),
2295                                ).unwrap()
2296                            ]
2297                        )
2298                    ])
2299                })
2300            })
2301        });
2302        let summary = {
2303            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2304            sliding_sync
2305                .handle_response(
2306                    server_response.clone(),
2307                    &mut pos_guard,
2308                    RequestedRequiredStates::default(),
2309                )
2310                .await?
2311        };
2312
2313        assert!(summary.rooms.contains(&room));
2314
2315        Ok(())
2316    }
2317
2318    #[async_test]
2319    #[cfg(feature = "e2e-encryption")]
2320    async fn test_process_only_encryption_events() -> Result<()> {
2321        use ruma::OneTimeKeyAlgorithm;
2322
2323        let room = owned_room_id!("!croissant:example.org");
2324
2325        let server = MockServer::start().await;
2326        let client = logged_in_client(Some(server.uri())).await;
2327
2328        let server_response = assign!(http::Response::new("0".to_owned()), {
2329            rooms: BTreeMap::from([(
2330                room.clone(),
2331                assign!(http::response::Room::default(), {
2332                    name: Some("Croissants lovers".to_owned()),
2333                    timeline: Vec::new(),
2334                }),
2335            )]),
2336
2337            extensions: assign!(http::response::Extensions::default(), {
2338                e2ee: assign!(http::response::E2EE::default(), {
2339                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2340                }),
2341                to_device: Some(assign!(http::response::ToDevice::default(), {
2342                    next_batch: "to-device-token".to_owned(),
2343                })),
2344            })
2345        });
2346
2347        // Don't process non-encryption events if the sliding sync is configured for
2348        // encryption only.
2349
2350        let sliding_sync = client
2351            .sliding_sync("test")?
2352            .with_to_device_extension(
2353                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2354            )
2355            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2356            .build()
2357            .await?;
2358
2359        {
2360            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2361
2362            sliding_sync
2363                .handle_response(
2364                    server_response.clone(),
2365                    &mut position_guard,
2366                    RequestedRequiredStates::default(),
2367                )
2368                .await?;
2369        }
2370
2371        // E2EE has been properly handled.
2372        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2373        assert_eq!(uploaded_key_count, 42);
2374
2375        {
2376            let olm_machine = &*client.olm_machine_for_testing().await;
2377            assert_eq!(
2378                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2379                Some("to-device-token")
2380            );
2381        }
2382
2383        // Room events haven't.
2384        assert!(client.get_room(&room).is_none());
2385
2386        // Conversely, only process room lists events if the sliding sync was configured
2387        // as so.
2388        let client = logged_in_client(Some(server.uri())).await;
2389
2390        let sliding_sync = client
2391            .sliding_sync("test")?
2392            .add_list(SlidingSyncList::builder("thelist"))
2393            .build()
2394            .await?;
2395
2396        {
2397            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2398
2399            sliding_sync
2400                .handle_response(
2401                    server_response.clone(),
2402                    &mut position_guard,
2403                    RequestedRequiredStates::default(),
2404                )
2405                .await?;
2406        }
2407
2408        // E2EE response has been ignored.
2409        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2410        assert_eq!(uploaded_key_count, 0);
2411
2412        {
2413            let olm_machine = &*client.olm_machine_for_testing().await;
2414            assert_eq!(
2415                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2416                None
2417            );
2418        }
2419
2420        // The room is now known.
2421        assert!(client.get_room(&room).is_some());
2422
2423        // And it's also possible to set up both.
2424        let client = logged_in_client(Some(server.uri())).await;
2425
2426        let sliding_sync = client
2427            .sliding_sync("test")?
2428            .add_list(SlidingSyncList::builder("thelist"))
2429            .with_to_device_extension(
2430                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2431            )
2432            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2433            .build()
2434            .await?;
2435
2436        {
2437            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2438
2439            sliding_sync
2440                .handle_response(
2441                    server_response.clone(),
2442                    &mut position_guard,
2443                    RequestedRequiredStates::default(),
2444                )
2445                .await?;
2446        }
2447
2448        // E2EE has been properly handled.
2449        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2450        assert_eq!(uploaded_key_count, 42);
2451
2452        {
2453            let olm_machine = &*client.olm_machine_for_testing().await;
2454            assert_eq!(
2455                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2456                Some("to-device-token")
2457            );
2458        }
2459
2460        // The room is now known.
2461        assert!(client.get_room(&room).is_some());
2462
2463        Ok(())
2464    }
2465
2466    #[async_test]
2467    async fn test_lock_multiple_requests() -> Result<()> {
2468        let server = MockServer::start().await;
2469        let client = logged_in_client(Some(server.uri())).await;
2470
2471        let pos = Arc::new(Mutex::new(0));
2472        let _mock_guard = Mock::given(SlidingSyncMatcher)
2473            .respond_with(move |_: &Request| {
2474                let mut pos = pos.lock().unwrap();
2475                *pos += 1;
2476                ResponseTemplate::new(200).set_body_json(json!({
2477                    "pos": pos.to_string(),
2478                    "lists": {},
2479                    "rooms": {}
2480                }))
2481            })
2482            .mount_as_scoped(&server)
2483            .await;
2484
2485        let sliding_sync = client
2486            .sliding_sync("test")?
2487            .with_to_device_extension(
2488                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2489            )
2490            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2491            .build()
2492            .await?;
2493
2494        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2495        // test would never terminate.
2496        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2497
2498        for result in requests.await {
2499            result?;
2500        }
2501
2502        Ok(())
2503    }
2504
2505    #[async_test]
2506    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2507        let server = MockServer::start().await;
2508        let client = logged_in_client(Some(server.uri())).await;
2509
2510        let pos = Arc::new(Mutex::new(0));
2511        let _mock_guard = Mock::given(SlidingSyncMatcher)
2512            .respond_with(move |_: &Request| {
2513                let mut pos = pos.lock().unwrap();
2514                *pos += 1;
2515                // Respond slowly enough that we can skip one iteration.
2516                ResponseTemplate::new(200)
2517                    .set_body_json(json!({
2518                        "pos": pos.to_string(),
2519                        "lists": {},
2520                        "rooms": {}
2521                    }))
2522                    .set_delay(Duration::from_secs(2))
2523            })
2524            .mount_as_scoped(&server)
2525            .await;
2526
2527        let sliding_sync =
2528            client
2529                .sliding_sync("test")?
2530                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2531                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2532                ))
2533                .add_list(
2534                    SlidingSyncList::builder("another-list")
2535                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2536                )
2537                .build()
2538                .await?;
2539
2540        let stream = sliding_sync.sync();
2541        pin_mut!(stream);
2542
2543        let cloned_sync = sliding_sync.clone();
2544        spawn(async move {
2545            tokio::time::sleep(Duration::from_millis(100)).await;
2546
2547            cloned_sync
2548                .on_list("another-list", |list| {
2549                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2550                    ready(())
2551                })
2552                .await;
2553        });
2554
2555        assert_matches!(stream.next().await, Some(Ok(_)));
2556
2557        sliding_sync.stop_sync().unwrap();
2558
2559        assert_matches!(stream.next().await, None);
2560
2561        let mut num_requests = 0;
2562
2563        for request in server.received_requests().await.unwrap() {
2564            if !SlidingSyncMatcher.matches(&request) {
2565                continue;
2566            }
2567
2568            let another_list_ranges = if num_requests == 0 {
2569                // First request
2570                json!([[0, 10]])
2571            } else {
2572                // Second request
2573                json!([[10, 20]])
2574            };
2575
2576            num_requests += 1;
2577            assert!(num_requests <= 2, "more than one request hit the server");
2578
2579            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2580
2581            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2582                &json_value,
2583                &json!({
2584                    "conn_id": "test",
2585                    "lists": {
2586                        "room-list": {
2587                            "ranges": [[0, 9]],
2588                            "required_state": [
2589                                ["m.room.encryption", ""],
2590                                ["m.room.tombstone", ""]
2591                            ],
2592                        },
2593                        "another-list": {
2594                            "ranges": another_list_ranges,
2595                            "required_state": [
2596                                ["m.room.encryption", ""],
2597                                ["m.room.tombstone", ""]
2598                            ],
2599                        },
2600                    }
2601                }),
2602                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2603            ) {
2604                dbg!(json_value);
2605                panic!("json differ: {err}");
2606            }
2607        }
2608
2609        assert_eq!(num_requests, 2);
2610
2611        Ok(())
2612    }
2613
2614    #[async_test]
2615    async fn test_timeout_zero_list() -> Result<()> {
2616        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2617
2618        let (request, _, _) =
2619            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2620
2621        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2622        // on new update to pop.
2623        assert!(request.timeout.is_some());
2624
2625        Ok(())
2626    }
2627
2628    #[async_test]
2629    async fn test_timeout_one_list() -> Result<()> {
2630        let (_server, sliding_sync) = new_sliding_sync(vec![
2631            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2632        ])
2633        .await?;
2634
2635        let (request, _, _) =
2636            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2637
2638        // The list does not require a timeout.
2639        assert!(request.timeout.is_none());
2640
2641        // Simulate a response.
2642        {
2643            let server_response = assign!(http::Response::new("0".to_owned()), {
2644                lists: BTreeMap::from([(
2645                    "foo".to_owned(),
2646                    assign!(http::response::List::default(), {
2647                        count: uint!(7),
2648                    })
2649                 )])
2650            });
2651
2652            let _summary = {
2653                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2654                sliding_sync
2655                    .handle_response(
2656                        server_response.clone(),
2657                        &mut pos_guard,
2658                        RequestedRequiredStates::default(),
2659                    )
2660                    .await?
2661            };
2662        }
2663
2664        let (request, _, _) =
2665            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667        // The list is now fully loaded, so it requires a timeout.
2668        assert!(request.timeout.is_some());
2669
2670        Ok(())
2671    }
2672
2673    #[async_test]
2674    async fn test_timeout_three_lists() -> Result<()> {
2675        let (_server, sliding_sync) = new_sliding_sync(vec![
2676            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2677            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2678            SlidingSyncList::builder("baz")
2679                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2680        ])
2681        .await?;
2682
2683        let (request, _, _) =
2684            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2685
2686        // Two lists don't require a timeout.
2687        assert!(request.timeout.is_none());
2688
2689        // Simulate a response.
2690        {
2691            let server_response = assign!(http::Response::new("0".to_owned()), {
2692                lists: BTreeMap::from([(
2693                    "foo".to_owned(),
2694                    assign!(http::response::List::default(), {
2695                        count: uint!(7),
2696                    })
2697                 )])
2698            });
2699
2700            let _summary = {
2701                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2702                sliding_sync
2703                    .handle_response(
2704                        server_response.clone(),
2705                        &mut pos_guard,
2706                        RequestedRequiredStates::default(),
2707                    )
2708                    .await?
2709            };
2710        }
2711
2712        let (request, _, _) =
2713            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2714
2715        // One don't require a timeout.
2716        assert!(request.timeout.is_none());
2717
2718        // Simulate a response.
2719        {
2720            let server_response = assign!(http::Response::new("1".to_owned()), {
2721                lists: BTreeMap::from([(
2722                    "bar".to_owned(),
2723                    assign!(http::response::List::default(), {
2724                        count: uint!(7),
2725                    })
2726                 )])
2727            });
2728
2729            let _summary = {
2730                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2731                sliding_sync
2732                    .handle_response(
2733                        server_response.clone(),
2734                        &mut pos_guard,
2735                        RequestedRequiredStates::default(),
2736                    )
2737                    .await?
2738            };
2739        }
2740
2741        let (request, _, _) =
2742            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744        // All lists require a timeout.
2745        assert!(request.timeout.is_some());
2746
2747        Ok(())
2748    }
2749
2750    #[async_test]
2751    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2752        let server = MockServer::start().await;
2753        let client = logged_in_client(Some(server.uri())).await;
2754
2755        let _mock_guard = Mock::given(SlidingSyncMatcher)
2756            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2757                "pos": "0",
2758                "lists": {},
2759                "rooms": {}
2760            })))
2761            .mount_as_scoped(&server)
2762            .await;
2763
2764        let sliding_sync = client
2765            .sliding_sync("test")?
2766            .with_to_device_extension(
2767                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2768            )
2769            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2770            .build()
2771            .await?;
2772
2773        let sliding_sync = Arc::new(sliding_sync);
2774
2775        // Create the listener and perform a sync request
2776        let sync_beat_listener = client.inner.sync_beat.listen();
2777        sliding_sync.sync_once().await?;
2778
2779        // The sync beat listener should be notified shortly after
2780        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2781        Ok(())
2782    }
2783
2784    #[async_test]
2785    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2786        let server = MockServer::start().await;
2787        let client = logged_in_client(Some(server.uri())).await;
2788
2789        let _mock_guard = Mock::given(SlidingSyncMatcher)
2790            .respond_with(ResponseTemplate::new(404))
2791            .mount_as_scoped(&server)
2792            .await;
2793
2794        let sliding_sync = client
2795            .sliding_sync("test")?
2796            .with_to_device_extension(
2797                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2798            )
2799            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2800            .build()
2801            .await?;
2802
2803        let sliding_sync = Arc::new(sliding_sync);
2804
2805        // Create the listener and perform a sync request
2806        let sync_beat_listener = client.inner.sync_beat.listen();
2807        let sync_result = sliding_sync.sync_once().await;
2808        assert!(sync_result.is_err());
2809
2810        // The sync beat listener won't be notified in this case
2811        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2812
2813        Ok(())
2814    }
2815
2816    #[async_test]
2817    async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2818        let server = MatrixMockServer::new().await;
2819        let client = server.client_builder().build().await;
2820        let room_id = room_id!("!mu5hr00m:example.org");
2821
2822        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2823            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2824                "pos": "0",
2825                "lists": {},
2826                "extensions": {
2827                    "account_data": {
2828                        "global": [
2829                            {
2830                                "type": "m.direct",
2831                                "content": {
2832                                    "@de4dlockh0lmes:example.org": [
2833                                        "!mu5hr00m:example.org"
2834                                    ]
2835                                }
2836                            }
2837                        ]
2838                    }
2839                },
2840                "rooms": {
2841                    room_id: {
2842                        "name": "Mario Bros Fanbase Room",
2843                        "initial": true,
2844                    },
2845                }
2846            })))
2847            .mount_as_scoped(server.server())
2848            .await;
2849
2850        let f = EventFactory::new().room(room_id);
2851
2852        Mock::given(method("GET"))
2853            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2854            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2855                "chunk": [
2856                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2857                ]
2858            })))
2859            .mount(server.server())
2860            .await;
2861
2862        let (tx, rx) = tokio::sync::oneshot::channel();
2863
2864        let tx = Arc::new(Mutex::new(Some(tx)));
2865        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2866            // Try to run a /members query while in a event handler.
2867            let members =
2868                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2869            assert_eq!(members.len(), 1);
2870            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2871        });
2872
2873        let sliding_sync = client
2874            .sliding_sync("test")?
2875            .add_list(SlidingSyncList::builder("thelist"))
2876            .with_account_data_extension(
2877                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2878            )
2879            .build()
2880            .await?;
2881
2882        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2883            .await
2884            .expect("Sync did not complete in time")
2885            .expect("Sync failed");
2886
2887        // Wait for the event handler to complete.
2888        tokio::time::timeout(Duration::from_secs(5), rx)
2889            .await
2890            .expect("Event handler did not complete in time")
2891            .expect("Event handler failed");
2892
2893        Ok(())
2894    }
2895}