matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26    collections::{BTreeMap, btree_map::Entry},
27    fmt::Debug,
28    future::Future,
29    sync::{Arc, RwLock as StdRwLock},
30    time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41    OwnedRoomId, RoomId,
42    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
43    assign,
44};
45use tokio::{
46    select,
47    sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
48};
49use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53    cache::restore_sliding_sync_state,
54    client::SlidingSyncResponseProcessor,
55    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{Client, Result, config::RequestConfig};
58
59/// The Sliding Sync instance.
60///
61/// It is OK to clone this type as much as you need: cloning it is cheap.
62#[derive(Clone, Debug)]
63pub struct SlidingSync {
64    /// The Sliding Sync data.
65    inner: Arc<SlidingSyncInner>,
66}
67
68#[derive(Debug)]
69pub(super) struct SlidingSyncInner {
70    /// A unique identifier for this instance of sliding sync.
71    ///
72    /// Used to distinguish different connections to sliding sync.
73    id: String,
74
75    /// The HTTP Matrix client.
76    client: Client,
77
78    /// Long-polling timeout that appears in sliding sync request.
79    poll_timeout: Duration,
80
81    /// Extra duration for the sliding sync request to timeout. This is added to
82    /// the [`Self::poll_timeout`].
83    network_timeout: Duration,
84
85    /// The storage key to keep this cache at and load it from.
86    storage_key: String,
87
88    /// Should this sliding sync instance try to restore its sync position
89    /// from the database?
90    ///
91    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
92    /// keep it even so, to avoid sparkling cfg statements everywhere
93    /// throughout this file.
94    share_pos: bool,
95
96    /// Position markers.
97    ///
98    /// The `pos` marker represents a progression when exchanging requests and
99    /// responses with the server: the server acknowledges the request by
100    /// responding with a new `pos`. If the client sends two non-necessarily
101    /// consecutive requests with the same `pos`, the server has to reply with
102    /// the same identical response.
103    ///
104    /// `position` is behind a mutex so that a new request starts after the
105    /// previous request trip has fully ended (successfully or not). This
106    /// mechanism exists to wait for the response to be handled and to see the
107    /// `position` being updated, before sending a new request.
108    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
109
110    /// The lists of this Sliding Sync instance.
111    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
112
113    /// Request parameters that are sticky.
114    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
115
116    /// Internal channel used to pass messages between Sliding Sync and other
117    /// types.
118    internal_channel: Sender<SlidingSyncInternalMessage>,
119}
120
121impl SlidingSync {
122    pub(super) fn new(inner: SlidingSyncInner) -> Self {
123        Self { inner: Arc::new(inner) }
124    }
125
126    /// Whether the current sliding sync instance has set a sync position
127    /// marker.
128    pub async fn has_pos(&self) -> bool {
129        self.inner.position.lock().await.pos.is_some()
130    }
131
132    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
133        cache::store_sliding_sync_state(self, position).await
134    }
135
136    /// Create a new [`SlidingSyncBuilder`].
137    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
138        SlidingSyncBuilder::new(id, client)
139    }
140
141    /// Subscribe to many rooms.
142    ///
143    /// If the associated `Room`s exist, it will be marked as
144    /// members are missing, so that it ensures to re-fetch all members.
145    ///
146    /// A subscription to an already subscribed room is ignored.
147    pub fn subscribe_to_rooms(
148        &self,
149        room_ids: &[&RoomId],
150        settings: Option<http::request::RoomSubscription>,
151        cancel_in_flight_request: bool,
152    ) {
153        let settings = settings.unwrap_or_default();
154        let mut sticky = self.inner.sticky.write().unwrap();
155        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
156
157        let mut skip_over_current_sync_loop_iteration = false;
158
159        for room_id in room_ids {
160            // If the room subscription already exists, let's not
161            // override it with a new one. First, it would reset its
162            // state (`RoomSubscriptionState`), and second it would try to
163            // re-subscribe with the next request. We don't want that. A room
164            // subscription should happen once, and next subscriptions should
165            // be ignored.
166            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
167                if let Some(room) = self.inner.client.get_room(room_id) {
168                    room.mark_members_missing();
169                }
170
171                entry.insert((RoomSubscriptionState::default(), settings.clone()));
172
173                skip_over_current_sync_loop_iteration = true;
174            }
175        }
176
177        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
178            self.inner.internal_channel_send_if_possible(
179                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
180            );
181        }
182    }
183
184    /// Find a list by its name, and do something on it if it exists.
185    pub async fn on_list<Function, FunctionOutput, R>(
186        &self,
187        list_name: &str,
188        function: Function,
189    ) -> Option<R>
190    where
191        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
192        FunctionOutput: Future<Output = R>,
193    {
194        let lists = self.inner.lists.read().await;
195
196        match lists.get(list_name) {
197            Some(list) => Some(function(list).await),
198            None => None,
199        }
200    }
201
202    /// Add the list to the list of lists.
203    ///
204    /// As lists need to have a unique `.name`, if a list with the same name
205    /// is found the new list will replace the old one and the return it or
206    /// `None`.
207    pub async fn add_list(
208        &self,
209        list_builder: SlidingSyncListBuilder,
210    ) -> Result<Option<SlidingSyncList>> {
211        let list = list_builder.build(self.inner.internal_channel.clone());
212
213        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
214
215        self.inner.internal_channel_send_if_possible(
216            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
217        );
218
219        Ok(old_list)
220    }
221
222    /// Add a list that will be cached and reloaded from the cache.
223    ///
224    /// This will raise an error if a storage key was not set, or if there
225    /// was a I/O error reading from the cache.
226    ///
227    /// The rest of the semantics is the same as [`Self::add_list`].
228    pub async fn add_cached_list(
229        &self,
230        mut list_builder: SlidingSyncListBuilder,
231    ) -> Result<Option<SlidingSyncList>> {
232        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
233
234        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
235
236        self.add_list(list_builder).await
237    }
238
239    /// Handle the HTTP response.
240    #[instrument(skip_all)]
241    async fn handle_response(
242        &self,
243        mut sliding_sync_response: http::Response,
244        position: &mut SlidingSyncPositionMarkers,
245        requested_required_states: RequestedRequiredStates,
246    ) -> Result<UpdateSummary, crate::Error> {
247        let pos = Some(sliding_sync_response.pos.clone());
248
249        let must_process_rooms_response = self.must_process_rooms_response().await;
250
251        trace!(yes = must_process_rooms_response, "Must process rooms response?");
252
253        // Transform a Sliding Sync Response to a `SyncResponse`.
254        //
255        // We may not need the `sync_response` in the future (once `SyncResponse` will
256        // move to Sliding Sync, i.e. to `http::Response`), but processing the
257        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
258        // happens here.
259
260        let sync_response = {
261            let _timer = timer!("response processor");
262
263            let response_processor = {
264                // Take the lock to avoid concurrent sliding syncs overwriting each other's room
265                // infos.
266                let _sync_lock = {
267                    let _timer = timer!("acquiring the `sync_lock`");
268
269                    self.inner.client.base_client().sync_lock().lock().await
270                };
271
272                let mut response_processor =
273                    SlidingSyncResponseProcessor::new(self.inner.client.clone());
274
275                // Process thread subscriptions if they're available.
276                //
277                // It's important to do this *before* handling the room responses, so that
278                // notifications can be properly generated based on the thread subscriptions,
279                // for the events in threads we've subscribed to.
280                if self.is_thread_subscriptions_enabled() {
281                    response_processor
282                        .handle_thread_subscriptions(
283                            position.pos.as_deref(),
284                            std::mem::take(
285                                &mut sliding_sync_response.extensions.thread_subscriptions,
286                            ),
287                        )
288                        .await?;
289                }
290
291                #[cfg(feature = "e2e-encryption")]
292                if self.is_e2ee_enabled() {
293                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
294                }
295
296                // Only handle the room's subsection of the response, if this sliding sync was
297                // configured to do so.
298                if must_process_rooms_response {
299                    response_processor
300                        .handle_room_response(&sliding_sync_response, &requested_required_states)
301                        .await?;
302                }
303
304                response_processor
305            };
306
307            // Release the lock before calling event handlers
308            response_processor.process_and_take_response().await?
309        };
310
311        debug!("Sliding Sync response has been handled by the client");
312        trace!(?sync_response);
313
314        // Commit sticky parameters, if needed.
315        if let Some(ref txn_id) = sliding_sync_response.txn_id {
316            let txn_id = txn_id.as_str().into();
317            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
318            let mut lists = self.inner.lists.write().await;
319            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
320        }
321
322        let update_summary = {
323            // Update the rooms.
324            let updated_rooms = {
325                let mut updated_rooms = Vec::with_capacity(
326                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
327                );
328
329                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
330
331                // There might be other rooms that were only mentioned in the sliding sync
332                // extensions part of the response, and thus would result in rooms present in
333                // the `sync_response.joined`. Mark them as updated too.
334                //
335                // Since we've removed rooms that were in the room subsection from
336                // `sync_response.rooms.joined`, the remaining ones aren't already present in
337                // `updated_rooms` and wouldn't cause any duplicates.
338                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
339
340                updated_rooms
341            };
342
343            // Update the lists.
344            let updated_lists = {
345                debug!(
346                    lists = ?sliding_sync_response.lists,
347                    "Update lists"
348                );
349
350                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
351                let mut lists = self.inner.lists.write().await;
352
353                // Iterate on known lists, not on lists in the response. Rooms may have been
354                // updated that were not involved in any list update.
355                for (name, list) in lists.iter_mut() {
356                    if let Some(updates) = sliding_sync_response.lists.get(name) {
357                        let maximum_number_of_rooms: u32 =
358                            updates.count.try_into().expect("failed to convert `count` to `u32`");
359
360                        if list.update(Some(maximum_number_of_rooms))? {
361                            updated_lists.push(name.clone());
362                        }
363                    } else if list.update(None)? {
364                        updated_lists.push(name.clone());
365                    }
366                }
367
368                // Report about unknown lists.
369                for name in sliding_sync_response.lists.keys() {
370                    if !lists.contains_key(name) {
371                        error!("Response for list `{name}` - unknown to us; skipping");
372                    }
373                }
374
375                updated_lists
376            };
377
378            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
379        };
380
381        // Everything went well, we can update the position markers.
382        //
383        // Save the new position markers.
384        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
385
386        position.pos = pos;
387
388        Ok(update_summary)
389    }
390
391    #[instrument(skip_all)]
392    async fn generate_sync_request(
393        &self,
394        txn_id: &mut LazyTransactionId,
395    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
396        // Collect requests for lists.
397        let mut requests_lists = BTreeMap::new();
398
399        let require_timeout = {
400            let lists = self.inner.lists.read().await;
401
402            // Start at `true` in case there is zero list.
403            let mut require_timeout = true;
404
405            for (name, list) in lists.iter() {
406                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
407                require_timeout = require_timeout && list.requires_timeout();
408            }
409
410            require_timeout
411        };
412
413        // Collect the `pos`.
414        //
415        // Wait on the `position` mutex to be available. It means no request nor
416        // response is running. The `position` mutex is released whether the response
417        // has been fully handled successfully, in this case the `pos` is updated, or
418        // the response handling has failed, in this case the `pos` hasn't been updated
419        // and the same `pos` will be used for this new request.
420        let mut position_guard = {
421            debug!("Waiting to acquire the `position` lock");
422
423            let _timer = timer!("acquiring the `position` lock");
424
425            self.inner.position.clone().lock_owned().await
426        };
427
428        debug!(pos = ?position_guard.pos, "Got a position");
429
430        let to_device_enabled =
431            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
432
433        let restored_fields = if self.inner.share_pos || to_device_enabled {
434            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
435        } else {
436            None
437        };
438
439        // Update pos: either the one restored from the database, if any and the sliding
440        // sync was configured so, or read it from the memory cache.
441        let pos = if self.inner.share_pos {
442            if let Some(fields) = &restored_fields {
443                // Override the memory one with the database one, for consistency.
444                if fields.pos != position_guard.pos {
445                    info!(
446                        "Pos from previous request ('{:?}') was different from \
447                         pos in database ('{:?}').",
448                        position_guard.pos, fields.pos
449                    );
450                    position_guard.pos = fields.pos.clone();
451                }
452                fields.pos.clone()
453            } else {
454                position_guard.pos.clone()
455            }
456        } else {
457            position_guard.pos.clone()
458        };
459
460        Span::current().record("pos", &pos);
461
462        // When the client sends a request with no `pos`, MSC4186 returns no device
463        // lists updates, as it only returns changes since the provided `pos`
464        // (which is `null` in this case); this is in line with sync v2.
465        //
466        // Therefore, with MSC4186, the device list cache must be marked as to be
467        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
468        // device lists updates that happened between the previous request and the new
469        // “initial” request.
470        #[cfg(feature = "e2e-encryption")]
471        if pos.is_none() && self.is_e2ee_enabled() {
472            info!("Marking all tracked users as dirty");
473
474            let olm_machine = self.inner.client.olm_machine().await;
475            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
476            olm_machine.mark_all_tracked_users_as_dirty().await?;
477        }
478
479        // Configure the timeout.
480        //
481        // The `timeout` query is necessary when all lists require it. Please see
482        // [`SlidingSyncList::requires_timeout`].
483        let timeout = require_timeout.then(|| self.inner.poll_timeout);
484
485        let mut request = assign!(http::Request::new(), {
486            conn_id: Some(self.inner.id.clone()),
487            pos,
488            timeout,
489            lists: requests_lists,
490        });
491
492        // Apply sticky parameters, if needs be.
493        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
494
495        // Extensions are now applied (via sticky parameters).
496        //
497        // Override the to-device token if the extension is enabled.
498        if to_device_enabled {
499            request.extensions.to_device.since =
500                restored_fields.and_then(|fields| fields.to_device_token);
501        }
502
503        // Apply the transaction id if one was generated.
504        if let Some(txn_id) = txn_id.get() {
505            request.txn_id = Some(txn_id.to_string());
506        }
507
508        Ok((
509            // The request itself.
510            request,
511            // Configure long-polling. We need some time for the long-poll itself,
512            // and extra time for the network delays.
513            RequestConfig::default()
514                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
515                .retry_limit(3),
516            position_guard,
517        ))
518    }
519
520    /// Send a sliding sync request.
521    ///
522    /// This method contains the sending logic.
523    async fn send_sync_request(
524        &self,
525        request: http::Request,
526        request_config: RequestConfig,
527        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
528    ) -> Result<UpdateSummary> {
529        debug!("Sending request");
530
531        // Prepare the request.
532        let requested_required_states = RequestedRequiredStates::from(&request);
533        let request = self.inner.client.send(request).with_request_config(request_config);
534
535        // Send the request and get a response with end-to-end encryption support.
536        //
537        // Sending the `/sync` request out when end-to-end encryption is enabled means
538        // that we need to also send out any outgoing e2ee related request out
539        // coming from the `OlmMachine::outgoing_requests()` method.
540
541        #[cfg(feature = "e2e-encryption")]
542        let response = {
543            if self.is_e2ee_enabled() {
544                // Here, we need to run 2 things:
545                //
546                // 1. Send the sliding sync request and get a response,
547                // 2. Send the E2EE requests.
548                //
549                // We don't want to use a `join` or `try_join` because we want to fail if and
550                // only if sending the sliding sync request fails. Failing to send the E2EE
551                // requests should just result in a log.
552                //
553                // We also want to give the priority to sliding sync request. E2EE requests are
554                // sent concurrently to the sliding sync request, but the priority is on waiting
555                // a sliding sync response.
556                //
557                // If sending sliding sync request fails, the sending of E2EE requests must be
558                // aborted as soon as possible.
559
560                let client = self.inner.client.clone();
561                let e2ee_uploads = spawn(
562                    async move {
563                        if let Err(error) = client.send_outgoing_requests().await {
564                            error!(?error, "Error while sending outgoing E2EE requests");
565                        }
566                    }
567                    .instrument(Span::current()),
568                )
569                // Ensure that the task is not running in detached mode. It is aborted when it's
570                // dropped.
571                .abort_on_drop();
572
573                // Wait on the sliding sync request success or failure early.
574                let response = request.await?;
575
576                // At this point, if `request` has been resolved successfully, we wait on
577                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
578                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
579                // been dropped, so aborted.
580                e2ee_uploads.await.map_err(|error| Error::JoinError {
581                    task_description: "e2ee_uploads".to_owned(),
582                    error,
583                })?;
584
585                response
586            } else {
587                request.await?
588            }
589        };
590
591        // Send the request and get a response _without_ end-to-end encryption support.
592        #[cfg(not(feature = "e2e-encryption"))]
593        let response = request.await?;
594
595        debug!("Received response");
596
597        // At this point, the request has been sent, and a response has been received.
598        //
599        // We must ensure the handling of the response cannot be stopped/
600        // cancelled. It must be done entirely, otherwise we can have
601        // corrupted/incomplete states for Sliding Sync and other parts of
602        // the code.
603        //
604        // That's why we are running the handling of the response in a spawned
605        // future that cannot be cancelled by anything.
606        let this = self.clone();
607
608        // Spawn a new future to ensure that the code inside this future cannot be
609        // cancelled if this method is cancelled.
610        let future = async move {
611            debug!("Start handling response");
612
613            // In case the task running this future is detached, we must
614            // ensure responses are handled one at a time. At this point we still own
615            // `position_guard`, so we're fine.
616
617            // Handle the response.
618            let updates = this
619                .handle_response(response, &mut position_guard, requested_required_states)
620                .await?;
621
622            this.cache_to_storage(&position_guard).await?;
623
624            // Release the position guard lock.
625            // It means that other responses can be generated and then handled later.
626            drop(position_guard);
627
628            debug!("Done handling response");
629
630            Ok(updates)
631        };
632
633        spawn(future.instrument(Span::current())).await.unwrap()
634    }
635
636    /// Is the e2ee extension enabled for this sliding sync instance?
637    #[cfg(feature = "e2e-encryption")]
638    fn is_e2ee_enabled(&self) -> bool {
639        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
640    }
641
642    /// Is the thread subscriptions extension enabled for this sliding sync
643    /// instance?
644    fn is_thread_subscriptions_enabled(&self) -> bool {
645        self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
646            == Some(true)
647    }
648
649    #[cfg(not(feature = "e2e-encryption"))]
650    fn is_e2ee_enabled(&self) -> bool {
651        false
652    }
653
654    /// Should we process the room's subpart of a response?
655    async fn must_process_rooms_response(&self) -> bool {
656        // We consider that we must, if there's any room subscription or there's any
657        // list.
658        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
659            || !self.inner.lists.read().await.is_empty()
660    }
661
662    /// Send a single sliding sync request, and returns the response summary.
663    ///
664    /// Public for testing purposes only.
665    #[doc(hidden)]
666    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
667    pub async fn sync_once(&self) -> Result<UpdateSummary> {
668        let (request, request_config, position_guard) =
669            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
670
671        // Send the request.
672        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
673
674        // Notify a new sync was received.
675        self.inner.client.inner.sync_beat.notify(usize::MAX);
676
677        Ok(summaries)
678    }
679
680    /// Create a _new_ Sliding Sync sync loop.
681    ///
682    /// This method returns a `Stream`, which will send requests and will handle
683    /// responses automatically. Lists and rooms are updated automatically.
684    ///
685    /// This function returns `Ok(…)` if everything went well, otherwise it will
686    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
687    /// termination.
688    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
689    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
690    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
691        debug!("Starting sync stream");
692
693        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
694
695        stream! {
696            loop {
697                debug!("Sync stream is running");
698
699                select! {
700                    biased;
701
702                    internal_message = internal_channel_receiver.recv() => {
703                        use SlidingSyncInternalMessage::*;
704
705                        debug!(?internal_message, "Sync stream has received an internal message");
706
707                        match internal_message {
708                            Err(_) | Ok(SyncLoopStop) => {
709                                break;
710                            }
711
712                            Ok(SyncLoopSkipOverCurrentIteration) => {
713                                continue;
714                            }
715                        }
716                    }
717
718                    update_summary = self.sync_once() => {
719                        match update_summary {
720                            Ok(updates) => {
721                                yield Ok(updates);
722                            }
723
724                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
725                            Err(error) => {
726                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
727                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
728                                    self.expire_session().await;
729                                }
730
731                                yield Err(error);
732
733                                // Terminates the loop, and terminates the stream.
734                                break;
735                            }
736                        }
737                    }
738                }
739            }
740
741            debug!("Sync stream has exited.");
742        }
743    }
744
745    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
746    ///
747    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
748    /// enough to “stop” it, but depending of how this `Stream` is used, it
749    /// might not be obvious to drop it immediately (thinking of using this API
750    /// over FFI; the foreign-language might not be able to drop a value
751    /// immediately). Thus, calling this method will ensure that the sync loop
752    /// stops gracefully and as soon as it returns.
753    pub fn stop_sync(&self) -> Result<()> {
754        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
755    }
756
757    /// Expire the current Sliding Sync session on the client-side.
758    ///
759    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
760    /// sticky parameters.
761    ///
762    /// This should only be used when it's clear that this session was about to
763    /// expire anyways, and should be used only in very specific cases (e.g.
764    /// multiple sliding syncs being run in parallel, and one of them has
765    /// expired).
766    ///
767    /// This method **MUST** be called when the sync loop is stopped.
768    #[doc(hidden)]
769    pub async fn expire_session(&self) {
770        info!("Session expired; resetting `pos` and sticky parameters");
771
772        {
773            let lists = self.inner.lists.read().await;
774            for list in lists.values() {
775                // Invalidate in-memory data that would be persisted on disk.
776                list.set_maximum_number_of_rooms(None);
777
778                // Invalidate the sticky data for this list.
779                list.invalidate_sticky_data();
780            }
781        }
782
783        // Remove the cached sliding sync state as well.
784        {
785            let mut position = self.inner.position.lock().await;
786
787            // Invalidate in memory.
788            position.pos = None;
789
790            // Propagate to disk.
791            // Note: this propagates both the sliding sync state and the cached lists'
792            // state to disk.
793            if let Err(err) = self.cache_to_storage(&position).await {
794                warn!("Failed to invalidate cached sliding sync state: {err}");
795            }
796        }
797
798        {
799            let mut sticky = self.inner.sticky.write().unwrap();
800
801            // Clear all room subscriptions: we don't want to resend all room subscriptions
802            // when the session will restart.
803            sticky.data_mut().room_subscriptions.clear();
804        }
805    }
806}
807
808impl SlidingSyncInner {
809    /// Send a message over the internal channel.
810    #[instrument]
811    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
812        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
813    }
814
815    /// Send a message over the internal channel if there is a receiver, i.e. if
816    /// the sync loop is running.
817    #[instrument]
818    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
819        // If there is no receiver, the send will fail, but that's OK here.
820        let _ = self.internal_channel.send(message);
821    }
822}
823
824#[derive(Copy, Clone, Debug, PartialEq)]
825enum SlidingSyncInternalMessage {
826    /// Instruct the sync loop to stop.
827    SyncLoopStop,
828
829    /// Instruct the sync loop to skip over any remaining work in its iteration,
830    /// and to jump to the next iteration.
831    SyncLoopSkipOverCurrentIteration,
832}
833
834#[cfg(any(test, feature = "testing"))]
835impl SlidingSync {
836    /// Set a new value for `pos`.
837    pub async fn set_pos(&self, new_pos: String) {
838        let mut position_lock = self.inner.position.lock().await;
839        position_lock.pos = Some(new_pos);
840    }
841
842    /// Read the static extension configuration for this Sliding Sync.
843    ///
844    /// Note: this is not the next content of the sticky parameters, but rightly
845    /// the static configuration that was set during creation of this
846    /// Sliding Sync.
847    pub fn extensions_config(&self) -> http::request::Extensions {
848        let sticky = self.inner.sticky.read().unwrap();
849        sticky.data().extensions.clone()
850    }
851}
852
853#[derive(Clone, Debug)]
854pub(super) struct SlidingSyncPositionMarkers {
855    /// An ephemeral position in the current stream, as received from the
856    /// previous `/sync` response, or `None` for the first request.
857    pos: Option<String>,
858}
859
860/// A summary of the updates received after a sync (like in
861/// [`SlidingSync::sync`]).
862#[derive(Debug, Clone)]
863pub struct UpdateSummary {
864    /// The names of the lists that have seen an update.
865    pub lists: Vec<String>,
866    /// The rooms that have seen updates
867    pub rooms: Vec<OwnedRoomId>,
868}
869
870/// A very basic bool-ish enum to represent the state of a
871/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
872/// once should ideally not being sent again, to mostly save bandwidth.
873#[derive(Debug, Default)]
874enum RoomSubscriptionState {
875    /// The `RoomSubscription` has not been sent or received correctly from the
876    /// server, i.e. the `RoomSubscription` —which is part of the sticky
877    /// parameters— has not been committed.
878    #[default]
879    Pending,
880
881    /// The `RoomSubscription` has been sent and received correctly by the
882    /// server.
883    Applied,
884}
885
886/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
887/// sent in the request.
888#[derive(Debug)]
889pub(super) struct SlidingSyncStickyParameters {
890    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
891    /// but one wants to receive updates.
892    room_subscriptions:
893        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
894
895    /// The intended state of the extensions being supplied to sliding /sync
896    /// calls.
897    extensions: http::request::Extensions,
898}
899
900impl SlidingSyncStickyParameters {
901    /// Create a new set of sticky parameters.
902    pub fn new(
903        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
904        extensions: http::request::Extensions,
905    ) -> Self {
906        Self {
907            room_subscriptions: room_subscriptions
908                .into_iter()
909                .map(|(room_id, room_subscription)| {
910                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
911                })
912                .collect(),
913            extensions,
914        }
915    }
916}
917
918impl StickyData for SlidingSyncStickyParameters {
919    type Request = http::Request;
920
921    fn apply(&self, request: &mut Self::Request) {
922        request.room_subscriptions = self
923            .room_subscriptions
924            .iter()
925            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
926            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
927            .collect();
928        request.extensions = self.extensions.clone();
929    }
930
931    fn on_commit(&mut self) {
932        // All room subscriptions are marked as `Applied`.
933        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
934            if matches!(state, RoomSubscriptionState::Pending) {
935                *state = RoomSubscriptionState::Applied;
936            }
937        }
938    }
939}
940
941#[cfg(all(test, not(target_family = "wasm")))]
942#[allow(clippy::dbg_macro)]
943mod tests {
944    use std::{
945        collections::BTreeMap,
946        future::ready,
947        ops::Not,
948        sync::{Arc, Mutex},
949        time::Duration,
950    };
951
952    use assert_matches::assert_matches;
953    use event_listener::Listener;
954    use futures_util::{StreamExt, future::join_all, pin_mut};
955    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
956    use matrix_sdk_common::executor::spawn;
957    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
958    use ruma::{
959        OwnedRoomId, TransactionId,
960        api::client::error::ErrorKind,
961        assign,
962        events::{direct::DirectEvent, room::member::MembershipState},
963        owned_room_id, room_id,
964        serde::Raw,
965        uint,
966    };
967    use serde::Deserialize;
968    use serde_json::json;
969    use wiremock::{
970        Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
971    };
972
973    use super::{
974        SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
975        SlidingSyncStickyParameters, http,
976        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
977    };
978    use crate::{
979        Client, Result,
980        sliding_sync::cache::restore_sliding_sync_state,
981        test_utils::{logged_in_client, mocks::MatrixMockServer},
982    };
983
984    #[derive(Copy, Clone)]
985    struct SlidingSyncMatcher;
986
987    impl Match for SlidingSyncMatcher {
988        fn matches(&self, request: &Request) -> bool {
989            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
990                && request.method == Method::POST
991        }
992    }
993
994    async fn new_sliding_sync(
995        lists: Vec<SlidingSyncListBuilder>,
996    ) -> Result<(MockServer, SlidingSync)> {
997        let server = MockServer::start().await;
998        let client = logged_in_client(Some(server.uri())).await;
999
1000        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1001
1002        for list in lists {
1003            sliding_sync_builder = sliding_sync_builder.add_list(list);
1004        }
1005
1006        let sliding_sync = sliding_sync_builder.build().await?;
1007
1008        Ok((server, sliding_sync))
1009    }
1010
1011    #[async_test]
1012    async fn test_subscribe_to_rooms() -> Result<()> {
1013        let (server, sliding_sync) = new_sliding_sync(vec![
1014            SlidingSyncList::builder("foo")
1015                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1016        ])
1017        .await?;
1018
1019        let stream = sliding_sync.sync();
1020        pin_mut!(stream);
1021
1022        let room_id_0 = room_id!("!r0:bar.org");
1023        let room_id_1 = room_id!("!r1:bar.org");
1024        let room_id_2 = room_id!("!r2:bar.org");
1025
1026        {
1027            let _mock_guard = Mock::given(SlidingSyncMatcher)
1028                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1029                    "pos": "1",
1030                    "lists": {},
1031                    "rooms": {
1032                        room_id_0: {
1033                            "name": "Room #0",
1034                            "initial": true,
1035                        },
1036                        room_id_1: {
1037                            "name": "Room #1",
1038                            "initial": true,
1039                        },
1040                        room_id_2: {
1041                            "name": "Room #2",
1042                            "initial": true,
1043                        },
1044                    }
1045                })))
1046                .mount_as_scoped(&server)
1047                .await;
1048
1049            let _ = stream.next().await.unwrap()?;
1050        }
1051
1052        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1053
1054        // Members aren't synced.
1055        // We need to make them synced, so that we can test that subscribing to a room
1056        // make members not synced. That's a desired feature.
1057        assert!(room0.are_members_synced().not());
1058
1059        {
1060            struct MemberMatcher(OwnedRoomId);
1061
1062            impl Match for MemberMatcher {
1063                fn matches(&self, request: &Request) -> bool {
1064                    request.url.path()
1065                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1066                        && request.method == Method::GET
1067                }
1068            }
1069
1070            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1071                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1072                    "chunk": [],
1073                })))
1074                .mount_as_scoped(&server)
1075                .await;
1076
1077            assert_matches!(room0.request_members().await, Ok(()));
1078        }
1079
1080        // Members are now synced! We can start subscribing and see how it goes.
1081        assert!(room0.are_members_synced());
1082
1083        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1084
1085        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1086        // now marked as not synced.
1087        assert!(room0.are_members_synced().not());
1088
1089        {
1090            let sticky = sliding_sync.inner.sticky.read().unwrap();
1091            let room_subscriptions = &sticky.data().room_subscriptions;
1092
1093            assert!(room_subscriptions.contains_key(room_id_0));
1094            assert!(room_subscriptions.contains_key(room_id_1));
1095            assert!(!room_subscriptions.contains_key(room_id_2));
1096        }
1097
1098        // Subscribing to the same room doesn't reset the member sync state.
1099
1100        {
1101            struct MemberMatcher(OwnedRoomId);
1102
1103            impl Match for MemberMatcher {
1104                fn matches(&self, request: &Request) -> bool {
1105                    request.url.path()
1106                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1107                        && request.method == Method::GET
1108                }
1109            }
1110
1111            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1112                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1113                    "chunk": [],
1114                })))
1115                .mount_as_scoped(&server)
1116                .await;
1117
1118            assert_matches!(room0.request_members().await, Ok(()));
1119        }
1120
1121        // Members are synced, good, good.
1122        assert!(room0.are_members_synced());
1123
1124        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1125
1126        // Members are still synced: because we have already subscribed to the
1127        // room, the members aren't marked as unsynced.
1128        assert!(room0.are_members_synced());
1129
1130        Ok(())
1131    }
1132
1133    #[async_test]
1134    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1135        let (_server, sliding_sync) = new_sliding_sync(vec![
1136            SlidingSyncList::builder("foo")
1137                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1138        ])
1139        .await?;
1140
1141        let room_id_0 = room_id!("!r0:bar.org");
1142        let room_id_1 = room_id!("!r1:bar.org");
1143        let room_id_2 = room_id!("!r2:bar.org");
1144
1145        // Subscribe to two rooms.
1146        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1147
1148        {
1149            let sticky = sliding_sync.inner.sticky.read().unwrap();
1150            let room_subscriptions = &sticky.data().room_subscriptions;
1151
1152            assert!(room_subscriptions.contains_key(room_id_0));
1153            assert!(room_subscriptions.contains_key(room_id_1));
1154            assert!(room_subscriptions.contains_key(room_id_2).not());
1155        }
1156
1157        // Subscribe to one more room.
1158        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1159
1160        {
1161            let sticky = sliding_sync.inner.sticky.read().unwrap();
1162            let room_subscriptions = &sticky.data().room_subscriptions;
1163
1164            assert!(room_subscriptions.contains_key(room_id_0));
1165            assert!(room_subscriptions.contains_key(room_id_1));
1166            assert!(room_subscriptions.contains_key(room_id_2));
1167        }
1168
1169        // Suddenly, the session expires!
1170        sliding_sync.expire_session().await;
1171
1172        {
1173            let sticky = sliding_sync.inner.sticky.read().unwrap();
1174            let room_subscriptions = &sticky.data().room_subscriptions;
1175
1176            assert!(room_subscriptions.is_empty());
1177        }
1178
1179        // Subscribe to one room again.
1180        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1181
1182        {
1183            let sticky = sliding_sync.inner.sticky.read().unwrap();
1184            let room_subscriptions = &sticky.data().room_subscriptions;
1185
1186            assert!(room_subscriptions.contains_key(room_id_0).not());
1187            assert!(room_subscriptions.contains_key(room_id_1).not());
1188            assert!(room_subscriptions.contains_key(room_id_2));
1189        }
1190
1191        Ok(())
1192    }
1193
1194    #[async_test]
1195    async fn test_add_list() -> Result<()> {
1196        let (_server, sliding_sync) = new_sliding_sync(vec![
1197            SlidingSyncList::builder("foo")
1198                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1199        ])
1200        .await?;
1201
1202        let _stream = sliding_sync.sync();
1203        pin_mut!(_stream);
1204
1205        sliding_sync
1206            .add_list(
1207                SlidingSyncList::builder("bar")
1208                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1209            )
1210            .await?;
1211
1212        let lists = sliding_sync.inner.lists.read().await;
1213
1214        assert!(lists.contains_key("foo"));
1215        assert!(lists.contains_key("bar"));
1216
1217        // this test also ensures that Tokio is not panicking when calling `add_list`.
1218
1219        Ok(())
1220    }
1221
1222    #[test]
1223    fn test_sticky_parameters_api_invalidated_flow() {
1224        let r0 = room_id!("!r0.matrix.org");
1225        let r1 = room_id!("!r1:matrix.org");
1226
1227        let mut room_subscriptions = BTreeMap::new();
1228        room_subscriptions.insert(r0.to_owned(), Default::default());
1229
1230        // At first it's invalidated.
1231        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1232            room_subscriptions,
1233            Default::default(),
1234        ));
1235        assert!(sticky.is_invalidated());
1236
1237        // Then when we create a request, the sticky parameters are applied.
1238        let txn_id: &TransactionId = "tid123".into();
1239
1240        let mut request = http::Request::default();
1241        request.txn_id = Some(txn_id.to_string());
1242
1243        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1244
1245        assert!(request.txn_id.is_some());
1246        assert_eq!(request.room_subscriptions.len(), 1);
1247        assert!(request.room_subscriptions.contains_key(r0));
1248
1249        let tid = request.txn_id.unwrap();
1250
1251        sticky.maybe_commit(tid.as_str().into());
1252        assert!(!sticky.is_invalidated());
1253
1254        // Applying new parameters will invalidate again.
1255        sticky
1256            .data_mut()
1257            .room_subscriptions
1258            .insert(r1.to_owned(), (Default::default(), Default::default()));
1259        assert!(sticky.is_invalidated());
1260
1261        // Committing with the wrong transaction id will keep it invalidated.
1262        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1263        assert!(sticky.is_invalidated());
1264
1265        // Restarting a request will only remember the last generated transaction id.
1266        let txn_id1: &TransactionId = "tid456".into();
1267        let mut request1 = http::Request::default();
1268        request1.txn_id = Some(txn_id1.to_string());
1269        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1270
1271        assert!(sticky.is_invalidated());
1272        // The first room subscription has been applied to `request`, so it's not
1273        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1274        // related to the sticky design.
1275        assert_eq!(request1.room_subscriptions.len(), 1);
1276        assert!(request1.room_subscriptions.contains_key(r1));
1277
1278        let txn_id2: &TransactionId = "tid789".into();
1279        let mut request2 = http::Request::default();
1280        request2.txn_id = Some(txn_id2.to_string());
1281
1282        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1283        assert!(sticky.is_invalidated());
1284        // `request2` contains `r1` because the sticky parameters have not been
1285        // committed, so it's still marked as pending.
1286        assert_eq!(request2.room_subscriptions.len(), 1);
1287        assert!(request2.room_subscriptions.contains_key(r1));
1288
1289        // Here we commit with the not most-recent TID, so it keeps the invalidated
1290        // status.
1291        sticky.maybe_commit(txn_id1);
1292        assert!(sticky.is_invalidated());
1293
1294        // But here we use the latest TID, so the commit is effective.
1295        sticky.maybe_commit(txn_id2);
1296        assert!(!sticky.is_invalidated());
1297    }
1298
1299    #[test]
1300    fn test_room_subscriptions_are_sticky() {
1301        let r0 = room_id!("!r0.matrix.org");
1302        let r1 = room_id!("!r1:matrix.org");
1303
1304        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1305            BTreeMap::new(),
1306            Default::default(),
1307        ));
1308
1309        // A room subscription is added, applied, and committed.
1310        {
1311            // Insert `r0`.
1312            sticky
1313                .data_mut()
1314                .room_subscriptions
1315                .insert(r0.to_owned(), (Default::default(), Default::default()));
1316
1317            // Then the sticky parameters are applied.
1318            let txn_id: &TransactionId = "tid0".into();
1319            let mut request = http::Request::default();
1320            request.txn_id = Some(txn_id.to_string());
1321
1322            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1323
1324            assert!(request.txn_id.is_some());
1325            assert_eq!(request.room_subscriptions.len(), 1);
1326            assert!(request.room_subscriptions.contains_key(r0));
1327
1328            // Then the sticky parameters are committed.
1329            let tid = request.txn_id.unwrap();
1330
1331            sticky.maybe_commit(tid.as_str().into());
1332        }
1333
1334        // A room subscription is added, applied, but NOT committed.
1335        {
1336            // Insert `r1`.
1337            sticky
1338                .data_mut()
1339                .room_subscriptions
1340                .insert(r1.to_owned(), (Default::default(), Default::default()));
1341
1342            // Then the sticky parameters are applied.
1343            let txn_id: &TransactionId = "tid1".into();
1344            let mut request = http::Request::default();
1345            request.txn_id = Some(txn_id.to_string());
1346
1347            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1348
1349            assert!(request.txn_id.is_some());
1350            assert_eq!(request.room_subscriptions.len(), 1);
1351            // `r0` is not present, it's only `r1`.
1352            assert!(request.room_subscriptions.contains_key(r1));
1353
1354            // Then the sticky parameters are NOT committed.
1355            // It can happen if the request has failed to be sent for example,
1356            // or if the response didn't match.
1357        }
1358
1359        // A previously added room subscription is re-added, applied, and committed.
1360        {
1361            // Then the sticky parameters are applied.
1362            let txn_id: &TransactionId = "tid2".into();
1363            let mut request = http::Request::default();
1364            request.txn_id = Some(txn_id.to_string());
1365
1366            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1367
1368            assert!(request.txn_id.is_some());
1369            assert_eq!(request.room_subscriptions.len(), 1);
1370            // `r0` is not present, it's only `r1`.
1371            assert!(request.room_subscriptions.contains_key(r1));
1372
1373            // Then the sticky parameters are committed.
1374            let tid = request.txn_id.unwrap();
1375
1376            sticky.maybe_commit(tid.as_str().into());
1377        }
1378
1379        // All room subscriptions have been committed.
1380        {
1381            // Then the sticky parameters are applied.
1382            let txn_id: &TransactionId = "tid3".into();
1383            let mut request = http::Request::default();
1384            request.txn_id = Some(txn_id.to_string());
1385
1386            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1387
1388            assert!(request.txn_id.is_some());
1389            // All room subscriptions have been sent.
1390            assert!(request.room_subscriptions.is_empty());
1391        }
1392    }
1393
1394    #[test]
1395    fn test_extensions_are_sticky() {
1396        let mut extensions = http::request::Extensions::default();
1397        extensions.account_data.enabled = Some(true);
1398
1399        // At first it's invalidated.
1400        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1401            Default::default(),
1402            extensions,
1403        ));
1404
1405        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1406
1407        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1408        // to-device.
1409        let extensions = &sticky.data().extensions;
1410        assert_eq!(extensions.e2ee.enabled, None);
1411        assert_eq!(extensions.to_device.enabled, None);
1412        assert_eq!(extensions.to_device.since, None);
1413
1414        // What the user explicitly enabled is… enabled.
1415        assert_eq!(extensions.account_data.enabled, Some(true));
1416
1417        let txn_id: &TransactionId = "tid123".into();
1418        let mut request = http::Request::default();
1419        request.txn_id = Some(txn_id.to_string());
1420        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1421        assert!(sticky.is_invalidated());
1422        assert_eq!(request.extensions.to_device.enabled, None);
1423        assert_eq!(request.extensions.to_device.since, None);
1424        assert_eq!(request.extensions.e2ee.enabled, None);
1425        assert_eq!(request.extensions.account_data.enabled, Some(true));
1426    }
1427
1428    #[async_test]
1429    async fn test_sticky_extensions_plus_since() -> Result<()> {
1430        let server = MockServer::start().await;
1431        let client = logged_in_client(Some(server.uri())).await;
1432
1433        let sync = client
1434            .sliding_sync("test-slidingsync")?
1435            .add_list(SlidingSyncList::builder("new_list"))
1436            .build()
1437            .await?;
1438
1439        // No extensions have been explicitly enabled here.
1440        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1441        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1442        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1443
1444        // Now enable e2ee and to-device.
1445        let sync = client
1446            .sliding_sync("test-slidingsync")?
1447            .add_list(SlidingSyncList::builder("new_list"))
1448            .with_to_device_extension(
1449                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1450            )
1451            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1452            .build()
1453            .await?;
1454
1455        // Even without a since token, the first request will contain the extensions
1456        // configuration, at least.
1457        let txn_id = TransactionId::new();
1458        let (request, _, _) = sync
1459            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1460            .await?;
1461
1462        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1463        assert_eq!(request.extensions.to_device.enabled, Some(true));
1464        assert!(request.extensions.to_device.since.is_none());
1465
1466        {
1467            // Committing with another transaction id doesn't validate anything.
1468            let mut sticky = sync.inner.sticky.write().unwrap();
1469            assert!(sticky.is_invalidated());
1470            sticky.maybe_commit(
1471                "hopefully the rng won't generate this very specific transaction id".into(),
1472            );
1473            assert!(sticky.is_invalidated());
1474        }
1475
1476        // Regenerating a request will yield the same one.
1477        let txn_id2 = TransactionId::new();
1478        let (request, _, _) = sync
1479            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1480            .await?;
1481
1482        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1483        assert_eq!(request.extensions.to_device.enabled, Some(true));
1484        assert!(request.extensions.to_device.since.is_none());
1485
1486        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1487
1488        {
1489            // Committing with the expected transaction id will validate it.
1490            let mut sticky = sync.inner.sticky.write().unwrap();
1491            assert!(sticky.is_invalidated());
1492            sticky.maybe_commit(txn_id2.as_str().into());
1493            assert!(!sticky.is_invalidated());
1494        }
1495
1496        // The next request should contain no sticky parameters.
1497        let txn_id = TransactionId::new();
1498        let (request, _, _) = sync
1499            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1500            .await?;
1501        assert!(request.extensions.e2ee.enabled.is_none());
1502        assert!(request.extensions.to_device.enabled.is_none());
1503        assert!(request.extensions.to_device.since.is_none());
1504
1505        // If there's a to-device `since` token, we make sure we put the token
1506        // into the extension config. The rest doesn't need to be re-enabled due to
1507        // stickiness.
1508        let _since_token = "since";
1509
1510        #[cfg(feature = "e2e-encryption")]
1511        {
1512            use matrix_sdk_base::crypto::store::types::Changes;
1513            if let Some(olm_machine) = &*client.olm_machine().await {
1514                olm_machine
1515                    .store()
1516                    .save_changes(Changes {
1517                        next_batch_token: Some(_since_token.to_owned()),
1518                        ..Default::default()
1519                    })
1520                    .await?;
1521            }
1522        }
1523
1524        let txn_id = TransactionId::new();
1525        let (request, _, _) = sync
1526            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1527            .await?;
1528
1529        assert!(request.extensions.e2ee.enabled.is_none());
1530        assert!(request.extensions.to_device.enabled.is_none());
1531
1532        #[cfg(feature = "e2e-encryption")]
1533        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1534
1535        Ok(())
1536    }
1537
1538    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1539    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1540    // `/key/query` requests must be sent. See the code to see the details.
1541    //
1542    // This test is asserting that.
1543    #[async_test]
1544    #[cfg(feature = "e2e-encryption")]
1545    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1546        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1547        use matrix_sdk_test::ruma_response_from_json;
1548        use ruma::user_id;
1549
1550        let server = MockServer::start().await;
1551        let client = logged_in_client(Some(server.uri())).await;
1552
1553        let alice = user_id!("@alice:localhost");
1554        let bob = user_id!("@bob:localhost");
1555        let me = user_id!("@example:localhost");
1556
1557        // Track and mark users are not dirty, so that we can check they are “dirty”
1558        // after that. Dirty here means that a `/key/query` must be sent.
1559        {
1560            let olm_machine = client.olm_machine().await;
1561            let olm_machine = olm_machine.as_ref().unwrap();
1562
1563            olm_machine.update_tracked_users([alice, bob]).await?;
1564
1565            // Assert requests.
1566            let outgoing_requests = olm_machine.outgoing_requests().await?;
1567
1568            assert_eq!(outgoing_requests.len(), 2);
1569            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1570            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1571
1572            // Fake responses.
1573            olm_machine
1574                .mark_request_as_sent(
1575                    outgoing_requests[0].request_id(),
1576                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1577                        "one_time_key_counts": {}
1578                    }))),
1579                )
1580                .await?;
1581
1582            olm_machine
1583                .mark_request_as_sent(
1584                    outgoing_requests[1].request_id(),
1585                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1586                        "device_keys": {
1587                            alice: {},
1588                            bob: {},
1589                        }
1590                    }))),
1591                )
1592                .await?;
1593
1594            // Once more.
1595            let outgoing_requests = olm_machine.outgoing_requests().await?;
1596
1597            assert_eq!(outgoing_requests.len(), 1);
1598            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1599
1600            olm_machine
1601                .mark_request_as_sent(
1602                    outgoing_requests[0].request_id(),
1603                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1604                        "device_keys": {
1605                            me: {},
1606                        }
1607                    }))),
1608                )
1609                .await?;
1610
1611            // No more.
1612            let outgoing_requests = olm_machine.outgoing_requests().await?;
1613
1614            assert!(outgoing_requests.is_empty());
1615        }
1616
1617        let sync = client
1618            .sliding_sync("test-slidingsync")?
1619            .add_list(SlidingSyncList::builder("new_list"))
1620            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1621            .build()
1622            .await?;
1623
1624        // First request: no `pos`.
1625        let txn_id = TransactionId::new();
1626        let (_request, _, _) = sync
1627            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1628            .await?;
1629
1630        // Now, tracked users must be dirty.
1631        {
1632            let olm_machine = client.olm_machine().await;
1633            let olm_machine = olm_machine.as_ref().unwrap();
1634
1635            // Assert requests.
1636            let outgoing_requests = olm_machine.outgoing_requests().await?;
1637
1638            assert_eq!(outgoing_requests.len(), 1);
1639            assert_matches!(
1640                outgoing_requests[0].request(),
1641                AnyOutgoingRequest::KeysQuery(request) => {
1642                    assert!(request.device_keys.contains_key(alice));
1643                    assert!(request.device_keys.contains_key(bob));
1644                    assert!(request.device_keys.contains_key(me));
1645                }
1646            );
1647
1648            // Fake responses.
1649            olm_machine
1650                .mark_request_as_sent(
1651                    outgoing_requests[0].request_id(),
1652                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1653                        "device_keys": {
1654                            alice: {},
1655                            bob: {},
1656                            me: {},
1657                        }
1658                    }))),
1659                )
1660                .await?;
1661        }
1662
1663        // Second request: with a `pos` this time.
1664        sync.set_pos("chocolat".to_owned()).await;
1665
1666        let txn_id = TransactionId::new();
1667        let (_request, _, _) = sync
1668            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1669            .await?;
1670
1671        // Tracked users are not marked as dirty.
1672        {
1673            let olm_machine = client.olm_machine().await;
1674            let olm_machine = olm_machine.as_ref().unwrap();
1675
1676            // Assert requests.
1677            let outgoing_requests = olm_machine.outgoing_requests().await?;
1678
1679            assert!(outgoing_requests.is_empty());
1680        }
1681
1682        Ok(())
1683    }
1684
1685    #[async_test]
1686    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1687        let server = MockServer::start().await;
1688        let client = logged_in_client(Some(server.uri())).await;
1689
1690        let sliding_sync = client
1691            .sliding_sync("test-slidingsync")?
1692            .with_to_device_extension(
1693                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1694            )
1695            .build()
1696            .await?;
1697
1698        // First request asks to enable the extension.
1699        let (request, _, _) =
1700            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1701        assert!(request.extensions.to_device.enabled.is_some());
1702
1703        let sync = sliding_sync.sync();
1704        pin_mut!(sync);
1705
1706        // `pos` is `None` to start with.
1707        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1708
1709        #[derive(Deserialize)]
1710        struct PartialRequest {
1711            txn_id: Option<String>,
1712        }
1713
1714        {
1715            let _mock_guard = Mock::given(SlidingSyncMatcher)
1716                .respond_with(|request: &Request| {
1717                    // Repeat the txn_id in the response, if set.
1718                    let request: PartialRequest = request.body_json().unwrap();
1719
1720                    ResponseTemplate::new(200).set_body_json(json!({
1721                        "txn_id": request.txn_id,
1722                        "pos": "0",
1723                    }))
1724                })
1725                .mount_as_scoped(&server)
1726                .await;
1727
1728            let next = sync.next().await;
1729            assert_matches!(next, Some(Ok(_update_summary)));
1730
1731            // `pos` has been updated.
1732            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1733        }
1734
1735        // Next request doesn't ask to enable the extension.
1736        let (request, _, _) =
1737            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1738        assert!(request.extensions.to_device.enabled.is_none());
1739
1740        // Next request is successful.
1741        {
1742            let _mock_guard = Mock::given(SlidingSyncMatcher)
1743                .respond_with(|request: &Request| {
1744                    // Repeat the txn_id in the response, if set.
1745                    let request: PartialRequest = request.body_json().unwrap();
1746
1747                    ResponseTemplate::new(200).set_body_json(json!({
1748                        "txn_id": request.txn_id,
1749                        "pos": "1",
1750                    }))
1751                })
1752                .mount_as_scoped(&server)
1753                .await;
1754
1755            let next = sync.next().await;
1756            assert_matches!(next, Some(Ok(_update_summary)));
1757
1758            // `pos` has been updated.
1759            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1760        }
1761
1762        // Next request is successful despite it receives an already
1763        // received `pos` from the server.
1764        {
1765            let _mock_guard = Mock::given(SlidingSyncMatcher)
1766                .respond_with(|request: &Request| {
1767                    // Repeat the txn_id in the response, if set.
1768                    let request: PartialRequest = request.body_json().unwrap();
1769
1770                    ResponseTemplate::new(200).set_body_json(json!({
1771                        "txn_id": request.txn_id,
1772                        "pos": "0", // <- already received!
1773                    }))
1774                })
1775                .up_to_n_times(1) // run this mock only once.
1776                .mount_as_scoped(&server)
1777                .await;
1778
1779            let next = sync.next().await;
1780            assert_matches!(next, Some(Ok(_update_summary)));
1781
1782            // `pos` has been updated.
1783            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1784        }
1785
1786        // Stop responding with successful requests!
1787        //
1788        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1789        // so they're reset. It also resets the `pos`.
1790        {
1791            let _mock_guard = Mock::given(SlidingSyncMatcher)
1792                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1793                    "error": "foo",
1794                    "errcode": "M_UNKNOWN_POS",
1795                })))
1796                .mount_as_scoped(&server)
1797                .await;
1798
1799            let next = sync.next().await;
1800
1801            // The expected error is returned.
1802            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1803
1804            // `pos` has been reset.
1805            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1806
1807            // Next request asks to enable the extension again.
1808            let (request, _, _) =
1809                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1810
1811            assert!(request.extensions.to_device.enabled.is_some());
1812
1813            // `sync` has been stopped.
1814            assert!(sync.next().await.is_none());
1815        }
1816
1817        Ok(())
1818    }
1819
1820    #[cfg(feature = "e2e-encryption")]
1821    #[async_test]
1822    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1823        let server = MockServer::start().await;
1824
1825        #[derive(Deserialize)]
1826        struct PartialRequest {
1827            txn_id: Option<String>,
1828        }
1829
1830        let server_pos = Arc::new(Mutex::new(0));
1831        let _mock_guard = Mock::given(SlidingSyncMatcher)
1832            .respond_with(move |request: &Request| {
1833                // Repeat the txn_id in the response, if set.
1834                let request: PartialRequest = request.body_json().unwrap();
1835                let pos = {
1836                    let mut pos = server_pos.lock().unwrap();
1837                    let prev = *pos;
1838                    *pos += 1;
1839                    prev
1840                };
1841
1842                ResponseTemplate::new(200).set_body_json(json!({
1843                    "txn_id": request.txn_id,
1844                    "pos": pos.to_string(),
1845                }))
1846            })
1847            .mount_as_scoped(&server)
1848            .await;
1849
1850        let client = logged_in_client(Some(server.uri())).await;
1851
1852        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1853
1854        // `pos` is `None` to start with.
1855        {
1856            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1857
1858            let (request, _, _) =
1859                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1860            assert!(request.pos.is_none());
1861        }
1862
1863        let sync = sliding_sync.sync();
1864        pin_mut!(sync);
1865
1866        // Sync goes well, and then the position is saved both into the internal memory
1867        // and the database.
1868        let next = sync.next().await;
1869        assert_matches!(next, Some(Ok(_update_summary)));
1870
1871        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1872
1873        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1874            .await?
1875            .expect("must have restored fields");
1876
1877        // While it has been saved into the database, it's not necessarily going to be
1878        // used later!
1879        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1880
1881        // Now, even if we mess with the position stored in the database, the sliding
1882        // sync instance isn't configured to reload the stream position from the
1883        // database, so it won't be changed.
1884        {
1885            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1886
1887            let mut position_guard = other_sync.inner.position.lock().await;
1888            position_guard.pos = Some("yolo".to_owned());
1889
1890            other_sync.cache_to_storage(&position_guard).await?;
1891        }
1892
1893        // It's still 0, not "yolo".
1894        {
1895            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1896            let (request, _, _) =
1897                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1898            assert_eq!(request.pos.as_deref(), Some("0"));
1899        }
1900
1901        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1902        // asked to.
1903        {
1904            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1905            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1906        }
1907
1908        Ok(())
1909    }
1910
1911    #[cfg(feature = "e2e-encryption")]
1912    #[async_test]
1913    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1914        let server = MockServer::start().await;
1915
1916        #[derive(Deserialize)]
1917        struct PartialRequest {
1918            txn_id: Option<String>,
1919        }
1920
1921        let server_pos = Arc::new(Mutex::new(0));
1922        let _mock_guard = Mock::given(SlidingSyncMatcher)
1923            .respond_with(move |request: &Request| {
1924                // Repeat the txn_id in the response, if set.
1925                let request: PartialRequest = request.body_json().unwrap();
1926                let pos = {
1927                    let mut pos = server_pos.lock().unwrap();
1928                    let prev = *pos;
1929                    *pos += 1;
1930                    prev
1931                };
1932
1933                ResponseTemplate::new(200).set_body_json(json!({
1934                    "txn_id": request.txn_id,
1935                    "pos": pos.to_string(),
1936                }))
1937            })
1938            .mount_as_scoped(&server)
1939            .await;
1940
1941        let client = logged_in_client(Some(server.uri())).await;
1942
1943        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1944
1945        // `pos` is `None` to start with.
1946        {
1947            let (request, _, _) =
1948                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1949
1950            assert!(request.pos.is_none());
1951            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1952        }
1953
1954        let sync = sliding_sync.sync();
1955        pin_mut!(sync);
1956
1957        // Sync goes well, and then the position is saved both into the internal memory
1958        // and the database.
1959        let next = sync.next().await;
1960        assert_matches!(next, Some(Ok(_update_summary)));
1961
1962        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1963
1964        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1965            .await?
1966            .expect("must have restored fields");
1967
1968        // While it has been saved into the database, it's not necessarily going to be
1969        // used later!
1970        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1971
1972        // Another process modifies the stream position under our feet...
1973        {
1974            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1975
1976            let mut position_guard = other_sync.inner.position.lock().await;
1977            position_guard.pos = Some("42".to_owned());
1978
1979            other_sync.cache_to_storage(&position_guard).await?;
1980        }
1981
1982        // It's alright, the next request will load it from the database.
1983        {
1984            let (request, _, _) =
1985                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1986            assert_eq!(request.pos.as_deref(), Some("42"));
1987            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1988        }
1989
1990        // Recreating a sliding sync with the same ID will reload it too.
1991        {
1992            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1993            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1994
1995            let (request, _, _) =
1996                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1997            assert_eq!(request.pos.as_deref(), Some("42"));
1998        }
1999
2000        // Invalidating the session will remove the in-memory value AND the database
2001        // value.
2002        sliding_sync.expire_session().await;
2003
2004        {
2005            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2006
2007            let (request, _, _) =
2008                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2009            assert!(request.pos.is_none());
2010        }
2011
2012        // And new sliding syncs with the same ID won't find it either.
2013        {
2014            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2015            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2016
2017            let (request, _, _) =
2018                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2019            assert!(request.pos.is_none());
2020        }
2021
2022        Ok(())
2023    }
2024
2025    #[async_test]
2026    async fn test_stop_sync_loop() -> Result<()> {
2027        let (_server, sliding_sync) = new_sliding_sync(vec![
2028            SlidingSyncList::builder("foo")
2029                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2030        ])
2031        .await?;
2032
2033        // Start the sync loop.
2034        let stream = sliding_sync.sync();
2035        pin_mut!(stream);
2036
2037        // The sync loop is actually running.
2038        assert!(stream.next().await.is_some());
2039
2040        // Stop the sync loop.
2041        sliding_sync.stop_sync()?;
2042
2043        // The sync loop is actually stopped.
2044        assert!(stream.next().await.is_none());
2045
2046        // Start a new sync loop.
2047        let stream = sliding_sync.sync();
2048        pin_mut!(stream);
2049
2050        // The sync loop is actually running.
2051        assert!(stream.next().await.is_some());
2052
2053        Ok(())
2054    }
2055
2056    #[async_test]
2057    async fn test_process_read_receipts() -> Result<()> {
2058        let room = owned_room_id!("!pony:example.org");
2059
2060        let server = MockServer::start().await;
2061        let client = logged_in_client(Some(server.uri())).await;
2062        client.event_cache().subscribe().unwrap();
2063
2064        let sliding_sync = client
2065            .sliding_sync("test")?
2066            .with_receipt_extension(
2067                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2068            )
2069            .add_list(
2070                SlidingSyncList::builder("all")
2071                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2072            )
2073            .build()
2074            .await?;
2075
2076        // Initial state.
2077        {
2078            let server_response = assign!(http::Response::new("0".to_owned()), {
2079                rooms: BTreeMap::from([(
2080                    room.clone(),
2081                    http::response::Room::default(),
2082                )])
2083            });
2084
2085            let _summary = {
2086                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2087                sliding_sync
2088                    .handle_response(
2089                        server_response.clone(),
2090                        &mut pos_guard,
2091                        RequestedRequiredStates::default(),
2092                    )
2093                    .await?
2094            };
2095        }
2096
2097        let server_response = assign!(http::Response::new("1".to_owned()), {
2098            extensions: assign!(http::response::Extensions::default(), {
2099                receipts: assign!(http::response::Receipts::default(), {
2100                    rooms: BTreeMap::from([
2101                        (
2102                            room.clone(),
2103                            Raw::from_json_string(
2104                                json!({
2105                                    "room_id": room,
2106                                    "type": "m.receipt",
2107                                    "content": {
2108                                        "$event:bar.org": {
2109                                            "m.read": {
2110                                                client.user_id().unwrap(): {
2111                                                    "ts": 1436451550,
2112                                                }
2113                                            }
2114                                        }
2115                                    }
2116                                })
2117                                .to_string(),
2118                            ).unwrap()
2119                        )
2120                    ])
2121                })
2122            })
2123        });
2124
2125        let summary = {
2126            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2127            sliding_sync
2128                .handle_response(
2129                    server_response.clone(),
2130                    &mut pos_guard,
2131                    RequestedRequiredStates::default(),
2132                )
2133                .await?
2134        };
2135
2136        assert!(summary.rooms.contains(&room));
2137
2138        Ok(())
2139    }
2140
2141    #[async_test]
2142    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2143        let room_id = owned_room_id!("!unicorn:example.org");
2144
2145        let server = MockServer::start().await;
2146        let client = logged_in_client(Some(server.uri())).await;
2147
2148        // Setup sliding sync with with one room and one list
2149
2150        let sliding_sync = client
2151            .sliding_sync("test")?
2152            .with_account_data_extension(
2153                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2154            )
2155            .add_list(
2156                SlidingSyncList::builder("all")
2157                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2158            )
2159            .build()
2160            .await?;
2161
2162        // Initial state.
2163        {
2164            let server_response = assign!(http::Response::new("0".to_owned()), {
2165                rooms: BTreeMap::from([(
2166                    room_id.clone(),
2167                    http::response::Room::default(),
2168                )])
2169            });
2170
2171            let _summary = {
2172                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2173                sliding_sync
2174                    .handle_response(
2175                        server_response.clone(),
2176                        &mut pos_guard,
2177                        RequestedRequiredStates::default(),
2178                    )
2179                    .await?
2180            };
2181        }
2182
2183        // Simulate a response that only changes the marked unread state of the room to
2184        // true
2185
2186        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2187
2188        let update_summary = {
2189            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2190            sliding_sync
2191                .handle_response(
2192                    server_response.clone(),
2193                    &mut pos_guard,
2194                    RequestedRequiredStates::default(),
2195                )
2196                .await?
2197        };
2198
2199        // Check that the list list and entry received the update
2200
2201        assert!(update_summary.rooms.contains(&room_id));
2202
2203        let room = client.get_room(&room_id).unwrap();
2204
2205        // Check the actual room data, this powers RoomInfo
2206
2207        assert!(room.is_marked_unread());
2208
2209        // Change it back to false and check if it updates
2210
2211        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2212
2213        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2214        sliding_sync
2215            .handle_response(
2216                server_response.clone(),
2217                &mut pos_guard,
2218                RequestedRequiredStates::default(),
2219            )
2220            .await?;
2221
2222        let room = client.get_room(&room_id).unwrap();
2223
2224        assert!(!room.is_marked_unread());
2225
2226        Ok(())
2227    }
2228
2229    fn make_mark_unread_response(
2230        response_number: &str,
2231        room_id: OwnedRoomId,
2232        unread: bool,
2233        add_rooms_section: bool,
2234    ) -> http::Response {
2235        let rooms = if add_rooms_section {
2236            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2237        } else {
2238            BTreeMap::new()
2239        };
2240
2241        let extensions = assign!(http::response::Extensions::default(), {
2242            account_data: assign!(http::response::AccountData::default(), {
2243                rooms: BTreeMap::from([
2244                    (
2245                        room_id,
2246                        vec![
2247                            Raw::from_json_string(
2248                                json!({
2249                                    "content": {
2250                                        "unread": unread
2251                                    },
2252                                    "type": "m.marked_unread"
2253                                })
2254                                .to_string(),
2255                            ).unwrap()
2256                        ]
2257                    )
2258                ])
2259            })
2260        });
2261
2262        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2263    }
2264
2265    #[async_test]
2266    async fn test_process_rooms_account_data() -> Result<()> {
2267        let room = owned_room_id!("!pony:example.org");
2268
2269        let server = MockServer::start().await;
2270        let client = logged_in_client(Some(server.uri())).await;
2271
2272        let sliding_sync = client
2273            .sliding_sync("test")?
2274            .with_account_data_extension(
2275                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2276            )
2277            .add_list(
2278                SlidingSyncList::builder("all")
2279                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2280            )
2281            .build()
2282            .await?;
2283
2284        // Initial state.
2285        {
2286            let server_response = assign!(http::Response::new("0".to_owned()), {
2287                rooms: BTreeMap::from([(
2288                    room.clone(),
2289                    http::response::Room::default(),
2290                )])
2291            });
2292
2293            let _summary = {
2294                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2295                sliding_sync
2296                    .handle_response(
2297                        server_response.clone(),
2298                        &mut pos_guard,
2299                        RequestedRequiredStates::default(),
2300                    )
2301                    .await?
2302            };
2303        }
2304
2305        let server_response = assign!(http::Response::new("1".to_owned()), {
2306            extensions: assign!(http::response::Extensions::default(), {
2307                account_data: assign!(http::response::AccountData::default(), {
2308                    rooms: BTreeMap::from([
2309                        (
2310                            room.clone(),
2311                            vec![
2312                                Raw::from_json_string(
2313                                    json!({
2314                                        "content": {
2315                                            "tags": {
2316                                                "u.work": {
2317                                                    "order": 0.9
2318                                                }
2319                                            }
2320                                        },
2321                                        "type": "m.tag"
2322                                    })
2323                                    .to_string(),
2324                                ).unwrap()
2325                            ]
2326                        )
2327                    ])
2328                })
2329            })
2330        });
2331        let summary = {
2332            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2333            sliding_sync
2334                .handle_response(
2335                    server_response.clone(),
2336                    &mut pos_guard,
2337                    RequestedRequiredStates::default(),
2338                )
2339                .await?
2340        };
2341
2342        assert!(summary.rooms.contains(&room));
2343
2344        Ok(())
2345    }
2346
2347    #[async_test]
2348    #[cfg(feature = "e2e-encryption")]
2349    async fn test_process_only_encryption_events() -> Result<()> {
2350        use ruma::OneTimeKeyAlgorithm;
2351
2352        let room = owned_room_id!("!croissant:example.org");
2353
2354        let server = MockServer::start().await;
2355        let client = logged_in_client(Some(server.uri())).await;
2356
2357        let server_response = assign!(http::Response::new("0".to_owned()), {
2358            rooms: BTreeMap::from([(
2359                room.clone(),
2360                assign!(http::response::Room::default(), {
2361                    name: Some("Croissants lovers".to_owned()),
2362                    timeline: Vec::new(),
2363                }),
2364            )]),
2365
2366            extensions: assign!(http::response::Extensions::default(), {
2367                e2ee: assign!(http::response::E2EE::default(), {
2368                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2369                }),
2370                to_device: Some(assign!(http::response::ToDevice::default(), {
2371                    next_batch: "to-device-token".to_owned(),
2372                })),
2373            })
2374        });
2375
2376        // Don't process non-encryption events if the sliding sync is configured for
2377        // encryption only.
2378
2379        let sliding_sync = client
2380            .sliding_sync("test")?
2381            .with_to_device_extension(
2382                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2383            )
2384            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2385            .build()
2386            .await?;
2387
2388        {
2389            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2390
2391            sliding_sync
2392                .handle_response(
2393                    server_response.clone(),
2394                    &mut position_guard,
2395                    RequestedRequiredStates::default(),
2396                )
2397                .await?;
2398        }
2399
2400        // E2EE has been properly handled.
2401        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2402        assert_eq!(uploaded_key_count, 42);
2403
2404        {
2405            let olm_machine = &*client.olm_machine_for_testing().await;
2406            assert_eq!(
2407                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2408                Some("to-device-token")
2409            );
2410        }
2411
2412        // Room events haven't.
2413        assert!(client.get_room(&room).is_none());
2414
2415        // Conversely, only process room lists events if the sliding sync was configured
2416        // as so.
2417        let client = logged_in_client(Some(server.uri())).await;
2418
2419        let sliding_sync = client
2420            .sliding_sync("test")?
2421            .add_list(SlidingSyncList::builder("thelist"))
2422            .build()
2423            .await?;
2424
2425        {
2426            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2427
2428            sliding_sync
2429                .handle_response(
2430                    server_response.clone(),
2431                    &mut position_guard,
2432                    RequestedRequiredStates::default(),
2433                )
2434                .await?;
2435        }
2436
2437        // E2EE response has been ignored.
2438        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2439        assert_eq!(uploaded_key_count, 0);
2440
2441        {
2442            let olm_machine = &*client.olm_machine_for_testing().await;
2443            assert_eq!(
2444                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2445                None
2446            );
2447        }
2448
2449        // The room is now known.
2450        assert!(client.get_room(&room).is_some());
2451
2452        // And it's also possible to set up both.
2453        let client = logged_in_client(Some(server.uri())).await;
2454
2455        let sliding_sync = client
2456            .sliding_sync("test")?
2457            .add_list(SlidingSyncList::builder("thelist"))
2458            .with_to_device_extension(
2459                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2460            )
2461            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2462            .build()
2463            .await?;
2464
2465        {
2466            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2467
2468            sliding_sync
2469                .handle_response(
2470                    server_response.clone(),
2471                    &mut position_guard,
2472                    RequestedRequiredStates::default(),
2473                )
2474                .await?;
2475        }
2476
2477        // E2EE has been properly handled.
2478        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2479        assert_eq!(uploaded_key_count, 42);
2480
2481        {
2482            let olm_machine = &*client.olm_machine_for_testing().await;
2483            assert_eq!(
2484                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2485                Some("to-device-token")
2486            );
2487        }
2488
2489        // The room is now known.
2490        assert!(client.get_room(&room).is_some());
2491
2492        Ok(())
2493    }
2494
2495    #[async_test]
2496    async fn test_lock_multiple_requests() -> Result<()> {
2497        let server = MockServer::start().await;
2498        let client = logged_in_client(Some(server.uri())).await;
2499
2500        let pos = Arc::new(Mutex::new(0));
2501        let _mock_guard = Mock::given(SlidingSyncMatcher)
2502            .respond_with(move |_: &Request| {
2503                let mut pos = pos.lock().unwrap();
2504                *pos += 1;
2505                ResponseTemplate::new(200).set_body_json(json!({
2506                    "pos": pos.to_string(),
2507                    "lists": {},
2508                    "rooms": {}
2509                }))
2510            })
2511            .mount_as_scoped(&server)
2512            .await;
2513
2514        let sliding_sync = client
2515            .sliding_sync("test")?
2516            .with_to_device_extension(
2517                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2518            )
2519            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2520            .build()
2521            .await?;
2522
2523        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2524        // test would never terminate.
2525        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2526
2527        for result in requests.await {
2528            result?;
2529        }
2530
2531        Ok(())
2532    }
2533
2534    #[async_test]
2535    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2536        let server = MockServer::start().await;
2537        let client = logged_in_client(Some(server.uri())).await;
2538
2539        let pos = Arc::new(Mutex::new(0));
2540        let _mock_guard = Mock::given(SlidingSyncMatcher)
2541            .respond_with(move |_: &Request| {
2542                let mut pos = pos.lock().unwrap();
2543                *pos += 1;
2544                // Respond slowly enough that we can skip one iteration.
2545                ResponseTemplate::new(200)
2546                    .set_body_json(json!({
2547                        "pos": pos.to_string(),
2548                        "lists": {},
2549                        "rooms": {}
2550                    }))
2551                    .set_delay(Duration::from_secs(2))
2552            })
2553            .mount_as_scoped(&server)
2554            .await;
2555
2556        let sliding_sync =
2557            client
2558                .sliding_sync("test")?
2559                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2560                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2561                ))
2562                .add_list(
2563                    SlidingSyncList::builder("another-list")
2564                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2565                )
2566                .build()
2567                .await?;
2568
2569        let stream = sliding_sync.sync();
2570        pin_mut!(stream);
2571
2572        let cloned_sync = sliding_sync.clone();
2573        spawn(async move {
2574            tokio::time::sleep(Duration::from_millis(100)).await;
2575
2576            cloned_sync
2577                .on_list("another-list", |list| {
2578                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2579                    ready(())
2580                })
2581                .await;
2582        });
2583
2584        assert_matches!(stream.next().await, Some(Ok(_)));
2585
2586        sliding_sync.stop_sync().unwrap();
2587
2588        assert_matches!(stream.next().await, None);
2589
2590        let mut num_requests = 0;
2591
2592        for request in server.received_requests().await.unwrap() {
2593            if !SlidingSyncMatcher.matches(&request) {
2594                continue;
2595            }
2596
2597            let another_list_ranges = if num_requests == 0 {
2598                // First request
2599                json!([[0, 10]])
2600            } else {
2601                // Second request
2602                json!([[10, 20]])
2603            };
2604
2605            num_requests += 1;
2606            assert!(num_requests <= 2, "more than one request hit the server");
2607
2608            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2609
2610            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2611                &json_value,
2612                &json!({
2613                    "conn_id": "test",
2614                    "lists": {
2615                        "room-list": {
2616                            "ranges": [[0, 9]],
2617                            "required_state": [
2618                                ["m.room.encryption", ""],
2619                                ["m.room.tombstone", ""]
2620                            ],
2621                        },
2622                        "another-list": {
2623                            "ranges": another_list_ranges,
2624                            "required_state": [
2625                                ["m.room.encryption", ""],
2626                                ["m.room.tombstone", ""]
2627                            ],
2628                        },
2629                    }
2630                }),
2631                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2632            ) {
2633                dbg!(json_value);
2634                panic!("json differ: {err}");
2635            }
2636        }
2637
2638        assert_eq!(num_requests, 2);
2639
2640        Ok(())
2641    }
2642
2643    #[async_test]
2644    async fn test_timeout_zero_list() -> Result<()> {
2645        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2646
2647        let (request, _, _) =
2648            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2649
2650        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2651        // on new update to pop.
2652        assert!(request.timeout.is_some());
2653
2654        Ok(())
2655    }
2656
2657    #[async_test]
2658    async fn test_timeout_one_list() -> Result<()> {
2659        let (_server, sliding_sync) = new_sliding_sync(vec![
2660            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2661        ])
2662        .await?;
2663
2664        let (request, _, _) =
2665            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2666
2667        // The list does not require a timeout.
2668        assert!(request.timeout.is_none());
2669
2670        // Simulate a response.
2671        {
2672            let server_response = assign!(http::Response::new("0".to_owned()), {
2673                lists: BTreeMap::from([(
2674                    "foo".to_owned(),
2675                    assign!(http::response::List::default(), {
2676                        count: uint!(7),
2677                    })
2678                 )])
2679            });
2680
2681            let _summary = {
2682                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2683                sliding_sync
2684                    .handle_response(
2685                        server_response.clone(),
2686                        &mut pos_guard,
2687                        RequestedRequiredStates::default(),
2688                    )
2689                    .await?
2690            };
2691        }
2692
2693        let (request, _, _) =
2694            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2695
2696        // The list is now fully loaded, so it requires a timeout.
2697        assert!(request.timeout.is_some());
2698
2699        Ok(())
2700    }
2701
2702    #[async_test]
2703    async fn test_timeout_three_lists() -> Result<()> {
2704        let (_server, sliding_sync) = new_sliding_sync(vec![
2705            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2706            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2707            SlidingSyncList::builder("baz")
2708                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2709        ])
2710        .await?;
2711
2712        let (request, _, _) =
2713            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2714
2715        // Two lists don't require a timeout.
2716        assert!(request.timeout.is_none());
2717
2718        // Simulate a response.
2719        {
2720            let server_response = assign!(http::Response::new("0".to_owned()), {
2721                lists: BTreeMap::from([(
2722                    "foo".to_owned(),
2723                    assign!(http::response::List::default(), {
2724                        count: uint!(7),
2725                    })
2726                 )])
2727            });
2728
2729            let _summary = {
2730                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2731                sliding_sync
2732                    .handle_response(
2733                        server_response.clone(),
2734                        &mut pos_guard,
2735                        RequestedRequiredStates::default(),
2736                    )
2737                    .await?
2738            };
2739        }
2740
2741        let (request, _, _) =
2742            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2743
2744        // One don't require a timeout.
2745        assert!(request.timeout.is_none());
2746
2747        // Simulate a response.
2748        {
2749            let server_response = assign!(http::Response::new("1".to_owned()), {
2750                lists: BTreeMap::from([(
2751                    "bar".to_owned(),
2752                    assign!(http::response::List::default(), {
2753                        count: uint!(7),
2754                    })
2755                 )])
2756            });
2757
2758            let _summary = {
2759                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2760                sliding_sync
2761                    .handle_response(
2762                        server_response.clone(),
2763                        &mut pos_guard,
2764                        RequestedRequiredStates::default(),
2765                    )
2766                    .await?
2767            };
2768        }
2769
2770        let (request, _, _) =
2771            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2772
2773        // All lists require a timeout.
2774        assert!(request.timeout.is_some());
2775
2776        Ok(())
2777    }
2778
2779    #[async_test]
2780    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2781        let server = MockServer::start().await;
2782        let client = logged_in_client(Some(server.uri())).await;
2783
2784        let _mock_guard = Mock::given(SlidingSyncMatcher)
2785            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2786                "pos": "0",
2787                "lists": {},
2788                "rooms": {}
2789            })))
2790            .mount_as_scoped(&server)
2791            .await;
2792
2793        let sliding_sync = client
2794            .sliding_sync("test")?
2795            .with_to_device_extension(
2796                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2797            )
2798            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2799            .build()
2800            .await?;
2801
2802        let sliding_sync = Arc::new(sliding_sync);
2803
2804        // Create the listener and perform a sync request
2805        let sync_beat_listener = client.inner.sync_beat.listen();
2806        sliding_sync.sync_once().await?;
2807
2808        // The sync beat listener should be notified shortly after
2809        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2810        Ok(())
2811    }
2812
2813    #[async_test]
2814    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2815        let server = MockServer::start().await;
2816        let client = logged_in_client(Some(server.uri())).await;
2817
2818        let _mock_guard = Mock::given(SlidingSyncMatcher)
2819            .respond_with(ResponseTemplate::new(404))
2820            .mount_as_scoped(&server)
2821            .await;
2822
2823        let sliding_sync = client
2824            .sliding_sync("test")?
2825            .with_to_device_extension(
2826                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2827            )
2828            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2829            .build()
2830            .await?;
2831
2832        let sliding_sync = Arc::new(sliding_sync);
2833
2834        // Create the listener and perform a sync request
2835        let sync_beat_listener = client.inner.sync_beat.listen();
2836        let sync_result = sliding_sync.sync_once().await;
2837        assert!(sync_result.is_err());
2838
2839        // The sync beat listener won't be notified in this case
2840        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2841
2842        Ok(())
2843    }
2844
2845    #[async_test]
2846    async fn test_sync_lock_is_released_before_calling_handlers() -> Result<()> {
2847        let server = MatrixMockServer::new().await;
2848        let client = server.client_builder().build().await;
2849        let room_id = room_id!("!mu5hr00m:example.org");
2850
2851        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2852            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2853                "pos": "0",
2854                "lists": {},
2855                "extensions": {
2856                    "account_data": {
2857                        "global": [
2858                            {
2859                                "type": "m.direct",
2860                                "content": {
2861                                    "@de4dlockh0lmes:example.org": [
2862                                        "!mu5hr00m:example.org"
2863                                    ]
2864                                }
2865                            }
2866                        ]
2867                    }
2868                },
2869                "rooms": {
2870                    room_id: {
2871                        "name": "Mario Bros Fanbase Room",
2872                        "initial": true,
2873                    },
2874                }
2875            })))
2876            .mount_as_scoped(server.server())
2877            .await;
2878
2879        let f = EventFactory::new().room(room_id);
2880
2881        Mock::given(method("GET"))
2882            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2883            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2884                "chunk": [
2885                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2886                ]
2887            })))
2888            .mount(server.server())
2889            .await;
2890
2891        let (tx, rx) = tokio::sync::oneshot::channel();
2892
2893        let tx = Arc::new(Mutex::new(Some(tx)));
2894        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2895            // Try to run a /members query while in a event handler.
2896            let members =
2897                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2898            assert_eq!(members.len(), 1);
2899            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2900        });
2901
2902        let sliding_sync = client
2903            .sliding_sync("test")?
2904            .add_list(SlidingSyncList::builder("thelist"))
2905            .with_account_data_extension(
2906                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2907            )
2908            .build()
2909            .await?;
2910
2911        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2912            .await
2913            .expect("Sync did not complete in time")
2914            .expect("Sync failed");
2915
2916        // Wait for the event handler to complete.
2917        tokio::time::timeout(Duration::from_secs(5), rx)
2918            .await
2919            .expect("Event handler did not complete in time")
2920            .expect("Event handler failed");
2921
2922        Ok(())
2923    }
2924}