matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod sticky_parameters;
24
25use std::{
26    collections::{BTreeMap, btree_map::Entry},
27    fmt::Debug,
28    future::Future,
29    sync::{Arc, RwLock as StdRwLock},
30    time::Duration,
31};
32
33use async_stream::stream;
34pub use client::{Version, VersionBuilder};
35use futures_core::stream::Stream;
36use matrix_sdk_base::RequestedRequiredStates;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_common::executor::JoinHandleExt as _;
39use matrix_sdk_common::{executor::spawn, timer};
40use ruma::{
41    OwnedRoomId, RoomId,
42    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
43    assign,
44};
45use tokio::{
46    select,
47    sync::{Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock, broadcast::Sender},
48};
49use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
50
51pub use self::{builder::*, client::VersionBuilderError, error::*, list::*};
52use self::{
53    cache::restore_sliding_sync_state,
54    client::SlidingSyncResponseProcessor,
55    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
56};
57use crate::{Client, Result, config::RequestConfig};
58
59/// The Sliding Sync instance.
60///
61/// It is OK to clone this type as much as you need: cloning it is cheap.
62#[derive(Clone, Debug)]
63pub struct SlidingSync {
64    /// The Sliding Sync data.
65    inner: Arc<SlidingSyncInner>,
66}
67
68#[derive(Debug)]
69pub(super) struct SlidingSyncInner {
70    /// A unique identifier for this instance of sliding sync.
71    ///
72    /// Used to distinguish different connections to sliding sync.
73    id: String,
74
75    /// The HTTP Matrix client.
76    client: Client,
77
78    /// Long-polling timeout that appears in sliding sync request.
79    poll_timeout: Duration,
80
81    /// Extra duration for the sliding sync request to timeout. This is added to
82    /// the [`Self::poll_timeout`].
83    network_timeout: Duration,
84
85    /// The storage key to keep this cache at and load it from.
86    storage_key: String,
87
88    /// Should this sliding sync instance try to restore its sync position
89    /// from the database?
90    ///
91    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
92    /// keep it even so, to avoid sparkling cfg statements everywhere
93    /// throughout this file.
94    share_pos: bool,
95
96    /// Position markers.
97    ///
98    /// The `pos` marker represents a progression when exchanging requests and
99    /// responses with the server: the server acknowledges the request by
100    /// responding with a new `pos`. If the client sends two non-necessarily
101    /// consecutive requests with the same `pos`, the server has to reply with
102    /// the same identical response.
103    ///
104    /// `position` is behind a mutex so that a new request starts after the
105    /// previous request trip has fully ended (successfully or not). This
106    /// mechanism exists to wait for the response to be handled and to see the
107    /// `position` being updated, before sending a new request.
108    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
109
110    /// The lists of this Sliding Sync instance.
111    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
112
113    /// Request parameters that are sticky.
114    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,
115
116    /// Internal channel used to pass messages between Sliding Sync and other
117    /// types.
118    internal_channel: Sender<SlidingSyncInternalMessage>,
119}
120
121impl SlidingSync {
122    pub(super) fn new(inner: SlidingSyncInner) -> Self {
123        Self { inner: Arc::new(inner) }
124    }
125
126    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
127        cache::store_sliding_sync_state(self, position).await
128    }
129
130    /// Create a new [`SlidingSyncBuilder`].
131    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
132        SlidingSyncBuilder::new(id, client)
133    }
134
135    /// Subscribe to many rooms.
136    ///
137    /// If the associated `Room`s exist, it will be marked as
138    /// members are missing, so that it ensures to re-fetch all members.
139    ///
140    /// A subscription to an already subscribed room is ignored.
141    pub fn subscribe_to_rooms(
142        &self,
143        room_ids: &[&RoomId],
144        settings: Option<http::request::RoomSubscription>,
145        cancel_in_flight_request: bool,
146    ) {
147        let settings = settings.unwrap_or_default();
148        let mut sticky = self.inner.sticky.write().unwrap();
149        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
150
151        let mut skip_over_current_sync_loop_iteration = false;
152
153        for room_id in room_ids {
154            // If the room subscription already exists, let's not
155            // override it with a new one. First, it would reset its
156            // state (`RoomSubscriptionState`), and second it would try to
157            // re-subscribe with the next request. We don't want that. A room
158            // subscription should happen once, and next subscriptions should
159            // be ignored.
160            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
161                if let Some(room) = self.inner.client.get_room(room_id) {
162                    room.mark_members_missing();
163                }
164
165                entry.insert((RoomSubscriptionState::default(), settings.clone()));
166
167                skip_over_current_sync_loop_iteration = true;
168            }
169        }
170
171        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
172            self.inner.internal_channel_send_if_possible(
173                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
174            );
175        }
176    }
177
178    /// Find a list by its name, and do something on it if it exists.
179    pub async fn on_list<Function, FunctionOutput, R>(
180        &self,
181        list_name: &str,
182        function: Function,
183    ) -> Option<R>
184    where
185        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
186        FunctionOutput: Future<Output = R>,
187    {
188        let lists = self.inner.lists.read().await;
189
190        match lists.get(list_name) {
191            Some(list) => Some(function(list).await),
192            None => None,
193        }
194    }
195
196    /// Add the list to the list of lists.
197    ///
198    /// As lists need to have a unique `.name`, if a list with the same name
199    /// is found the new list will replace the old one and the return it or
200    /// `None`.
201    pub async fn add_list(
202        &self,
203        list_builder: SlidingSyncListBuilder,
204    ) -> Result<Option<SlidingSyncList>> {
205        let list = list_builder.build(self.inner.internal_channel.clone());
206
207        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
208
209        self.inner.internal_channel_send_if_possible(
210            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
211        );
212
213        Ok(old_list)
214    }
215
216    /// Add a list that will be cached and reloaded from the cache.
217    ///
218    /// This will raise an error if a storage key was not set, or if there
219    /// was a I/O error reading from the cache.
220    ///
221    /// The rest of the semantics is the same as [`Self::add_list`].
222    pub async fn add_cached_list(
223        &self,
224        mut list_builder: SlidingSyncListBuilder,
225    ) -> Result<Option<SlidingSyncList>> {
226        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
227
228        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
229
230        self.add_list(list_builder).await
231    }
232
233    /// Handle the HTTP response.
234    #[instrument(skip_all)]
235    async fn handle_response(
236        &self,
237        mut sliding_sync_response: http::Response,
238        position: &mut SlidingSyncPositionMarkers,
239        requested_required_states: RequestedRequiredStates,
240    ) -> Result<UpdateSummary, crate::Error> {
241        let pos = Some(sliding_sync_response.pos.clone());
242
243        let must_process_rooms_response = self.must_process_rooms_response().await;
244
245        trace!(yes = must_process_rooms_response, "Must process rooms response?");
246
247        // Transform a Sliding Sync Response to a `SyncResponse`.
248        //
249        // We may not need the `sync_response` in the future (once `SyncResponse` will
250        // move to Sliding Sync, i.e. to `http::Response`), but processing the
251        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
252        // happens here.
253
254        let sync_response = {
255            let _timer = timer!("response processor");
256
257            let response_processor = {
258                // Take the lock to synchronise accesses to the state store, to avoid concurrent
259                // sliding syncs overwriting each other's room infos.
260                let _state_store_lock = {
261                    let _timer = timer!("acquiring the `state_store_lock`");
262
263                    self.inner.client.base_client().state_store_lock().lock().await
264                };
265
266                let mut response_processor =
267                    SlidingSyncResponseProcessor::new(self.inner.client.clone());
268
269                // Process thread subscriptions if they're available.
270                //
271                // It's important to do this *before* handling the room responses, so that
272                // notifications can be properly generated based on the thread subscriptions,
273                // for the events in threads we've subscribed to.
274                if self.is_thread_subscriptions_enabled() {
275                    response_processor
276                        .handle_thread_subscriptions(
277                            position.pos.as_deref(),
278                            std::mem::take(
279                                &mut sliding_sync_response.extensions.thread_subscriptions,
280                            ),
281                        )
282                        .await?;
283                }
284
285                #[cfg(feature = "e2e-encryption")]
286                if self.is_e2ee_enabled() {
287                    response_processor.handle_encryption(&sliding_sync_response.extensions).await?
288                }
289
290                // Only handle the room's subsection of the response, if this sliding sync was
291                // configured to do so.
292                if must_process_rooms_response {
293                    response_processor
294                        .handle_room_response(&sliding_sync_response, &requested_required_states)
295                        .await?;
296                }
297
298                response_processor
299            };
300
301            // Release the lock before calling event handlers
302            response_processor.process_and_take_response().await?
303        };
304
305        debug!("Sliding Sync response has been handled by the client");
306        trace!(?sync_response);
307
308        // Commit sticky parameters, if needed.
309        if let Some(ref txn_id) = sliding_sync_response.txn_id {
310            let txn_id = txn_id.as_str().into();
311            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
312            let mut lists = self.inner.lists.write().await;
313            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
314        }
315
316        let update_summary = {
317            // Update the rooms.
318            let updated_rooms = {
319                let mut updated_rooms = Vec::with_capacity(
320                    sliding_sync_response.rooms.len() + sync_response.rooms.joined.len(),
321                );
322
323                updated_rooms.extend(sliding_sync_response.rooms.keys().cloned());
324
325                // There might be other rooms that were only mentioned in the sliding sync
326                // extensions part of the response, and thus would result in rooms present in
327                // the `sync_response.joined`. Mark them as updated too.
328                //
329                // Since we've removed rooms that were in the room subsection from
330                // `sync_response.rooms.joined`, the remaining ones aren't already present in
331                // `updated_rooms` and wouldn't cause any duplicates.
332                updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
333
334                updated_rooms
335            };
336
337            // Update the lists.
338            let updated_lists = {
339                debug!(
340                    lists = ?sliding_sync_response.lists,
341                    "Update lists"
342                );
343
344                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
345                let mut lists = self.inner.lists.write().await;
346
347                // Iterate on known lists, not on lists in the response. Rooms may have been
348                // updated that were not involved in any list update.
349                for (name, list) in lists.iter_mut() {
350                    if let Some(updates) = sliding_sync_response.lists.get(name) {
351                        let maximum_number_of_rooms: u32 =
352                            updates.count.try_into().expect("failed to convert `count` to `u32`");
353
354                        if list.update(Some(maximum_number_of_rooms))? {
355                            updated_lists.push(name.clone());
356                        }
357                    } else if list.update(None)? {
358                        updated_lists.push(name.clone());
359                    }
360                }
361
362                // Report about unknown lists.
363                for name in sliding_sync_response.lists.keys() {
364                    if !lists.contains_key(name) {
365                        error!("Response for list `{name}` - unknown to us; skipping");
366                    }
367                }
368
369                updated_lists
370            };
371
372            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
373        };
374
375        // Everything went well, we can update the position markers.
376        //
377        // Save the new position markers.
378        debug!(previous_pos = position.pos, new_pos = pos, "Updating `pos`");
379
380        position.pos = pos;
381
382        Ok(update_summary)
383    }
384
385    #[instrument(skip_all)]
386    async fn generate_sync_request(
387        &self,
388        txn_id: &mut LazyTransactionId,
389    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
390        // Collect requests for lists.
391        let mut requests_lists = BTreeMap::new();
392
393        let require_timeout = {
394            let lists = self.inner.lists.read().await;
395
396            // Start at `true` in case there is zero list.
397            let mut require_timeout = true;
398
399            for (name, list) in lists.iter() {
400                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
401                require_timeout = require_timeout && list.requires_timeout();
402            }
403
404            require_timeout
405        };
406
407        // Collect the `pos`.
408        //
409        // Wait on the `position` mutex to be available. It means no request nor
410        // response is running. The `position` mutex is released whether the response
411        // has been fully handled successfully, in this case the `pos` is updated, or
412        // the response handling has failed, in this case the `pos` hasn't been updated
413        // and the same `pos` will be used for this new request.
414        let mut position_guard = {
415            debug!("Waiting to acquire the `position` lock");
416
417            let _timer = timer!("acquiring the `position` lock");
418
419            self.inner.position.clone().lock_owned().await
420        };
421
422        debug!(pos = ?position_guard.pos, "Got a position");
423
424        let to_device_enabled =
425            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
426
427        let restored_fields = if self.inner.share_pos || to_device_enabled {
428            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key).await?
429        } else {
430            None
431        };
432
433        // Update pos: either the one restored from the database, if any and the sliding
434        // sync was configured so, or read it from the memory cache.
435        let pos = if self.inner.share_pos {
436            if let Some(fields) = &restored_fields {
437                // Override the memory one with the database one, for consistency.
438                if fields.pos != position_guard.pos {
439                    info!(
440                        "Pos from previous request ('{:?}') was different from \
441                         pos in database ('{:?}').",
442                        position_guard.pos, fields.pos
443                    );
444                    position_guard.pos = fields.pos.clone();
445                }
446                fields.pos.clone()
447            } else {
448                position_guard.pos.clone()
449            }
450        } else {
451            position_guard.pos.clone()
452        };
453
454        Span::current().record("pos", &pos);
455
456        // When the client sends a request with no `pos`, MSC4186 returns no device
457        // lists updates, as it only returns changes since the provided `pos`
458        // (which is `null` in this case); this is in line with sync v2.
459        //
460        // Therefore, with MSC4186, the device list cache must be marked as to be
461        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
462        // device lists updates that happened between the previous request and the new
463        // “initial” request.
464        #[cfg(feature = "e2e-encryption")]
465        if pos.is_none() && self.is_e2ee_enabled() {
466            info!("Marking all tracked users as dirty");
467
468            let olm_machine = self.inner.client.olm_machine().await;
469            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
470            olm_machine.mark_all_tracked_users_as_dirty().await?;
471        }
472
473        // Configure the timeout.
474        //
475        // The `timeout` query is necessary when all lists require it. Please see
476        // [`SlidingSyncList::requires_timeout`].
477        let timeout = require_timeout.then(|| self.inner.poll_timeout);
478
479        let mut request = assign!(http::Request::new(), {
480            conn_id: Some(self.inner.id.clone()),
481            pos,
482            timeout,
483            lists: requests_lists,
484        });
485
486        // Apply sticky parameters, if needs be.
487        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
488
489        // Extensions are now applied (via sticky parameters).
490        //
491        // Override the to-device token if the extension is enabled.
492        if to_device_enabled {
493            request.extensions.to_device.since =
494                restored_fields.and_then(|fields| fields.to_device_token);
495        }
496
497        // Apply the transaction id if one was generated.
498        if let Some(txn_id) = txn_id.get() {
499            request.txn_id = Some(txn_id.to_string());
500        }
501
502        Ok((
503            // The request itself.
504            request,
505            // Configure long-polling. We need some time for the long-poll itself,
506            // and extra time for the network delays.
507            RequestConfig::default()
508                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
509                .retry_limit(3),
510            position_guard,
511        ))
512    }
513
514    /// Send a sliding sync request.
515    ///
516    /// This method contains the sending logic.
517    async fn send_sync_request(
518        &self,
519        request: http::Request,
520        request_config: RequestConfig,
521        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
522    ) -> Result<UpdateSummary> {
523        debug!("Sending request");
524
525        // Prepare the request.
526        let requested_required_states = RequestedRequiredStates::from(&request);
527        let request = self.inner.client.send(request).with_request_config(request_config);
528
529        // Send the request and get a response with end-to-end encryption support.
530        //
531        // Sending the `/sync` request out when end-to-end encryption is enabled means
532        // that we need to also send out any outgoing e2ee related request out
533        // coming from the `OlmMachine::outgoing_requests()` method.
534
535        #[cfg(feature = "e2e-encryption")]
536        let response = {
537            if self.is_e2ee_enabled() {
538                // Here, we need to run 2 things:
539                //
540                // 1. Send the sliding sync request and get a response,
541                // 2. Send the E2EE requests.
542                //
543                // We don't want to use a `join` or `try_join` because we want to fail if and
544                // only if sending the sliding sync request fails. Failing to send the E2EE
545                // requests should just result in a log.
546                //
547                // We also want to give the priority to sliding sync request. E2EE requests are
548                // sent concurrently to the sliding sync request, but the priority is on waiting
549                // a sliding sync response.
550                //
551                // If sending sliding sync request fails, the sending of E2EE requests must be
552                // aborted as soon as possible.
553
554                let client = self.inner.client.clone();
555                let e2ee_uploads = spawn(
556                    async move {
557                        if let Err(error) = client.send_outgoing_requests().await {
558                            error!(?error, "Error while sending outgoing E2EE requests");
559                        }
560                    }
561                    .instrument(Span::current()),
562                )
563                // Ensure that the task is not running in detached mode. It is aborted when it's
564                // dropped.
565                .abort_on_drop();
566
567                // Wait on the sliding sync request success or failure early.
568                let response = request.await?;
569
570                // At this point, if `request` has been resolved successfully, we wait on
571                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
572                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
573                // been dropped, so aborted.
574                e2ee_uploads.await.map_err(|error| Error::JoinError {
575                    task_description: "e2ee_uploads".to_owned(),
576                    error,
577                })?;
578
579                response
580            } else {
581                request.await?
582            }
583        };
584
585        // Send the request and get a response _without_ end-to-end encryption support.
586        #[cfg(not(feature = "e2e-encryption"))]
587        let response = request.await?;
588
589        debug!("Received response");
590
591        // At this point, the request has been sent, and a response has been received.
592        //
593        // We must ensure the handling of the response cannot be stopped/
594        // cancelled. It must be done entirely, otherwise we can have
595        // corrupted/incomplete states for Sliding Sync and other parts of
596        // the code.
597        //
598        // That's why we are running the handling of the response in a spawned
599        // future that cannot be cancelled by anything.
600        let this = self.clone();
601
602        // Spawn a new future to ensure that the code inside this future cannot be
603        // cancelled if this method is cancelled.
604        let future = async move {
605            debug!("Start handling response");
606
607            // In case the task running this future is detached, we must
608            // ensure responses are handled one at a time. At this point we still own
609            // `position_guard`, so we're fine.
610
611            // Handle the response.
612            let updates = this
613                .handle_response(response, &mut position_guard, requested_required_states)
614                .await?;
615
616            this.cache_to_storage(&position_guard).await?;
617
618            // Release the position guard lock.
619            // It means that other responses can be generated and then handled later.
620            drop(position_guard);
621
622            debug!("Done handling response");
623
624            Ok(updates)
625        };
626
627        spawn(future.instrument(Span::current())).await.unwrap()
628    }
629
630    /// Is the e2ee extension enabled for this sliding sync instance?
631    #[cfg(feature = "e2e-encryption")]
632    fn is_e2ee_enabled(&self) -> bool {
633        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
634    }
635
636    /// Is the thread subscriptions extension enabled for this sliding sync
637    /// instance?
638    fn is_thread_subscriptions_enabled(&self) -> bool {
639        self.inner.sticky.read().unwrap().data().extensions.thread_subscriptions.enabled
640            == Some(true)
641    }
642
643    #[cfg(not(feature = "e2e-encryption"))]
644    fn is_e2ee_enabled(&self) -> bool {
645        false
646    }
647
648    /// Should we process the room's subpart of a response?
649    async fn must_process_rooms_response(&self) -> bool {
650        // We consider that we must, if there's any room subscription or there's any
651        // list.
652        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
653            || !self.inner.lists.read().await.is_empty()
654    }
655
656    /// Send a single sliding sync request, and returns the response summary.
657    ///
658    /// Public for testing purposes only.
659    #[doc(hidden)]
660    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
661    pub async fn sync_once(&self) -> Result<UpdateSummary> {
662        let (request, request_config, position_guard) =
663            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
664
665        // Send the request.
666        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
667
668        // Notify a new sync was received.
669        self.inner.client.inner.sync_beat.notify(usize::MAX);
670
671        Ok(summaries)
672    }
673
674    /// Create a _new_ Sliding Sync sync loop.
675    ///
676    /// This method returns a `Stream`, which will send requests and will handle
677    /// responses automatically. Lists and rooms are updated automatically.
678    ///
679    /// This function returns `Ok(…)` if everything went well, otherwise it will
680    /// return `Err(…)`. An `Err` will _always_ lead to the `Stream`
681    /// termination.
682    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
683    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
684    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
685        debug!("Starting sync stream");
686
687        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();
688
689        stream! {
690            loop {
691                debug!("Sync stream is running");
692
693                select! {
694                    biased;
695
696                    internal_message = internal_channel_receiver.recv() => {
697                        use SlidingSyncInternalMessage::*;
698
699                        debug!(?internal_message, "Sync stream has received an internal message");
700
701                        match internal_message {
702                            Err(_) | Ok(SyncLoopStop) => {
703                                break;
704                            }
705
706                            Ok(SyncLoopSkipOverCurrentIteration) => {
707                                continue;
708                            }
709                        }
710                    }
711
712                    update_summary = self.sync_once() => {
713                        match update_summary {
714                            Ok(updates) => {
715                                yield Ok(updates);
716                            }
717
718                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
719                            Err(error) => {
720                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
721                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
722                                    self.expire_session().await;
723                                }
724
725                                yield Err(error);
726
727                                // Terminates the loop, and terminates the stream.
728                                break;
729                            }
730                        }
731                    }
732                }
733            }
734
735            debug!("Sync stream has exited.");
736        }
737    }
738
739    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
740    ///
741    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
742    /// enough to “stop” it, but depending of how this `Stream` is used, it
743    /// might not be obvious to drop it immediately (thinking of using this API
744    /// over FFI; the foreign-language might not be able to drop a value
745    /// immediately). Thus, calling this method will ensure that the sync loop
746    /// stops gracefully and as soon as it returns.
747    pub fn stop_sync(&self) -> Result<()> {
748        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
749    }
750
751    /// Expire the current Sliding Sync session on the client-side.
752    ///
753    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
754    /// sticky parameters.
755    ///
756    /// This should only be used when it's clear that this session was about to
757    /// expire anyways, and should be used only in very specific cases (e.g.
758    /// multiple sliding syncs being run in parallel, and one of them has
759    /// expired).
760    ///
761    /// This method **MUST** be called when the sync loop is stopped.
762    #[doc(hidden)]
763    pub async fn expire_session(&self) {
764        info!("Session expired; resetting `pos` and sticky parameters");
765
766        {
767            let lists = self.inner.lists.read().await;
768            for list in lists.values() {
769                // Invalidate in-memory data that would be persisted on disk.
770                list.set_maximum_number_of_rooms(None);
771
772                // Invalidate the sticky data for this list.
773                list.invalidate_sticky_data();
774            }
775        }
776
777        // Remove the cached sliding sync state as well.
778        {
779            let mut position = self.inner.position.lock().await;
780
781            // Invalidate in memory.
782            position.pos = None;
783
784            // Propagate to disk.
785            // Note: this propagates both the sliding sync state and the cached lists'
786            // state to disk.
787            if let Err(err) = self.cache_to_storage(&position).await {
788                warn!("Failed to invalidate cached sliding sync state: {err}");
789            }
790        }
791
792        {
793            let mut sticky = self.inner.sticky.write().unwrap();
794
795            // Clear all room subscriptions: we don't want to resend all room subscriptions
796            // when the session will restart.
797            sticky.data_mut().room_subscriptions.clear();
798        }
799    }
800}
801
802impl SlidingSyncInner {
803    /// Send a message over the internal channel.
804    #[instrument]
805    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
806        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
807    }
808
809    /// Send a message over the internal channel if there is a receiver, i.e. if
810    /// the sync loop is running.
811    #[instrument]
812    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
813        // If there is no receiver, the send will fail, but that's OK here.
814        let _ = self.internal_channel.send(message);
815    }
816}
817
/// Messages sent to the sync loop over the internal channel.
///
/// The enum has no payload, so equality is total: `Eq` is derived along
/// with `PartialEq`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
827
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Get a copy of the extension configuration held by the sticky
    /// parameters of this Sliding Sync.
    ///
    /// Note: this is not the next content of the sticky parameters that will
    /// be sent, but the static configuration that was set when this Sliding
    /// Sync was created.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
846
847#[derive(Clone, Debug)]
848pub(super) struct SlidingSyncPositionMarkers {
849    /// An ephemeral position in the current stream, as received from the
850    /// previous `/sync` response, or `None` for the first request.
851    pos: Option<String>,
852}
853
854/// A summary of the updates received after a sync (like in
855/// [`SlidingSync::sync`]).
856#[derive(Debug, Clone)]
857pub struct UpdateSummary {
858    /// The names of the lists that have seen an update.
859    pub lists: Vec<String>,
860    /// The rooms that have seen updates
861    pub rooms: Vec<OwnedRoomId>,
862}
863
/// A very basic bool-ish enum representing the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Default, Debug)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent to, or not been received
    /// correctly by, the server, i.e. the `RoomSubscription` —which is part of
    /// the sticky parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent to, and received correctly by, the
    /// server.
    Applied,
}
879
880/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
881/// sent in the request.
882#[derive(Debug)]
883pub(super) struct SlidingSyncStickyParameters {
884    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
885    /// but one wants to receive updates.
886    room_subscriptions:
887        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,
888
889    /// The intended state of the extensions being supplied to sliding /sync
890    /// calls.
891    extensions: http::request::Extensions,
892}
893
894impl SlidingSyncStickyParameters {
895    /// Create a new set of sticky parameters.
896    pub fn new(
897        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
898        extensions: http::request::Extensions,
899    ) -> Self {
900        Self {
901            room_subscriptions: room_subscriptions
902                .into_iter()
903                .map(|(room_id, room_subscription)| {
904                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
905                })
906                .collect(),
907            extensions,
908        }
909    }
910}
911
912impl StickyData for SlidingSyncStickyParameters {
913    type Request = http::Request;
914
915    fn apply(&self, request: &mut Self::Request) {
916        request.room_subscriptions = self
917            .room_subscriptions
918            .iter()
919            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
920            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
921            .collect();
922        request.extensions = self.extensions.clone();
923    }
924
925    fn on_commit(&mut self) {
926        // All room subscriptions are marked as `Applied`.
927        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
928            if matches!(state, RoomSubscriptionState::Pending) {
929                *state = RoomSubscriptionState::Applied;
930            }
931        }
932    }
933}
934
935#[cfg(all(test, not(target_family = "wasm")))]
936#[allow(clippy::dbg_macro)]
937mod tests {
938    use std::{
939        collections::BTreeMap,
940        future::ready,
941        ops::Not,
942        sync::{Arc, Mutex},
943        time::Duration,
944    };
945
946    use assert_matches::assert_matches;
947    use event_listener::Listener;
948    use futures_util::{StreamExt, future::join_all, pin_mut};
949    use matrix_sdk_base::{RequestedRequiredStates, RoomMemberships};
950    use matrix_sdk_common::executor::spawn;
951    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
952    use ruma::{
953        OwnedRoomId, TransactionId,
954        api::client::error::ErrorKind,
955        assign,
956        events::{direct::DirectEvent, room::member::MembershipState},
957        owned_room_id, room_id,
958        serde::Raw,
959        uint,
960    };
961    use serde::Deserialize;
962    use serde_json::json;
963    use wiremock::{
964        Match, Mock, MockServer, Request, ResponseTemplate, http::Method, matchers::method,
965    };
966
967    use super::{
968        SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
969        SlidingSyncStickyParameters, http,
970        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
971    };
972    use crate::{
973        Client, Result,
974        sliding_sync::cache::restore_sliding_sync_state,
975        test_utils::{logged_in_client, mocks::MatrixMockServer},
976    };
977
978    #[derive(Copy, Clone)]
979    struct SlidingSyncMatcher;
980
981    impl Match for SlidingSyncMatcher {
982        fn matches(&self, request: &Request) -> bool {
983            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
984                && request.method == Method::POST
985        }
986    }
987
988    async fn new_sliding_sync(
989        lists: Vec<SlidingSyncListBuilder>,
990    ) -> Result<(MockServer, SlidingSync)> {
991        let server = MockServer::start().await;
992        let client = logged_in_client(Some(server.uri())).await;
993
994        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
995
996        for list in lists {
997            sliding_sync_builder = sliding_sync_builder.add_list(list);
998        }
999
1000        let sliding_sync = sliding_sync_builder.build().await?;
1001
1002        Ok((server, sliding_sync))
1003    }
1004
1005    #[async_test]
1006    async fn test_subscribe_to_rooms() -> Result<()> {
1007        let (server, sliding_sync) = new_sliding_sync(vec![
1008            SlidingSyncList::builder("foo")
1009                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1010        ])
1011        .await?;
1012
1013        let stream = sliding_sync.sync();
1014        pin_mut!(stream);
1015
1016        let room_id_0 = room_id!("!r0:bar.org");
1017        let room_id_1 = room_id!("!r1:bar.org");
1018        let room_id_2 = room_id!("!r2:bar.org");
1019
1020        {
1021            let _mock_guard = Mock::given(SlidingSyncMatcher)
1022                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1023                    "pos": "1",
1024                    "lists": {},
1025                    "rooms": {
1026                        room_id_0: {
1027                            "name": "Room #0",
1028                            "initial": true,
1029                        },
1030                        room_id_1: {
1031                            "name": "Room #1",
1032                            "initial": true,
1033                        },
1034                        room_id_2: {
1035                            "name": "Room #2",
1036                            "initial": true,
1037                        },
1038                    }
1039                })))
1040                .mount_as_scoped(&server)
1041                .await;
1042
1043            let _ = stream.next().await.unwrap()?;
1044        }
1045
1046        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();
1047
1048        // Members aren't synced.
1049        // We need to make them synced, so that we can test that subscribing to a room
1050        // make members not synced. That's a desired feature.
1051        assert!(room0.are_members_synced().not());
1052
1053        {
1054            struct MemberMatcher(OwnedRoomId);
1055
1056            impl Match for MemberMatcher {
1057                fn matches(&self, request: &Request) -> bool {
1058                    request.url.path()
1059                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1060                        && request.method == Method::GET
1061                }
1062            }
1063
1064            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1065                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1066                    "chunk": [],
1067                })))
1068                .mount_as_scoped(&server)
1069                .await;
1070
1071            assert_matches!(room0.request_members().await, Ok(()));
1072        }
1073
1074        // Members are now synced! We can start subscribing and see how it goes.
1075        assert!(room0.are_members_synced());
1076
1077        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);
1078
1079        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
1080        // now marked as not synced.
1081        assert!(room0.are_members_synced().not());
1082
1083        {
1084            let sticky = sliding_sync.inner.sticky.read().unwrap();
1085            let room_subscriptions = &sticky.data().room_subscriptions;
1086
1087            assert!(room_subscriptions.contains_key(room_id_0));
1088            assert!(room_subscriptions.contains_key(room_id_1));
1089            assert!(!room_subscriptions.contains_key(room_id_2));
1090        }
1091
1092        // Subscribing to the same room doesn't reset the member sync state.
1093
1094        {
1095            struct MemberMatcher(OwnedRoomId);
1096
1097            impl Match for MemberMatcher {
1098                fn matches(&self, request: &Request) -> bool {
1099                    request.url.path()
1100                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
1101                        && request.method == Method::GET
1102                }
1103            }
1104
1105            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
1106                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1107                    "chunk": [],
1108                })))
1109                .mount_as_scoped(&server)
1110                .await;
1111
1112            assert_matches!(room0.request_members().await, Ok(()));
1113        }
1114
1115        // Members are synced, good, good.
1116        assert!(room0.are_members_synced());
1117
1118        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);
1119
1120        // Members are still synced: because we have already subscribed to the
1121        // room, the members aren't marked as unsynced.
1122        assert!(room0.are_members_synced());
1123
1124        Ok(())
1125    }
1126
1127    #[async_test]
1128    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
1129        let (_server, sliding_sync) = new_sliding_sync(vec![
1130            SlidingSyncList::builder("foo")
1131                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1132        ])
1133        .await?;
1134
1135        let room_id_0 = room_id!("!r0:bar.org");
1136        let room_id_1 = room_id!("!r1:bar.org");
1137        let room_id_2 = room_id!("!r2:bar.org");
1138
1139        // Subscribe to two rooms.
1140        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);
1141
1142        {
1143            let sticky = sliding_sync.inner.sticky.read().unwrap();
1144            let room_subscriptions = &sticky.data().room_subscriptions;
1145
1146            assert!(room_subscriptions.contains_key(room_id_0));
1147            assert!(room_subscriptions.contains_key(room_id_1));
1148            assert!(room_subscriptions.contains_key(room_id_2).not());
1149        }
1150
1151        // Subscribe to one more room.
1152        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1153
1154        {
1155            let sticky = sliding_sync.inner.sticky.read().unwrap();
1156            let room_subscriptions = &sticky.data().room_subscriptions;
1157
1158            assert!(room_subscriptions.contains_key(room_id_0));
1159            assert!(room_subscriptions.contains_key(room_id_1));
1160            assert!(room_subscriptions.contains_key(room_id_2));
1161        }
1162
1163        // Suddenly, the session expires!
1164        sliding_sync.expire_session().await;
1165
1166        {
1167            let sticky = sliding_sync.inner.sticky.read().unwrap();
1168            let room_subscriptions = &sticky.data().room_subscriptions;
1169
1170            assert!(room_subscriptions.is_empty());
1171        }
1172
1173        // Subscribe to one room again.
1174        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);
1175
1176        {
1177            let sticky = sliding_sync.inner.sticky.read().unwrap();
1178            let room_subscriptions = &sticky.data().room_subscriptions;
1179
1180            assert!(room_subscriptions.contains_key(room_id_0).not());
1181            assert!(room_subscriptions.contains_key(room_id_1).not());
1182            assert!(room_subscriptions.contains_key(room_id_2));
1183        }
1184
1185        Ok(())
1186    }
1187
1188    #[async_test]
1189    async fn test_add_list() -> Result<()> {
1190        let (_server, sliding_sync) = new_sliding_sync(vec![
1191            SlidingSyncList::builder("foo")
1192                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
1193        ])
1194        .await?;
1195
1196        let _stream = sliding_sync.sync();
1197        pin_mut!(_stream);
1198
1199        sliding_sync
1200            .add_list(
1201                SlidingSyncList::builder("bar")
1202                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1203            )
1204            .await?;
1205
1206        let lists = sliding_sync.inner.lists.read().await;
1207
1208        assert!(lists.contains_key("foo"));
1209        assert!(lists.contains_key("bar"));
1210
1211        // this test also ensures that Tokio is not panicking when calling `add_list`.
1212
1213        Ok(())
1214    }
1215
1216    #[test]
1217    fn test_sticky_parameters_api_invalidated_flow() {
1218        let r0 = room_id!("!r0.matrix.org");
1219        let r1 = room_id!("!r1:matrix.org");
1220
1221        let mut room_subscriptions = BTreeMap::new();
1222        room_subscriptions.insert(r0.to_owned(), Default::default());
1223
1224        // At first it's invalidated.
1225        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1226            room_subscriptions,
1227            Default::default(),
1228        ));
1229        assert!(sticky.is_invalidated());
1230
1231        // Then when we create a request, the sticky parameters are applied.
1232        let txn_id: &TransactionId = "tid123".into();
1233
1234        let mut request = http::Request::default();
1235        request.txn_id = Some(txn_id.to_string());
1236
1237        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1238
1239        assert!(request.txn_id.is_some());
1240        assert_eq!(request.room_subscriptions.len(), 1);
1241        assert!(request.room_subscriptions.contains_key(r0));
1242
1243        let tid = request.txn_id.unwrap();
1244
1245        sticky.maybe_commit(tid.as_str().into());
1246        assert!(!sticky.is_invalidated());
1247
1248        // Applying new parameters will invalidate again.
1249        sticky
1250            .data_mut()
1251            .room_subscriptions
1252            .insert(r1.to_owned(), (Default::default(), Default::default()));
1253        assert!(sticky.is_invalidated());
1254
1255        // Committing with the wrong transaction id will keep it invalidated.
1256        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1257        assert!(sticky.is_invalidated());
1258
1259        // Restarting a request will only remember the last generated transaction id.
1260        let txn_id1: &TransactionId = "tid456".into();
1261        let mut request1 = http::Request::default();
1262        request1.txn_id = Some(txn_id1.to_string());
1263        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1264
1265        assert!(sticky.is_invalidated());
1266        // The first room subscription has been applied to `request`, so it's not
1267        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1268        // related to the sticky design.
1269        assert_eq!(request1.room_subscriptions.len(), 1);
1270        assert!(request1.room_subscriptions.contains_key(r1));
1271
1272        let txn_id2: &TransactionId = "tid789".into();
1273        let mut request2 = http::Request::default();
1274        request2.txn_id = Some(txn_id2.to_string());
1275
1276        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1277        assert!(sticky.is_invalidated());
1278        // `request2` contains `r1` because the sticky parameters have not been
1279        // committed, so it's still marked as pending.
1280        assert_eq!(request2.room_subscriptions.len(), 1);
1281        assert!(request2.room_subscriptions.contains_key(r1));
1282
1283        // Here we commit with the not most-recent TID, so it keeps the invalidated
1284        // status.
1285        sticky.maybe_commit(txn_id1);
1286        assert!(sticky.is_invalidated());
1287
1288        // But here we use the latest TID, so the commit is effective.
1289        sticky.maybe_commit(txn_id2);
1290        assert!(!sticky.is_invalidated());
1291    }
1292
1293    #[test]
1294    fn test_room_subscriptions_are_sticky() {
1295        let r0 = room_id!("!r0.matrix.org");
1296        let r1 = room_id!("!r1:matrix.org");
1297
1298        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1299            BTreeMap::new(),
1300            Default::default(),
1301        ));
1302
1303        // A room subscription is added, applied, and committed.
1304        {
1305            // Insert `r0`.
1306            sticky
1307                .data_mut()
1308                .room_subscriptions
1309                .insert(r0.to_owned(), (Default::default(), Default::default()));
1310
1311            // Then the sticky parameters are applied.
1312            let txn_id: &TransactionId = "tid0".into();
1313            let mut request = http::Request::default();
1314            request.txn_id = Some(txn_id.to_string());
1315
1316            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1317
1318            assert!(request.txn_id.is_some());
1319            assert_eq!(request.room_subscriptions.len(), 1);
1320            assert!(request.room_subscriptions.contains_key(r0));
1321
1322            // Then the sticky parameters are committed.
1323            let tid = request.txn_id.unwrap();
1324
1325            sticky.maybe_commit(tid.as_str().into());
1326        }
1327
1328        // A room subscription is added, applied, but NOT committed.
1329        {
1330            // Insert `r1`.
1331            sticky
1332                .data_mut()
1333                .room_subscriptions
1334                .insert(r1.to_owned(), (Default::default(), Default::default()));
1335
1336            // Then the sticky parameters are applied.
1337            let txn_id: &TransactionId = "tid1".into();
1338            let mut request = http::Request::default();
1339            request.txn_id = Some(txn_id.to_string());
1340
1341            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1342
1343            assert!(request.txn_id.is_some());
1344            assert_eq!(request.room_subscriptions.len(), 1);
1345            // `r0` is not present, it's only `r1`.
1346            assert!(request.room_subscriptions.contains_key(r1));
1347
1348            // Then the sticky parameters are NOT committed.
1349            // It can happen if the request has failed to be sent for example,
1350            // or if the response didn't match.
1351        }
1352
1353        // A previously added room subscription is re-added, applied, and committed.
1354        {
1355            // Then the sticky parameters are applied.
1356            let txn_id: &TransactionId = "tid2".into();
1357            let mut request = http::Request::default();
1358            request.txn_id = Some(txn_id.to_string());
1359
1360            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1361
1362            assert!(request.txn_id.is_some());
1363            assert_eq!(request.room_subscriptions.len(), 1);
1364            // `r0` is not present, it's only `r1`.
1365            assert!(request.room_subscriptions.contains_key(r1));
1366
1367            // Then the sticky parameters are committed.
1368            let tid = request.txn_id.unwrap();
1369
1370            sticky.maybe_commit(tid.as_str().into());
1371        }
1372
1373        // All room subscriptions have been committed.
1374        {
1375            // Then the sticky parameters are applied.
1376            let txn_id: &TransactionId = "tid3".into();
1377            let mut request = http::Request::default();
1378            request.txn_id = Some(txn_id.to_string());
1379
1380            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1381
1382            assert!(request.txn_id.is_some());
1383            // All room subscriptions have been sent.
1384            assert!(request.room_subscriptions.is_empty());
1385        }
1386    }
1387
1388    #[test]
1389    fn test_extensions_are_sticky() {
1390        let mut extensions = http::request::Extensions::default();
1391        extensions.account_data.enabled = Some(true);
1392
1393        // At first it's invalidated.
1394        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1395            Default::default(),
1396            extensions,
1397        ));
1398
1399        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");
1400
1401        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
1402        // to-device.
1403        let extensions = &sticky.data().extensions;
1404        assert_eq!(extensions.e2ee.enabled, None);
1405        assert_eq!(extensions.to_device.enabled, None);
1406        assert_eq!(extensions.to_device.since, None);
1407
1408        // What the user explicitly enabled is… enabled.
1409        assert_eq!(extensions.account_data.enabled, Some(true));
1410
1411        let txn_id: &TransactionId = "tid123".into();
1412        let mut request = http::Request::default();
1413        request.txn_id = Some(txn_id.to_string());
1414        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1415        assert!(sticky.is_invalidated());
1416        assert_eq!(request.extensions.to_device.enabled, None);
1417        assert_eq!(request.extensions.to_device.since, None);
1418        assert_eq!(request.extensions.e2ee.enabled, None);
1419        assert_eq!(request.extensions.account_data.enabled, Some(true));
1420    }
1421
1422    #[async_test]
1423    async fn test_sticky_extensions_plus_since() -> Result<()> {
1424        let server = MockServer::start().await;
1425        let client = logged_in_client(Some(server.uri())).await;
1426
1427        let sync = client
1428            .sliding_sync("test-slidingsync")?
1429            .add_list(SlidingSyncList::builder("new_list"))
1430            .build()
1431            .await?;
1432
1433        // No extensions have been explicitly enabled here.
1434        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
1435        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
1436        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);
1437
1438        // Now enable e2ee and to-device.
1439        let sync = client
1440            .sliding_sync("test-slidingsync")?
1441            .add_list(SlidingSyncList::builder("new_list"))
1442            .with_to_device_extension(
1443                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
1444            )
1445            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1446            .build()
1447            .await?;
1448
1449        // Even without a since token, the first request will contain the extensions
1450        // configuration, at least.
1451        let txn_id = TransactionId::new();
1452        let (request, _, _) = sync
1453            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1454            .await?;
1455
1456        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1457        assert_eq!(request.extensions.to_device.enabled, Some(true));
1458        assert!(request.extensions.to_device.since.is_none());
1459
1460        {
1461            // Committing with another transaction id doesn't validate anything.
1462            let mut sticky = sync.inner.sticky.write().unwrap();
1463            assert!(sticky.is_invalidated());
1464            sticky.maybe_commit(
1465                "hopefully the rng won't generate this very specific transaction id".into(),
1466            );
1467            assert!(sticky.is_invalidated());
1468        }
1469
1470        // Regenerating a request will yield the same one.
1471        let txn_id2 = TransactionId::new();
1472        let (request, _, _) = sync
1473            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
1474            .await?;
1475
1476        assert_eq!(request.extensions.e2ee.enabled, Some(true));
1477        assert_eq!(request.extensions.to_device.enabled, Some(true));
1478        assert!(request.extensions.to_device.since.is_none());
1479
1480        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");
1481
1482        {
1483            // Committing with the expected transaction id will validate it.
1484            let mut sticky = sync.inner.sticky.write().unwrap();
1485            assert!(sticky.is_invalidated());
1486            sticky.maybe_commit(txn_id2.as_str().into());
1487            assert!(!sticky.is_invalidated());
1488        }
1489
1490        // The next request should contain no sticky parameters.
1491        let txn_id = TransactionId::new();
1492        let (request, _, _) = sync
1493            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1494            .await?;
1495        assert!(request.extensions.e2ee.enabled.is_none());
1496        assert!(request.extensions.to_device.enabled.is_none());
1497        assert!(request.extensions.to_device.since.is_none());
1498
1499        // If there's a to-device `since` token, we make sure we put the token
1500        // into the extension config. The rest doesn't need to be re-enabled due to
1501        // stickiness.
1502        let _since_token = "since";
1503
1504        #[cfg(feature = "e2e-encryption")]
1505        {
1506            use matrix_sdk_base::crypto::store::types::Changes;
1507            if let Some(olm_machine) = &*client.olm_machine().await {
1508                olm_machine
1509                    .store()
1510                    .save_changes(Changes {
1511                        next_batch_token: Some(_since_token.to_owned()),
1512                        ..Default::default()
1513                    })
1514                    .await?;
1515            }
1516        }
1517
1518        let txn_id = TransactionId::new();
1519        let (request, _, _) = sync
1520            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1521            .await?;
1522
1523        assert!(request.extensions.e2ee.enabled.is_none());
1524        assert!(request.extensions.to_device.enabled.is_none());
1525
1526        #[cfg(feature = "e2e-encryption")]
1527        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));
1528
1529        Ok(())
1530    }
1531
1532    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1533    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1534    // `/key/query` requests must be sent. See the code to see the details.
1535    //
1536    // This test is asserting that.
1537    #[async_test]
1538    #[cfg(feature = "e2e-encryption")]
1539    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1540        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1541        use matrix_sdk_test::ruma_response_from_json;
1542        use ruma::user_id;
1543
1544        let server = MockServer::start().await;
1545        let client = logged_in_client(Some(server.uri())).await;
1546
1547        let alice = user_id!("@alice:localhost");
1548        let bob = user_id!("@bob:localhost");
1549        let me = user_id!("@example:localhost");
1550
1551        // Track and mark users are not dirty, so that we can check they are “dirty”
1552        // after that. Dirty here means that a `/key/query` must be sent.
1553        {
1554            let olm_machine = client.olm_machine().await;
1555            let olm_machine = olm_machine.as_ref().unwrap();
1556
1557            olm_machine.update_tracked_users([alice, bob]).await?;
1558
1559            // Assert requests.
1560            let outgoing_requests = olm_machine.outgoing_requests().await?;
1561
1562            assert_eq!(outgoing_requests.len(), 2);
1563            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1564            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1565
1566            // Fake responses.
1567            olm_machine
1568                .mark_request_as_sent(
1569                    outgoing_requests[0].request_id(),
1570                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1571                        "one_time_key_counts": {}
1572                    }))),
1573                )
1574                .await?;
1575
1576            olm_machine
1577                .mark_request_as_sent(
1578                    outgoing_requests[1].request_id(),
1579                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1580                        "device_keys": {
1581                            alice: {},
1582                            bob: {},
1583                        }
1584                    }))),
1585                )
1586                .await?;
1587
1588            // Once more.
1589            let outgoing_requests = olm_machine.outgoing_requests().await?;
1590
1591            assert_eq!(outgoing_requests.len(), 1);
1592            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1593
1594            olm_machine
1595                .mark_request_as_sent(
1596                    outgoing_requests[0].request_id(),
1597                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1598                        "device_keys": {
1599                            me: {},
1600                        }
1601                    }))),
1602                )
1603                .await?;
1604
1605            // No more.
1606            let outgoing_requests = olm_machine.outgoing_requests().await?;
1607
1608            assert!(outgoing_requests.is_empty());
1609        }
1610
1611        let sync = client
1612            .sliding_sync("test-slidingsync")?
1613            .add_list(SlidingSyncList::builder("new_list"))
1614            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1615            .build()
1616            .await?;
1617
1618        // First request: no `pos`.
1619        let txn_id = TransactionId::new();
1620        let (_request, _, _) = sync
1621            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1622            .await?;
1623
1624        // Now, tracked users must be dirty.
1625        {
1626            let olm_machine = client.olm_machine().await;
1627            let olm_machine = olm_machine.as_ref().unwrap();
1628
1629            // Assert requests.
1630            let outgoing_requests = olm_machine.outgoing_requests().await?;
1631
1632            assert_eq!(outgoing_requests.len(), 1);
1633            assert_matches!(
1634                outgoing_requests[0].request(),
1635                AnyOutgoingRequest::KeysQuery(request) => {
1636                    assert!(request.device_keys.contains_key(alice));
1637                    assert!(request.device_keys.contains_key(bob));
1638                    assert!(request.device_keys.contains_key(me));
1639                }
1640            );
1641
1642            // Fake responses.
1643            olm_machine
1644                .mark_request_as_sent(
1645                    outgoing_requests[0].request_id(),
1646                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1647                        "device_keys": {
1648                            alice: {},
1649                            bob: {},
1650                            me: {},
1651                        }
1652                    }))),
1653                )
1654                .await?;
1655        }
1656
1657        // Second request: with a `pos` this time.
1658        sync.set_pos("chocolat".to_owned()).await;
1659
1660        let txn_id = TransactionId::new();
1661        let (_request, _, _) = sync
1662            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1663            .await?;
1664
1665        // Tracked users are not marked as dirty.
1666        {
1667            let olm_machine = client.olm_machine().await;
1668            let olm_machine = olm_machine.as_ref().unwrap();
1669
1670            // Assert requests.
1671            let outgoing_requests = olm_machine.outgoing_requests().await?;
1672
1673            assert!(outgoing_requests.is_empty());
1674        }
1675
1676        Ok(())
1677    }
1678
1679    #[async_test]
1680    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1681        let server = MockServer::start().await;
1682        let client = logged_in_client(Some(server.uri())).await;
1683
1684        let sliding_sync = client
1685            .sliding_sync("test-slidingsync")?
1686            .with_to_device_extension(
1687                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1688            )
1689            .build()
1690            .await?;
1691
1692        // First request asks to enable the extension.
1693        let (request, _, _) =
1694            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1695        assert!(request.extensions.to_device.enabled.is_some());
1696
1697        let sync = sliding_sync.sync();
1698        pin_mut!(sync);
1699
1700        // `pos` is `None` to start with.
1701        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1702
1703        #[derive(Deserialize)]
1704        struct PartialRequest {
1705            txn_id: Option<String>,
1706        }
1707
1708        {
1709            let _mock_guard = Mock::given(SlidingSyncMatcher)
1710                .respond_with(|request: &Request| {
1711                    // Repeat the txn_id in the response, if set.
1712                    let request: PartialRequest = request.body_json().unwrap();
1713
1714                    ResponseTemplate::new(200).set_body_json(json!({
1715                        "txn_id": request.txn_id,
1716                        "pos": "0",
1717                    }))
1718                })
1719                .mount_as_scoped(&server)
1720                .await;
1721
1722            let next = sync.next().await;
1723            assert_matches!(next, Some(Ok(_update_summary)));
1724
1725            // `pos` has been updated.
1726            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1727        }
1728
1729        // Next request doesn't ask to enable the extension.
1730        let (request, _, _) =
1731            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1732        assert!(request.extensions.to_device.enabled.is_none());
1733
1734        // Next request is successful.
1735        {
1736            let _mock_guard = Mock::given(SlidingSyncMatcher)
1737                .respond_with(|request: &Request| {
1738                    // Repeat the txn_id in the response, if set.
1739                    let request: PartialRequest = request.body_json().unwrap();
1740
1741                    ResponseTemplate::new(200).set_body_json(json!({
1742                        "txn_id": request.txn_id,
1743                        "pos": "1",
1744                    }))
1745                })
1746                .mount_as_scoped(&server)
1747                .await;
1748
1749            let next = sync.next().await;
1750            assert_matches!(next, Some(Ok(_update_summary)));
1751
1752            // `pos` has been updated.
1753            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1754        }
1755
1756        // Next request is successful despite it receives an already
1757        // received `pos` from the server.
1758        {
1759            let _mock_guard = Mock::given(SlidingSyncMatcher)
1760                .respond_with(|request: &Request| {
1761                    // Repeat the txn_id in the response, if set.
1762                    let request: PartialRequest = request.body_json().unwrap();
1763
1764                    ResponseTemplate::new(200).set_body_json(json!({
1765                        "txn_id": request.txn_id,
1766                        "pos": "0", // <- already received!
1767                    }))
1768                })
1769                .up_to_n_times(1) // run this mock only once.
1770                .mount_as_scoped(&server)
1771                .await;
1772
1773            let next = sync.next().await;
1774            assert_matches!(next, Some(Ok(_update_summary)));
1775
1776            // `pos` has been updated.
1777            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1778        }
1779
1780        // Stop responding with successful requests!
1781        //
1782        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1783        // so they're reset. It also resets the `pos`.
1784        {
1785            let _mock_guard = Mock::given(SlidingSyncMatcher)
1786                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1787                    "error": "foo",
1788                    "errcode": "M_UNKNOWN_POS",
1789                })))
1790                .mount_as_scoped(&server)
1791                .await;
1792
1793            let next = sync.next().await;
1794
1795            // The expected error is returned.
1796            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1797
1798            // `pos` has been reset.
1799            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1800
1801            // Next request asks to enable the extension again.
1802            let (request, _, _) =
1803                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1804
1805            assert!(request.extensions.to_device.enabled.is_some());
1806
1807            // `sync` has been stopped.
1808            assert!(sync.next().await.is_none());
1809        }
1810
1811        Ok(())
1812    }
1813
1814    #[cfg(feature = "e2e-encryption")]
1815    #[async_test]
1816    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1817        let server = MockServer::start().await;
1818
1819        #[derive(Deserialize)]
1820        struct PartialRequest {
1821            txn_id: Option<String>,
1822        }
1823
1824        let server_pos = Arc::new(Mutex::new(0));
1825        let _mock_guard = Mock::given(SlidingSyncMatcher)
1826            .respond_with(move |request: &Request| {
1827                // Repeat the txn_id in the response, if set.
1828                let request: PartialRequest = request.body_json().unwrap();
1829                let pos = {
1830                    let mut pos = server_pos.lock().unwrap();
1831                    let prev = *pos;
1832                    *pos += 1;
1833                    prev
1834                };
1835
1836                ResponseTemplate::new(200).set_body_json(json!({
1837                    "txn_id": request.txn_id,
1838                    "pos": pos.to_string(),
1839                }))
1840            })
1841            .mount_as_scoped(&server)
1842            .await;
1843
1844        let client = logged_in_client(Some(server.uri())).await;
1845
1846        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1847
1848        // `pos` is `None` to start with.
1849        {
1850            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1851
1852            let (request, _, _) =
1853                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1854            assert!(request.pos.is_none());
1855        }
1856
1857        let sync = sliding_sync.sync();
1858        pin_mut!(sync);
1859
1860        // Sync goes well, and then the position is saved both into the internal memory
1861        // and the database.
1862        let next = sync.next().await;
1863        assert_matches!(next, Some(Ok(_update_summary)));
1864
1865        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1866
1867        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1868            .await?
1869            .expect("must have restored fields");
1870
1871        // While it has been saved into the database, it's not necessarily going to be
1872        // used later!
1873        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1874
1875        // Now, even if we mess with the position stored in the database, the sliding
1876        // sync instance isn't configured to reload the stream position from the
1877        // database, so it won't be changed.
1878        {
1879            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1880
1881            let mut position_guard = other_sync.inner.position.lock().await;
1882            position_guard.pos = Some("yolo".to_owned());
1883
1884            other_sync.cache_to_storage(&position_guard).await?;
1885        }
1886
1887        // It's still 0, not "yolo".
1888        {
1889            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1890            let (request, _, _) =
1891                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1892            assert_eq!(request.pos.as_deref(), Some("0"));
1893        }
1894
1895        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1896        // asked to.
1897        {
1898            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1899            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1900        }
1901
1902        Ok(())
1903    }
1904
1905    #[cfg(feature = "e2e-encryption")]
1906    #[async_test]
1907    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1908        let server = MockServer::start().await;
1909
1910        #[derive(Deserialize)]
1911        struct PartialRequest {
1912            txn_id: Option<String>,
1913        }
1914
1915        let server_pos = Arc::new(Mutex::new(0));
1916        let _mock_guard = Mock::given(SlidingSyncMatcher)
1917            .respond_with(move |request: &Request| {
1918                // Repeat the txn_id in the response, if set.
1919                let request: PartialRequest = request.body_json().unwrap();
1920                let pos = {
1921                    let mut pos = server_pos.lock().unwrap();
1922                    let prev = *pos;
1923                    *pos += 1;
1924                    prev
1925                };
1926
1927                ResponseTemplate::new(200).set_body_json(json!({
1928                    "txn_id": request.txn_id,
1929                    "pos": pos.to_string(),
1930                }))
1931            })
1932            .mount_as_scoped(&server)
1933            .await;
1934
1935        let client = logged_in_client(Some(server.uri())).await;
1936
1937        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1938
1939        // `pos` is `None` to start with.
1940        {
1941            let (request, _, _) =
1942                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1943
1944            assert!(request.pos.is_none());
1945            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1946        }
1947
1948        let sync = sliding_sync.sync();
1949        pin_mut!(sync);
1950
1951        // Sync goes well, and then the position is saved both into the internal memory
1952        // and the database.
1953        let next = sync.next().await;
1954        assert_matches!(next, Some(Ok(_update_summary)));
1955
1956        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1957
1958        let restored_fields = restore_sliding_sync_state(&client, &sliding_sync.inner.storage_key)
1959            .await?
1960            .expect("must have restored fields");
1961
1962        // While it has been saved into the database, it's not necessarily going to be
1963        // used later!
1964        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1965
1966        // Another process modifies the stream position under our feet...
1967        {
1968            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
1969
1970            let mut position_guard = other_sync.inner.position.lock().await;
1971            position_guard.pos = Some("42".to_owned());
1972
1973            other_sync.cache_to_storage(&position_guard).await?;
1974        }
1975
1976        // It's alright, the next request will load it from the database.
1977        {
1978            let (request, _, _) =
1979                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1980            assert_eq!(request.pos.as_deref(), Some("42"));
1981            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1982        }
1983
1984        // Recreating a sliding sync with the same ID will reload it too.
1985        {
1986            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1987            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
1988
1989            let (request, _, _) =
1990                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1991            assert_eq!(request.pos.as_deref(), Some("42"));
1992        }
1993
1994        // Invalidating the session will remove the in-memory value AND the database
1995        // value.
1996        sliding_sync.expire_session().await;
1997
1998        {
1999            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2000
2001            let (request, _, _) =
2002                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2003            assert!(request.pos.is_none());
2004        }
2005
2006        // And new sliding syncs with the same ID won't find it either.
2007        {
2008            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2009            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2010
2011            let (request, _, _) =
2012                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2013            assert!(request.pos.is_none());
2014        }
2015
2016        Ok(())
2017    }
2018
2019    #[async_test]
2020    async fn test_stop_sync_loop() -> Result<()> {
2021        let (_server, sliding_sync) = new_sliding_sync(vec![
2022            SlidingSyncList::builder("foo")
2023                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2024        ])
2025        .await?;
2026
2027        // Start the sync loop.
2028        let stream = sliding_sync.sync();
2029        pin_mut!(stream);
2030
2031        // The sync loop is actually running.
2032        assert!(stream.next().await.is_some());
2033
2034        // Stop the sync loop.
2035        sliding_sync.stop_sync()?;
2036
2037        // The sync loop is actually stopped.
2038        assert!(stream.next().await.is_none());
2039
2040        // Start a new sync loop.
2041        let stream = sliding_sync.sync();
2042        pin_mut!(stream);
2043
2044        // The sync loop is actually running.
2045        assert!(stream.next().await.is_some());
2046
2047        Ok(())
2048    }
2049
2050    #[async_test]
2051    async fn test_process_read_receipts() -> Result<()> {
2052        let room = owned_room_id!("!pony:example.org");
2053
2054        let server = MockServer::start().await;
2055        let client = logged_in_client(Some(server.uri())).await;
2056        client.event_cache().subscribe().unwrap();
2057
2058        let sliding_sync = client
2059            .sliding_sync("test")?
2060            .with_receipt_extension(
2061                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2062            )
2063            .add_list(
2064                SlidingSyncList::builder("all")
2065                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2066            )
2067            .build()
2068            .await?;
2069
2070        // Initial state.
2071        {
2072            let server_response = assign!(http::Response::new("0".to_owned()), {
2073                rooms: BTreeMap::from([(
2074                    room.clone(),
2075                    http::response::Room::default(),
2076                )])
2077            });
2078
2079            let _summary = {
2080                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2081                sliding_sync
2082                    .handle_response(
2083                        server_response.clone(),
2084                        &mut pos_guard,
2085                        RequestedRequiredStates::default(),
2086                    )
2087                    .await?
2088            };
2089        }
2090
2091        let server_response = assign!(http::Response::new("1".to_owned()), {
2092            extensions: assign!(http::response::Extensions::default(), {
2093                receipts: assign!(http::response::Receipts::default(), {
2094                    rooms: BTreeMap::from([
2095                        (
2096                            room.clone(),
2097                            Raw::from_json_string(
2098                                json!({
2099                                    "room_id": room,
2100                                    "type": "m.receipt",
2101                                    "content": {
2102                                        "$event:bar.org": {
2103                                            "m.read": {
2104                                                client.user_id().unwrap(): {
2105                                                    "ts": 1436451550,
2106                                                }
2107                                            }
2108                                        }
2109                                    }
2110                                })
2111                                .to_string(),
2112                            ).unwrap()
2113                        )
2114                    ])
2115                })
2116            })
2117        });
2118
2119        let summary = {
2120            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2121            sliding_sync
2122                .handle_response(
2123                    server_response.clone(),
2124                    &mut pos_guard,
2125                    RequestedRequiredStates::default(),
2126                )
2127                .await?
2128        };
2129
2130        assert!(summary.rooms.contains(&room));
2131
2132        Ok(())
2133    }
2134
2135    #[async_test]
2136    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2137        let room_id = owned_room_id!("!unicorn:example.org");
2138
2139        let server = MockServer::start().await;
2140        let client = logged_in_client(Some(server.uri())).await;
2141
2142        // Setup sliding sync with with one room and one list
2143
2144        let sliding_sync = client
2145            .sliding_sync("test")?
2146            .with_account_data_extension(
2147                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2148            )
2149            .add_list(
2150                SlidingSyncList::builder("all")
2151                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2152            )
2153            .build()
2154            .await?;
2155
2156        // Initial state.
2157        {
2158            let server_response = assign!(http::Response::new("0".to_owned()), {
2159                rooms: BTreeMap::from([(
2160                    room_id.clone(),
2161                    http::response::Room::default(),
2162                )])
2163            });
2164
2165            let _summary = {
2166                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2167                sliding_sync
2168                    .handle_response(
2169                        server_response.clone(),
2170                        &mut pos_guard,
2171                        RequestedRequiredStates::default(),
2172                    )
2173                    .await?
2174            };
2175        }
2176
2177        // Simulate a response that only changes the marked unread state of the room to
2178        // true
2179
2180        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2181
2182        let update_summary = {
2183            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2184            sliding_sync
2185                .handle_response(
2186                    server_response.clone(),
2187                    &mut pos_guard,
2188                    RequestedRequiredStates::default(),
2189                )
2190                .await?
2191        };
2192
2193        // Check that the list list and entry received the update
2194
2195        assert!(update_summary.rooms.contains(&room_id));
2196
2197        let room = client.get_room(&room_id).unwrap();
2198
2199        // Check the actual room data, this powers RoomInfo
2200
2201        assert!(room.is_marked_unread());
2202
2203        // Change it back to false and check if it updates
2204
2205        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2206
2207        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2208        sliding_sync
2209            .handle_response(
2210                server_response.clone(),
2211                &mut pos_guard,
2212                RequestedRequiredStates::default(),
2213            )
2214            .await?;
2215
2216        let room = client.get_room(&room_id).unwrap();
2217
2218        assert!(!room.is_marked_unread());
2219
2220        Ok(())
2221    }
2222
2223    fn make_mark_unread_response(
2224        response_number: &str,
2225        room_id: OwnedRoomId,
2226        unread: bool,
2227        add_rooms_section: bool,
2228    ) -> http::Response {
2229        let rooms = if add_rooms_section {
2230            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2231        } else {
2232            BTreeMap::new()
2233        };
2234
2235        let extensions = assign!(http::response::Extensions::default(), {
2236            account_data: assign!(http::response::AccountData::default(), {
2237                rooms: BTreeMap::from([
2238                    (
2239                        room_id,
2240                        vec![
2241                            Raw::from_json_string(
2242                                json!({
2243                                    "content": {
2244                                        "unread": unread
2245                                    },
2246                                    "type": "m.marked_unread"
2247                                })
2248                                .to_string(),
2249                            ).unwrap()
2250                        ]
2251                    )
2252                ])
2253            })
2254        });
2255
2256        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2257    }
2258
2259    #[async_test]
2260    async fn test_process_rooms_account_data() -> Result<()> {
2261        let room = owned_room_id!("!pony:example.org");
2262
2263        let server = MockServer::start().await;
2264        let client = logged_in_client(Some(server.uri())).await;
2265
2266        let sliding_sync = client
2267            .sliding_sync("test")?
2268            .with_account_data_extension(
2269                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2270            )
2271            .add_list(
2272                SlidingSyncList::builder("all")
2273                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2274            )
2275            .build()
2276            .await?;
2277
2278        // Initial state.
2279        {
2280            let server_response = assign!(http::Response::new("0".to_owned()), {
2281                rooms: BTreeMap::from([(
2282                    room.clone(),
2283                    http::response::Room::default(),
2284                )])
2285            });
2286
2287            let _summary = {
2288                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2289                sliding_sync
2290                    .handle_response(
2291                        server_response.clone(),
2292                        &mut pos_guard,
2293                        RequestedRequiredStates::default(),
2294                    )
2295                    .await?
2296            };
2297        }
2298
2299        let server_response = assign!(http::Response::new("1".to_owned()), {
2300            extensions: assign!(http::response::Extensions::default(), {
2301                account_data: assign!(http::response::AccountData::default(), {
2302                    rooms: BTreeMap::from([
2303                        (
2304                            room.clone(),
2305                            vec![
2306                                Raw::from_json_string(
2307                                    json!({
2308                                        "content": {
2309                                            "tags": {
2310                                                "u.work": {
2311                                                    "order": 0.9
2312                                                }
2313                                            }
2314                                        },
2315                                        "type": "m.tag"
2316                                    })
2317                                    .to_string(),
2318                                ).unwrap()
2319                            ]
2320                        )
2321                    ])
2322                })
2323            })
2324        });
2325        let summary = {
2326            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2327            sliding_sync
2328                .handle_response(
2329                    server_response.clone(),
2330                    &mut pos_guard,
2331                    RequestedRequiredStates::default(),
2332                )
2333                .await?
2334        };
2335
2336        assert!(summary.rooms.contains(&room));
2337
2338        Ok(())
2339    }
2340
2341    #[async_test]
2342    #[cfg(feature = "e2e-encryption")]
2343    async fn test_process_only_encryption_events() -> Result<()> {
2344        use ruma::OneTimeKeyAlgorithm;
2345
2346        let room = owned_room_id!("!croissant:example.org");
2347
2348        let server = MockServer::start().await;
2349        let client = logged_in_client(Some(server.uri())).await;
2350
2351        let server_response = assign!(http::Response::new("0".to_owned()), {
2352            rooms: BTreeMap::from([(
2353                room.clone(),
2354                assign!(http::response::Room::default(), {
2355                    name: Some("Croissants lovers".to_owned()),
2356                    timeline: Vec::new(),
2357                }),
2358            )]),
2359
2360            extensions: assign!(http::response::Extensions::default(), {
2361                e2ee: assign!(http::response::E2EE::default(), {
2362                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2363                }),
2364                to_device: Some(assign!(http::response::ToDevice::default(), {
2365                    next_batch: "to-device-token".to_owned(),
2366                })),
2367            })
2368        });
2369
2370        // Don't process non-encryption events if the sliding sync is configured for
2371        // encryption only.
2372
2373        let sliding_sync = client
2374            .sliding_sync("test")?
2375            .with_to_device_extension(
2376                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2377            )
2378            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2379            .build()
2380            .await?;
2381
2382        {
2383            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2384
2385            sliding_sync
2386                .handle_response(
2387                    server_response.clone(),
2388                    &mut position_guard,
2389                    RequestedRequiredStates::default(),
2390                )
2391                .await?;
2392        }
2393
2394        // E2EE has been properly handled.
2395        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2396        assert_eq!(uploaded_key_count, 42);
2397
2398        {
2399            let olm_machine = &*client.olm_machine_for_testing().await;
2400            assert_eq!(
2401                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2402                Some("to-device-token")
2403            );
2404        }
2405
2406        // Room events haven't.
2407        assert!(client.get_room(&room).is_none());
2408
2409        // Conversely, only process room lists events if the sliding sync was configured
2410        // as so.
2411        let client = logged_in_client(Some(server.uri())).await;
2412
2413        let sliding_sync = client
2414            .sliding_sync("test")?
2415            .add_list(SlidingSyncList::builder("thelist"))
2416            .build()
2417            .await?;
2418
2419        {
2420            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2421
2422            sliding_sync
2423                .handle_response(
2424                    server_response.clone(),
2425                    &mut position_guard,
2426                    RequestedRequiredStates::default(),
2427                )
2428                .await?;
2429        }
2430
2431        // E2EE response has been ignored.
2432        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2433        assert_eq!(uploaded_key_count, 0);
2434
2435        {
2436            let olm_machine = &*client.olm_machine_for_testing().await;
2437            assert_eq!(
2438                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2439                None
2440            );
2441        }
2442
2443        // The room is now known.
2444        assert!(client.get_room(&room).is_some());
2445
2446        // And it's also possible to set up both.
2447        let client = logged_in_client(Some(server.uri())).await;
2448
2449        let sliding_sync = client
2450            .sliding_sync("test")?
2451            .add_list(SlidingSyncList::builder("thelist"))
2452            .with_to_device_extension(
2453                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2454            )
2455            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2456            .build()
2457            .await?;
2458
2459        {
2460            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2461
2462            sliding_sync
2463                .handle_response(
2464                    server_response.clone(),
2465                    &mut position_guard,
2466                    RequestedRequiredStates::default(),
2467                )
2468                .await?;
2469        }
2470
2471        // E2EE has been properly handled.
2472        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2473        assert_eq!(uploaded_key_count, 42);
2474
2475        {
2476            let olm_machine = &*client.olm_machine_for_testing().await;
2477            assert_eq!(
2478                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2479                Some("to-device-token")
2480            );
2481        }
2482
2483        // The room is now known.
2484        assert!(client.get_room(&room).is_some());
2485
2486        Ok(())
2487    }
2488
2489    #[async_test]
2490    async fn test_lock_multiple_requests() -> Result<()> {
2491        let server = MockServer::start().await;
2492        let client = logged_in_client(Some(server.uri())).await;
2493
2494        let pos = Arc::new(Mutex::new(0));
2495        let _mock_guard = Mock::given(SlidingSyncMatcher)
2496            .respond_with(move |_: &Request| {
2497                let mut pos = pos.lock().unwrap();
2498                *pos += 1;
2499                ResponseTemplate::new(200).set_body_json(json!({
2500                    "pos": pos.to_string(),
2501                    "lists": {},
2502                    "rooms": {}
2503                }))
2504            })
2505            .mount_as_scoped(&server)
2506            .await;
2507
2508        let sliding_sync = client
2509            .sliding_sync("test")?
2510            .with_to_device_extension(
2511                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2512            )
2513            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2514            .build()
2515            .await?;
2516
2517        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2518        // test would never terminate.
2519        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2520
2521        for result in requests.await {
2522            result?;
2523        }
2524
2525        Ok(())
2526    }
2527
2528    #[async_test]
2529    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2530        let server = MockServer::start().await;
2531        let client = logged_in_client(Some(server.uri())).await;
2532
2533        let pos = Arc::new(Mutex::new(0));
2534        let _mock_guard = Mock::given(SlidingSyncMatcher)
2535            .respond_with(move |_: &Request| {
2536                let mut pos = pos.lock().unwrap();
2537                *pos += 1;
2538                // Respond slowly enough that we can skip one iteration.
2539                ResponseTemplate::new(200)
2540                    .set_body_json(json!({
2541                        "pos": pos.to_string(),
2542                        "lists": {},
2543                        "rooms": {}
2544                    }))
2545                    .set_delay(Duration::from_secs(2))
2546            })
2547            .mount_as_scoped(&server)
2548            .await;
2549
2550        let sliding_sync =
2551            client
2552                .sliding_sync("test")?
2553                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2554                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2555                ))
2556                .add_list(
2557                    SlidingSyncList::builder("another-list")
2558                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2559                )
2560                .build()
2561                .await?;
2562
2563        let stream = sliding_sync.sync();
2564        pin_mut!(stream);
2565
2566        let cloned_sync = sliding_sync.clone();
2567        spawn(async move {
2568            tokio::time::sleep(Duration::from_millis(100)).await;
2569
2570            cloned_sync
2571                .on_list("another-list", |list| {
2572                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2573                    ready(())
2574                })
2575                .await;
2576        });
2577
2578        assert_matches!(stream.next().await, Some(Ok(_)));
2579
2580        sliding_sync.stop_sync().unwrap();
2581
2582        assert_matches!(stream.next().await, None);
2583
2584        let mut num_requests = 0;
2585
2586        for request in server.received_requests().await.unwrap() {
2587            if !SlidingSyncMatcher.matches(&request) {
2588                continue;
2589            }
2590
2591            let another_list_ranges = if num_requests == 0 {
2592                // First request
2593                json!([[0, 10]])
2594            } else {
2595                // Second request
2596                json!([[10, 20]])
2597            };
2598
2599            num_requests += 1;
2600            assert!(num_requests <= 2, "more than one request hit the server");
2601
2602            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2603
2604            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2605                &json_value,
2606                &json!({
2607                    "conn_id": "test",
2608                    "lists": {
2609                        "room-list": {
2610                            "ranges": [[0, 9]],
2611                            "required_state": [
2612                                ["m.room.encryption", ""],
2613                                ["m.room.tombstone", ""]
2614                            ],
2615                        },
2616                        "another-list": {
2617                            "ranges": another_list_ranges,
2618                            "required_state": [
2619                                ["m.room.encryption", ""],
2620                                ["m.room.tombstone", ""]
2621                            ],
2622                        },
2623                    }
2624                }),
2625                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2626            ) {
2627                dbg!(json_value);
2628                panic!("json differ: {err}");
2629            }
2630        }
2631
2632        assert_eq!(num_requests, 2);
2633
2634        Ok(())
2635    }
2636
2637    #[async_test]
2638    async fn test_timeout_zero_list() -> Result<()> {
2639        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2640
2641        let (request, _, _) =
2642            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2643
2644        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2645        // on new update to pop.
2646        assert!(request.timeout.is_some());
2647
2648        Ok(())
2649    }
2650
2651    #[async_test]
2652    async fn test_timeout_one_list() -> Result<()> {
2653        let (_server, sliding_sync) = new_sliding_sync(vec![
2654            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2655        ])
2656        .await?;
2657
2658        let (request, _, _) =
2659            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2660
2661        // The list does not require a timeout.
2662        assert!(request.timeout.is_none());
2663
2664        // Simulate a response.
2665        {
2666            let server_response = assign!(http::Response::new("0".to_owned()), {
2667                lists: BTreeMap::from([(
2668                    "foo".to_owned(),
2669                    assign!(http::response::List::default(), {
2670                        count: uint!(7),
2671                    })
2672                 )])
2673            });
2674
2675            let _summary = {
2676                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2677                sliding_sync
2678                    .handle_response(
2679                        server_response.clone(),
2680                        &mut pos_guard,
2681                        RequestedRequiredStates::default(),
2682                    )
2683                    .await?
2684            };
2685        }
2686
2687        let (request, _, _) =
2688            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2689
2690        // The list is now fully loaded, so it requires a timeout.
2691        assert!(request.timeout.is_some());
2692
2693        Ok(())
2694    }
2695
2696    #[async_test]
2697    async fn test_timeout_three_lists() -> Result<()> {
2698        let (_server, sliding_sync) = new_sliding_sync(vec![
2699            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2700            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2701            SlidingSyncList::builder("baz")
2702                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2703        ])
2704        .await?;
2705
2706        let (request, _, _) =
2707            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2708
2709        // Two lists don't require a timeout.
2710        assert!(request.timeout.is_none());
2711
2712        // Simulate a response.
2713        {
2714            let server_response = assign!(http::Response::new("0".to_owned()), {
2715                lists: BTreeMap::from([(
2716                    "foo".to_owned(),
2717                    assign!(http::response::List::default(), {
2718                        count: uint!(7),
2719                    })
2720                 )])
2721            });
2722
2723            let _summary = {
2724                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2725                sliding_sync
2726                    .handle_response(
2727                        server_response.clone(),
2728                        &mut pos_guard,
2729                        RequestedRequiredStates::default(),
2730                    )
2731                    .await?
2732            };
2733        }
2734
2735        let (request, _, _) =
2736            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2737
2738        // One don't require a timeout.
2739        assert!(request.timeout.is_none());
2740
2741        // Simulate a response.
2742        {
2743            let server_response = assign!(http::Response::new("1".to_owned()), {
2744                lists: BTreeMap::from([(
2745                    "bar".to_owned(),
2746                    assign!(http::response::List::default(), {
2747                        count: uint!(7),
2748                    })
2749                 )])
2750            });
2751
2752            let _summary = {
2753                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2754                sliding_sync
2755                    .handle_response(
2756                        server_response.clone(),
2757                        &mut pos_guard,
2758                        RequestedRequiredStates::default(),
2759                    )
2760                    .await?
2761            };
2762        }
2763
2764        let (request, _, _) =
2765            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2766
2767        // All lists require a timeout.
2768        assert!(request.timeout.is_some());
2769
2770        Ok(())
2771    }
2772
2773    #[async_test]
2774    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2775        let server = MockServer::start().await;
2776        let client = logged_in_client(Some(server.uri())).await;
2777
2778        let _mock_guard = Mock::given(SlidingSyncMatcher)
2779            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2780                "pos": "0",
2781                "lists": {},
2782                "rooms": {}
2783            })))
2784            .mount_as_scoped(&server)
2785            .await;
2786
2787        let sliding_sync = client
2788            .sliding_sync("test")?
2789            .with_to_device_extension(
2790                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2791            )
2792            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2793            .build()
2794            .await?;
2795
2796        let sliding_sync = Arc::new(sliding_sync);
2797
2798        // Create the listener and perform a sync request
2799        let sync_beat_listener = client.inner.sync_beat.listen();
2800        sliding_sync.sync_once().await?;
2801
2802        // The sync beat listener should be notified shortly after
2803        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2804        Ok(())
2805    }
2806
2807    #[async_test]
2808    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2809        let server = MockServer::start().await;
2810        let client = logged_in_client(Some(server.uri())).await;
2811
2812        let _mock_guard = Mock::given(SlidingSyncMatcher)
2813            .respond_with(ResponseTemplate::new(404))
2814            .mount_as_scoped(&server)
2815            .await;
2816
2817        let sliding_sync = client
2818            .sliding_sync("test")?
2819            .with_to_device_extension(
2820                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2821            )
2822            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2823            .build()
2824            .await?;
2825
2826        let sliding_sync = Arc::new(sliding_sync);
2827
2828        // Create the listener and perform a sync request
2829        let sync_beat_listener = client.inner.sync_beat.listen();
2830        let sync_result = sliding_sync.sync_once().await;
2831        assert!(sync_result.is_err());
2832
2833        // The sync beat listener won't be notified in this case
2834        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2835
2836        Ok(())
2837    }
2838
2839    #[async_test]
2840    async fn test_state_store_lock_is_released_before_calling_handlers() -> Result<()> {
2841        let server = MatrixMockServer::new().await;
2842        let client = server.client_builder().build().await;
2843        let room_id = room_id!("!mu5hr00m:example.org");
2844
2845        let _sync_mock_guard = Mock::given(SlidingSyncMatcher)
2846            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2847                "pos": "0",
2848                "lists": {},
2849                "extensions": {
2850                    "account_data": {
2851                        "global": [
2852                            {
2853                                "type": "m.direct",
2854                                "content": {
2855                                    "@de4dlockh0lmes:example.org": [
2856                                        "!mu5hr00m:example.org"
2857                                    ]
2858                                }
2859                            }
2860                        ]
2861                    }
2862                },
2863                "rooms": {
2864                    room_id: {
2865                        "name": "Mario Bros Fanbase Room",
2866                        "initial": true,
2867                    },
2868                }
2869            })))
2870            .mount_as_scoped(server.server())
2871            .await;
2872
2873        let f = EventFactory::new().room(room_id);
2874
2875        Mock::given(method("GET"))
2876            .and(wiremock::matchers::path_regex(r"/_matrix/client/v3/rooms/.*/members"))
2877            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2878                "chunk": [
2879                    f.member(&ALICE).membership(MembershipState::Join).into_raw_timeline(),
2880                ]
2881            })))
2882            .mount(server.server())
2883            .await;
2884
2885        let (tx, rx) = tokio::sync::oneshot::channel();
2886
2887        let tx = Arc::new(Mutex::new(Some(tx)));
2888        client.add_event_handler(move |_: DirectEvent, client: Client| async move {
2889            // Try to run a /members query while in a event handler.
2890            let members =
2891                client.get_room(room_id).unwrap().members(RoomMemberships::JOIN).await.unwrap();
2892            assert_eq!(members.len(), 1);
2893            tx.lock().unwrap().take().expect("sender consumed multiple times").send(()).unwrap();
2894        });
2895
2896        let sliding_sync = client
2897            .sliding_sync("test")?
2898            .add_list(SlidingSyncList::builder("thelist"))
2899            .with_account_data_extension(
2900                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2901            )
2902            .build()
2903            .await?;
2904
2905        tokio::time::timeout(Duration::from_secs(5), sliding_sync.sync_once())
2906            .await
2907            .expect("Sync did not complete in time")
2908            .expect("Sync failed");
2909
2910        // Wait for the event handler to complete.
2911        tokio::time::timeout(Duration::from_secs(5), rx)
2912            .await
2913            .expect("Event handler did not complete in time")
2914            .expect("Event handler failed");
2915
2916        Ok(())
2917    }
2918}