matrix_sdk/sliding_sync/
mod.rs

1// Copyright 2022-2023 Benjamin Kampmann
2// Copyright 2022 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for that specific language governing permissions and
14// limitations under the License.
15
16#![doc = include_str!("README.md")]
17
18mod builder;
19mod cache;
20mod client;
21mod error;
22mod list;
23mod room;
24mod sticky_parameters;
25mod utils;
26
27use std::{
28    collections::{btree_map::Entry, BTreeMap},
29    fmt::Debug,
30    future::Future,
31    sync::{Arc, RwLock as StdRwLock},
32    time::Duration,
33};
34
35use async_stream::stream;
36pub use client::{Version, VersionBuilder};
37use futures_core::stream::Stream;
38use matrix_sdk_base::RequestedRequiredStates;
39use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
40use ruma::{
41    api::client::{error::ErrorKind, sync::sync_events::v5 as http},
42    assign, OwnedRoomId, RoomId,
43};
44use serde::{Deserialize, Serialize};
45use tokio::{
46    select,
47    sync::{broadcast::Sender, Mutex as AsyncMutex, OwnedMutexGuard, RwLock as AsyncRwLock},
48};
49use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
50
51#[cfg(feature = "e2e-encryption")]
52use self::utils::JoinHandleExt as _;
53pub use self::{builder::*, client::VersionBuilderError, error::*, list::*, room::*};
54use self::{
55    cache::restore_sliding_sync_state,
56    client::SlidingSyncResponseProcessor,
57    sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
58};
59use crate::{config::RequestConfig, Client, Result};
60
/// The Sliding Sync instance.
///
/// This is a thin handle over a reference-counted `SlidingSyncInner`: all
/// clones of a `SlidingSync` share the same underlying state.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
pub struct SlidingSync {
    /// The Sliding Sync data, shared between every clone of this handle.
    inner: Arc<SlidingSyncInner>,
}
69
/// The state shared by all the clones of a [`SlidingSync`] handle.
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
    /// A unique identifier for this instance of sliding sync.
    ///
    /// Used to distinguish different connections to sliding sync: it is sent
    /// as the `conn_id` of every request.
    id: String,

    /// The HTTP Matrix client.
    client: Client,

    /// Long-polling timeout that appears in sliding sync request.
    poll_timeout: Duration,

    /// Extra duration for the sliding sync request to timeout. This is added to
    /// the [`Self::poll_timeout`] to compute the timeout of the HTTP request
    /// itself.
    network_timeout: Duration,

    /// The storage key to keep this cache at and load it from.
    storage_key: String,

    /// Should this sliding sync instance try to restore its sync position
    /// from the database?
    ///
    /// Note: in non-cfg(e2e-encryption) builds, it's always set to false. We
    /// keep it even so, to avoid sprinkling cfg statements everywhere
    /// throughout this file.
    share_pos: bool,

    /// Position markers.
    ///
    /// The `pos` marker represents a progression when exchanging requests and
    /// responses with the server: the server acknowledges the request by
    /// responding with a new `pos`. If the client sends two non-necessarily
    /// consecutive requests with the same `pos`, the server has to reply with
    /// the same identical response.
    ///
    /// `position` is behind a mutex so that a new request starts after the
    /// previous request trip has fully ended (successfully or not). This
    /// mechanism exists to wait for the response to be handled and to see the
    /// `position` being updated, before sending a new request.
    position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,

    /// The lists of this Sliding Sync instance, indexed by their name.
    lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,

    /// All the rooms synced with Sliding Sync, indexed by their room ID.
    rooms: AsyncRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,

    /// Request parameters that are sticky.
    ///
    /// This is behind a synchronous (`std`) lock: guards on it must not be
    /// held across an `.await` point.
    sticky: StdRwLock<SlidingSyncStickyManager<SlidingSyncStickyParameters>>,

    /// Internal channel used to pass messages between Sliding Sync and other
    /// types.
    internal_channel: Sender<SlidingSyncInternalMessage>,
}
125
126impl SlidingSync {
127    pub(super) fn new(inner: SlidingSyncInner) -> Self {
128        Self { inner: Arc::new(inner) }
129    }
130
131    async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
132        cache::store_sliding_sync_state(self, position).await
133    }
134
135    /// Create a new [`SlidingSyncBuilder`].
136    pub fn builder(id: String, client: Client) -> Result<SlidingSyncBuilder, Error> {
137        SlidingSyncBuilder::new(id, client)
138    }
139
140    /// Subscribe to many rooms.
141    ///
142    /// If the associated `Room`s exist, it will be marked as
143    /// members are missing, so that it ensures to re-fetch all members.
144    ///
145    /// A subscription to an already subscribed room is ignored.
146    pub fn subscribe_to_rooms(
147        &self,
148        room_ids: &[&RoomId],
149        settings: Option<http::request::RoomSubscription>,
150        cancel_in_flight_request: bool,
151    ) {
152        let settings = settings.unwrap_or_default();
153        let mut sticky = self.inner.sticky.write().unwrap();
154        let room_subscriptions = &mut sticky.data_mut().room_subscriptions;
155
156        let mut skip_over_current_sync_loop_iteration = false;
157
158        for room_id in room_ids {
159            // If the room subscription already exists, let's not
160            // override it with a new one. First, it would reset its
161            // state (`RoomSubscriptionState`), and second it would try to
162            // re-subscribe with the next request. We don't want that. A room
163            // subscription should happen once, and next subscriptions should
164            // be ignored.
165            if let Entry::Vacant(entry) = room_subscriptions.entry((*room_id).to_owned()) {
166                if let Some(room) = self.inner.client.get_room(room_id) {
167                    room.mark_members_missing();
168                }
169
170                entry.insert((RoomSubscriptionState::default(), settings.clone()));
171
172                skip_over_current_sync_loop_iteration = true;
173            }
174        }
175
176        if cancel_in_flight_request && skip_over_current_sync_loop_iteration {
177            self.inner.internal_channel_send_if_possible(
178                SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
179            );
180        }
181    }
182
183    /// Lookup a specific room
184    pub async fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
185        self.inner.rooms.read().await.get(room_id).cloned()
186    }
187
188    /// Check the number of rooms.
189    pub fn get_number_of_rooms(&self) -> usize {
190        self.inner.rooms.blocking_read().len()
191    }
192
193    /// Find a list by its name, and do something on it if it exists.
194    pub async fn on_list<Function, FunctionOutput, R>(
195        &self,
196        list_name: &str,
197        function: Function,
198    ) -> Option<R>
199    where
200        Function: FnOnce(&SlidingSyncList) -> FunctionOutput,
201        FunctionOutput: Future<Output = R>,
202    {
203        let lists = self.inner.lists.read().await;
204
205        match lists.get(list_name) {
206            Some(list) => Some(function(list).await),
207            None => None,
208        }
209    }
210
211    /// Add the list to the list of lists.
212    ///
213    /// As lists need to have a unique `.name`, if a list with the same name
214    /// is found the new list will replace the old one and the return it or
215    /// `None`.
216    pub async fn add_list(
217        &self,
218        list_builder: SlidingSyncListBuilder,
219    ) -> Result<Option<SlidingSyncList>> {
220        let list = list_builder.build(self.inner.internal_channel.clone());
221
222        let old_list = self.inner.lists.write().await.insert(list.name().to_owned(), list);
223
224        self.inner.internal_channel_send_if_possible(
225            SlidingSyncInternalMessage::SyncLoopSkipOverCurrentIteration,
226        );
227
228        Ok(old_list)
229    }
230
231    /// Add a list that will be cached and reloaded from the cache.
232    ///
233    /// This will raise an error if a storage key was not set, or if there
234    /// was a I/O error reading from the cache.
235    ///
236    /// The rest of the semantics is the same as [`Self::add_list`].
237    pub async fn add_cached_list(
238        &self,
239        mut list_builder: SlidingSyncListBuilder,
240    ) -> Result<Option<SlidingSyncList>> {
241        let _timer = timer!(format!("restoring (loading+processing) list {}", list_builder.name));
242
243        list_builder.set_cached_and_reload(&self.inner.client, &self.inner.storage_key).await?;
244
245        self.add_list(list_builder).await
246    }
247
248    /// Lookup a set of rooms
249    pub async fn get_rooms<I: Iterator<Item = OwnedRoomId>>(
250        &self,
251        room_ids: I,
252    ) -> Vec<Option<SlidingSyncRoom>> {
253        let rooms = self.inner.rooms.read().await;
254
255        room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
256    }
257
258    /// Get all rooms.
259    pub async fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
260        self.inner.rooms.read().await.values().cloned().collect()
261    }
262
263    /// Handle the HTTP response.
264    #[instrument(skip_all)]
265    async fn handle_response(
266        &self,
267        sliding_sync_response: http::Response,
268        position: &mut SlidingSyncPositionMarkers,
269        requested_required_states: RequestedRequiredStates,
270    ) -> Result<UpdateSummary, crate::Error> {
271        let pos = Some(sliding_sync_response.pos.clone());
272
273        let must_process_rooms_response = self.must_process_rooms_response().await;
274
275        trace!(yes = must_process_rooms_response, "Must process rooms response?");
276
277        // Transform a Sliding Sync Response to a `SyncResponse`.
278        //
279        // We may not need the `sync_response` in the future (once `SyncResponse` will
280        // move to Sliding Sync, i.e. to `http::Response`), but processing the
281        // `sliding_sync_response` is vital, so it must be done somewhere; for now it
282        // happens here.
283
284        let mut sync_response = {
285            // Take the lock to avoid concurrent sliding syncs overwriting each other's room
286            // infos.
287            let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
288
289            let rooms = &*self.inner.rooms.read().await;
290            let mut response_processor =
291                SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
292
293            #[cfg(feature = "e2e-encryption")]
294            if self.is_e2ee_enabled() {
295                response_processor.handle_encryption(&sliding_sync_response.extensions).await?
296            }
297
298            // Only handle the room's subsection of the response, if this sliding sync was
299            // configured to do so.
300            if must_process_rooms_response {
301                response_processor
302                    .handle_room_response(&sliding_sync_response, &requested_required_states)
303                    .await?;
304            }
305
306            response_processor.process_and_take_response().await?
307        };
308
309        debug!(?sync_response, "Sliding Sync response has been handled by the client");
310
311        // Commit sticky parameters, if needed.
312        if let Some(ref txn_id) = sliding_sync_response.txn_id {
313            let txn_id = txn_id.as_str().into();
314            self.inner.sticky.write().unwrap().maybe_commit(txn_id);
315            let mut lists = self.inner.lists.write().await;
316            lists.values_mut().for_each(|list| list.maybe_commit_sticky(txn_id));
317        }
318
319        let update_summary = {
320            // Update the rooms.
321            let updated_rooms = {
322                let mut rooms_map = self.inner.rooms.write().await;
323
324                let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
325
326                for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
327                    // `sync_response` contains the rooms with decrypted events if any, so look at
328                    // the timeline events here first if the room exists.
329                    // Otherwise, let's look at the timeline inside the `sliding_sync_response`.
330                    let timeline =
331                        if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
332                            joined_room.timeline.events
333                        } else {
334                            room_data.timeline.drain(..).map(TimelineEvent::new).collect()
335                        };
336
337                    match rooms_map.get_mut(&room_id) {
338                        // The room existed before, let's update it.
339                        Some(room) => {
340                            room.update(room_data, timeline);
341                        }
342
343                        // First time we need this room, let's create it.
344                        None => {
345                            rooms_map.insert(
346                                room_id.clone(),
347                                SlidingSyncRoom::new(
348                                    room_id.clone(),
349                                    room_data.prev_batch,
350                                    timeline,
351                                ),
352                            );
353                        }
354                    }
355
356                    updated_rooms.push(room_id);
357                }
358
359                // There might be other rooms that were only mentioned in the sliding sync
360                // extensions part of the response, and thus would result in rooms present in
361                // the `sync_response.join`. Mark them as updated too.
362                //
363                // Since we've removed rooms that were in the room subsection from
364                // `sync_response.rooms.join`, the remaining ones aren't already present in
365                // `updated_rooms` and wouldn't cause any duplicates.
366                updated_rooms.extend(sync_response.rooms.join.keys().cloned());
367
368                updated_rooms
369            };
370
371            // Update the lists.
372            let updated_lists = {
373                debug!(
374                    lists = ?sliding_sync_response.lists,
375                    "Update lists"
376                );
377
378                let mut updated_lists = Vec::with_capacity(sliding_sync_response.lists.len());
379                let mut lists = self.inner.lists.write().await;
380
381                // Iterate on known lists, not on lists in the response. Rooms may have been
382                // updated that were not involved in any list update.
383                for (name, list) in lists.iter_mut() {
384                    if let Some(updates) = sliding_sync_response.lists.get(name) {
385                        let maximum_number_of_rooms: u32 =
386                            updates.count.try_into().expect("failed to convert `count` to `u32`");
387
388                        if list.update(Some(maximum_number_of_rooms))? {
389                            updated_lists.push(name.clone());
390                        }
391                    } else if list.update(None)? {
392                        updated_lists.push(name.clone());
393                    }
394                }
395
396                // Report about unknown lists.
397                for name in sliding_sync_response.lists.keys() {
398                    if !lists.contains_key(name) {
399                        error!("Response for list `{name}` - unknown to us; skipping");
400                    }
401                }
402
403                updated_lists
404            };
405
406            UpdateSummary { lists: updated_lists, rooms: updated_rooms }
407        };
408
409        // Everything went well, we can update the position markers.
410        //
411        // Save the new position markers.
412        position.pos = pos;
413
414        Ok(update_summary)
415    }
416
417    async fn generate_sync_request(
418        &self,
419        txn_id: &mut LazyTransactionId,
420    ) -> Result<(http::Request, RequestConfig, OwnedMutexGuard<SlidingSyncPositionMarkers>)> {
421        // Collect requests for lists.
422        let mut requests_lists = BTreeMap::new();
423
424        let require_timeout = {
425            let lists = self.inner.lists.read().await;
426
427            // Start at `true` in case there is zero list.
428            let mut require_timeout = true;
429
430            for (name, list) in lists.iter() {
431                requests_lists.insert(name.clone(), list.next_request(txn_id)?);
432                require_timeout = require_timeout && list.requires_timeout();
433            }
434
435            require_timeout
436        };
437
438        // Collect the `pos`.
439        //
440        // Wait on the `position` mutex to be available. It means no request nor
441        // response is running. The `position` mutex is released whether the response
442        // has been fully handled successfully, in this case the `pos` is updated, or
443        // the response handling has failed, in this case the `pos` hasn't been updated
444        // and the same `pos` will be used for this new request.
445        let mut position_guard = self.inner.position.clone().lock_owned().await;
446
447        let to_device_enabled =
448            self.inner.sticky.read().unwrap().data().extensions.to_device.enabled == Some(true);
449
450        let restored_fields = if self.inner.share_pos || to_device_enabled {
451            let lists = self.inner.lists.read().await;
452            restore_sliding_sync_state(&self.inner.client, &self.inner.storage_key, &lists).await?
453        } else {
454            None
455        };
456
457        // Update pos: either the one restored from the database, if any and the sliding
458        // sync was configured so, or read it from the memory cache.
459        let pos = if self.inner.share_pos {
460            if let Some(fields) = &restored_fields {
461                // Override the memory one with the database one, for consistency.
462                if fields.pos != position_guard.pos {
463                    info!(
464                        "Pos from previous request ('{:?}') was different from \
465                         pos in database ('{:?}').",
466                        position_guard.pos, fields.pos
467                    );
468                    position_guard.pos = fields.pos.clone();
469                }
470                fields.pos.clone()
471            } else {
472                position_guard.pos.clone()
473            }
474        } else {
475            position_guard.pos.clone()
476        };
477
478        Span::current().record("pos", &pos);
479
480        // When the client sends a request with no `pos`, MSC4186 returns no device
481        // lists updates, as it only returns changes since the provided `pos`
482        // (which is `null` in this case); this is in line with sync v2.
483        //
484        // Therefore, with MSC4186, the device list cache must be marked as to be
485        // re-downloaded if the `since` token is `None`, otherwise it's easy to miss
486        // device lists updates that happened between the previous request and the new
487        // “initial” request.
488        #[cfg(feature = "e2e-encryption")]
489        if pos.is_none() && self.is_e2ee_enabled() {
490            info!("Marking all tracked users as dirty");
491
492            let olm_machine = self.inner.client.olm_machine().await;
493            let olm_machine = olm_machine.as_ref().ok_or(Error::NoOlmMachine)?;
494            olm_machine.mark_all_tracked_users_as_dirty().await?;
495        }
496
497        // Configure the timeout.
498        //
499        // The `timeout` query is necessary when all lists require it. Please see
500        // [`SlidingSyncList::requires_timeout`].
501        let timeout = require_timeout.then(|| self.inner.poll_timeout);
502
503        let mut request = assign!(http::Request::new(), {
504            conn_id: Some(self.inner.id.clone()),
505            pos,
506            timeout,
507            lists: requests_lists,
508        });
509
510        // Apply sticky parameters, if needs be.
511        self.inner.sticky.write().unwrap().maybe_apply(&mut request, txn_id);
512
513        // Extensions are now applied (via sticky parameters).
514        //
515        // Override the to-device token if the extension is enabled.
516        if to_device_enabled {
517            request.extensions.to_device.since =
518                restored_fields.and_then(|fields| fields.to_device_token);
519        }
520
521        // Apply the transaction id if one was generated.
522        if let Some(txn_id) = txn_id.get() {
523            request.txn_id = Some(txn_id.to_string());
524        }
525
526        Ok((
527            // The request itself.
528            request,
529            // Configure long-polling. We need some time for the long-poll itself,
530            // and extra time for the network delays.
531            RequestConfig::default()
532                .timeout(self.inner.poll_timeout + self.inner.network_timeout)
533                .retry_limit(3),
534            position_guard,
535        ))
536    }
537
538    /// Send a sliding sync request.
539    ///
540    /// This method contains the sending logic.
541    async fn send_sync_request(
542        &self,
543        request: http::Request,
544        request_config: RequestConfig,
545        mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
546    ) -> Result<UpdateSummary> {
547        debug!("Sending request");
548
549        // Prepare the request.
550        let requested_required_states = RequestedRequiredStates::from(&request);
551        let request = self.inner.client.send(request).with_request_config(request_config);
552
553        // Send the request and get a response with end-to-end encryption support.
554        //
555        // Sending the `/sync` request out when end-to-end encryption is enabled means
556        // that we need to also send out any outgoing e2ee related request out
557        // coming from the `OlmMachine::outgoing_requests()` method.
558
559        #[cfg(feature = "e2e-encryption")]
560        let response = {
561            if self.is_e2ee_enabled() {
562                // Here, we need to run 2 things:
563                //
564                // 1. Send the sliding sync request and get a response,
565                // 2. Send the E2EE requests.
566                //
567                // We don't want to use a `join` or `try_join` because we want to fail if and
568                // only if sending the sliding sync request fails. Failing to send the E2EE
569                // requests should just result in a log.
570                //
571                // We also want to give the priority to sliding sync request. E2EE requests are
572                // sent concurrently to the sliding sync request, but the priority is on waiting
573                // a sliding sync response.
574                //
575                // If sending sliding sync request fails, the sending of E2EE requests must be
576                // aborted as soon as possible.
577
578                let client = self.inner.client.clone();
579                let e2ee_uploads = spawn(async move {
580                    if let Err(error) = client.send_outgoing_requests().await {
581                        error!(?error, "Error while sending outgoing E2EE requests");
582                    }
583                })
584                // Ensure that the task is not running in detached mode. It is aborted when it's
585                // dropped.
586                .abort_on_drop();
587
588                // Wait on the sliding sync request success or failure early.
589                let response = request.await?;
590
591                // At this point, if `request` has been resolved successfully, we wait on
592                // `e2ee_uploads`. It did run concurrently, so it should not be blocking for too
593                // long. Otherwise —if `request` has failed— `e2ee_uploads` has
594                // been dropped, so aborted.
595                e2ee_uploads.await.map_err(|error| Error::JoinError {
596                    task_description: "e2ee_uploads".to_owned(),
597                    error,
598                })?;
599
600                response
601            } else {
602                request.await?
603            }
604        };
605
606        // Send the request and get a response _without_ end-to-end encryption support.
607        #[cfg(not(feature = "e2e-encryption"))]
608        let response = request.await?;
609
610        debug!("Received response");
611
612        // At this point, the request has been sent, and a response has been received.
613        //
614        // We must ensure the handling of the response cannot be stopped/
615        // cancelled. It must be done entirely, otherwise we can have
616        // corrupted/incomplete states for Sliding Sync and other parts of
617        // the code.
618        //
619        // That's why we are running the handling of the response in a spawned
620        // future that cannot be cancelled by anything.
621        let this = self.clone();
622
623        // Spawn a new future to ensure that the code inside this future cannot be
624        // cancelled if this method is cancelled.
625        let future = async move {
626            debug!("Start handling response");
627
628            // In case the task running this future is detached, we must
629            // ensure responses are handled one at a time. At this point we still own
630            // `position_guard`, so we're fine.
631
632            // Handle the response.
633            let updates = this
634                .handle_response(response, &mut position_guard, requested_required_states)
635                .await?;
636
637            this.cache_to_storage(&position_guard).await?;
638
639            // Release the position guard lock.
640            // It means that other responses can be generated and then handled later.
641            drop(position_guard);
642
643            debug!("Done handling response");
644
645            Ok(updates)
646        };
647
648        spawn(future.instrument(Span::current())).await.unwrap()
649    }
650
651    /// Is the e2ee extension enabled for this sliding sync instance?
652    #[cfg(feature = "e2e-encryption")]
653    fn is_e2ee_enabled(&self) -> bool {
654        self.inner.sticky.read().unwrap().data().extensions.e2ee.enabled == Some(true)
655    }
656
657    #[cfg(not(feature = "e2e-encryption"))]
658    fn is_e2ee_enabled(&self) -> bool {
659        false
660    }
661
662    /// Should we process the room's subpart of a response?
663    async fn must_process_rooms_response(&self) -> bool {
664        // We consider that we must, if there's any room subscription or there's any
665        // list.
666        !self.inner.sticky.read().unwrap().data().room_subscriptions.is_empty()
667            || !self.inner.lists.read().await.is_empty()
668    }
669
670    #[instrument(skip_all, fields(pos, conn_id = self.inner.id))]
671    async fn sync_once(&self) -> Result<UpdateSummary> {
672        let (request, request_config, position_guard) =
673            self.generate_sync_request(&mut LazyTransactionId::new()).await?;
674
675        // Send the request, kaboom.
676        let summaries = self.send_sync_request(request, request_config, position_guard).await?;
677
678        // Notify a new sync was received
679        self.inner.client.inner.sync_beat.notify(usize::MAX);
680
681        Ok(summaries)
682    }
683
    /// Create a _new_ Sliding Sync sync loop.
    ///
    /// This method returns a `Stream`, which will send requests and will handle
    /// responses automatically. Lists and rooms are updated automatically.
    ///
    /// The stream yields `Ok(…)` with an [`UpdateSummary`] after each
    /// successful sync, otherwise it yields `Err(…)`. An `Err` will _always_
    /// lead to the `Stream` termination.
    ///
    /// The stream also terminates, without yielding anything, when a stop
    /// message is received on the internal channel (see [`Self::stop_sync`])
    /// or when receiving from the internal channel fails.
    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
    #[instrument(name = "sync_stream", skip_all, fields(conn_id = self.inner.id, with_e2ee = self.is_e2ee_enabled()))]
    pub fn sync(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
        debug!("Starting sync stream");

        // Subscribe before the stream is first polled, i.e. as soon as `sync` is
        // called.
        let mut internal_channel_receiver = self.inner.internal_channel.subscribe();

        stream! {
            loop {
                debug!("Sync stream is running");

                select! {
                    // Poll the branches in declaration order: internal messages are
                    // checked before the sync request.
                    biased;

                    internal_message = internal_channel_receiver.recv() => {
                        use SlidingSyncInternalMessage::*;

                        debug!(?internal_message, "Sync stream has received an internal message");

                        match internal_message {
                            // Either the channel cannot be read from anymore, or we have
                            // been asked to stop: terminate the stream.
                            Err(_) | Ok(SyncLoopStop) => {
                                break;
                            }

                            // Abandon the in-flight `sync_once` of this iteration, and
                            // start a new iteration.
                            Ok(SyncLoopSkipOverCurrentIteration) => {
                                continue;
                            }
                        }
                    }

                    update_summary = self.sync_once() => {
                        match update_summary {
                            Ok(updates) => {
                                yield Ok(updates);
                            }

                            // Here, errors we **cannot** ignore, and that must stop the sync loop.
                            Err(error) => {
                                if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
                                    // The Sliding Sync session has expired. Let's reset `pos` and sticky parameters.
                                    self.expire_session().await;
                                }

                                yield Err(error);

                                // Terminates the loop, and terminates the stream.
                                break;
                            }
                        }
                    }
                }
            }

            debug!("Sync stream has exited.");
        }
    }
748
749    /// Force to stop the sync loop ([`Self::sync`]) if it's running.
750    ///
751    /// Usually, dropping the `Stream` returned by [`Self::sync`] should be
752    /// enough to “stop” it, but depending of how this `Stream` is used, it
753    /// might not be obvious to drop it immediately (thinking of using this API
754    /// over FFI; the foreign-language might not be able to drop a value
755    /// immediately). Thus, calling this method will ensure that the sync loop
756    /// stops gracefully and as soon as it returns.
757    pub fn stop_sync(&self) -> Result<()> {
758        Ok(self.inner.internal_channel_send(SlidingSyncInternalMessage::SyncLoopStop)?)
759    }
760
761    /// Expire the current Sliding Sync session on the client-side.
762    ///
763    /// Expiring a Sliding Sync session means: resetting `pos`. It also resets
764    /// sticky parameters.
765    ///
766    /// This should only be used when it's clear that this session was about to
767    /// expire anyways, and should be used only in very specific cases (e.g.
768    /// multiple sliding syncs being run in parallel, and one of them has
769    /// expired).
770    ///
771    /// This method **MUST** be called when the sync loop is stopped.
772    #[doc(hidden)]
773    pub async fn expire_session(&self) {
774        info!("Session expired; resetting `pos` and sticky parameters");
775
776        {
777            let mut position = self.inner.position.lock().await;
778            position.pos = None;
779
780            if let Err(err) = self.cache_to_storage(&position).await {
781                error!(
782                    "couldn't invalidate sliding sync frozen state when expiring session: {err}"
783                );
784            }
785        }
786
787        {
788            let mut sticky = self.inner.sticky.write().unwrap();
789
790            // Clear all room subscriptions: we don't want to resend all room subscriptions
791            // when the session will restart.
792            sticky.data_mut().room_subscriptions.clear();
793        }
794
795        self.inner.lists.read().await.values().for_each(|list| list.invalidate_sticky_data());
796    }
797}
798
799impl SlidingSyncInner {
800    /// Send a message over the internal channel.
801    #[instrument]
802    fn internal_channel_send(&self, message: SlidingSyncInternalMessage) -> Result<(), Error> {
803        self.internal_channel.send(message).map(|_| ()).map_err(|_| Error::InternalChannelIsBroken)
804    }
805
806    /// Send a message over the internal channel if there is a receiver, i.e. if
807    /// the sync loop is running.
808    #[instrument]
809    fn internal_channel_send_if_possible(&self, message: SlidingSyncInternalMessage) {
810        // If there is no receiver, the send will fail, but that's OK here.
811        let _ = self.internal_channel.send(message);
812    }
813}
814
/// Messages that can be sent over the internal channel of
/// `SlidingSyncInner` to control the sync loop.
#[derive(Copy, Clone, Debug, PartialEq)]
enum SlidingSyncInternalMessage {
    /// Instruct the sync loop to stop.
    SyncLoopStop,

    /// Instruct the sync loop to skip over any remaining work in its iteration,
    /// and to jump to the next iteration.
    SyncLoopSkipOverCurrentIteration,
}
824
#[cfg(any(test, feature = "testing"))]
impl SlidingSync {
    /// Overwrite the current `pos` with `new_pos`.
    pub async fn set_pos(&self, new_pos: String) {
        self.inner.position.lock().await.pos = Some(new_pos);
    }

    /// Get a copy of the static extension configuration of this Sliding Sync.
    ///
    /// Note: this is not what the sticky parameters will send next, but the
    /// static configuration that was provided when this Sliding Sync was
    /// created.
    pub fn extensions_config(&self) -> http::request::Extensions {
        self.inner.sticky.read().unwrap().data().extensions.clone()
    }
}
843
/// The position markers of a Sliding Sync, i.e. where it currently is in the
/// sync stream.
#[derive(Clone, Debug)]
pub(super) struct SlidingSyncPositionMarkers {
    /// An ephemeral position in the current stream, as received from the
    /// previous `/sync` response, or `None` for the first request.
    ///
    /// Should not be persisted.
    pos: Option<String>,
}
852
/// Frozen bits of a Sliding Sync that are stored in the *state* store.
#[derive(Debug, Serialize, Deserialize)]
struct FrozenSlidingSync {
    /// Deprecated: prefer storing in the crypto store.
    ///
    /// Not serialized when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    to_device_since: Option<String>,
    /// The frozen rooms.
    ///
    /// Not serialized when empty; defaults to an empty list when the field is
    /// absent from the deserialized data.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    rooms: Vec<FrozenSlidingSyncRoom>,
}
862
863impl FrozenSlidingSync {
864    fn new(rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
865        // The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
866        Self {
867            to_device_since: None,
868            rooms: rooms
869                .iter()
870                .map(|(_room_id, sliding_sync_room)| FrozenSlidingSyncRoom::from(sliding_sync_room))
871                .collect::<Vec<_>>(),
872        }
873    }
874}
875
/// Serializable form of the `pos` marker of a Sliding Sync.
// NOTE(review): presumably the persisted counterpart of
// `SlidingSyncPositionMarkers` — confirm against the cache module.
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSyncPos {
    /// The position; not serialized when it is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pos: Option<String>,
}
881
/// A summary of the updates received after a sync (like in
/// [`SlidingSync::sync`]).
#[derive(Debug, Clone)]
pub struct UpdateSummary {
    /// The names of the lists that have seen an update.
    pub lists: Vec<String>,
    /// The IDs of the rooms that have seen an update.
    pub rooms: Vec<OwnedRoomId>,
}
891
/// A very basic bool-ish enum to represent the state of a
/// [`http::request::RoomSubscription`]. A `RoomSubscription` that has been sent
/// once should ideally not be sent again, mostly to save bandwidth.
#[derive(Debug, Default)]
enum RoomSubscriptionState {
    /// The `RoomSubscription` has not been sent or received correctly from the
    /// server, i.e. the `RoomSubscription` —which is part of the sticky
    /// parameters— has not been committed.
    ///
    /// This is the default state.
    #[default]
    Pending,

    /// The `RoomSubscription` has been sent and received correctly by the
    /// server.
    Applied,
}
907
/// The set of sticky parameters owned by the `SlidingSyncInner` instance, and
/// sent in the request.
#[derive(Debug)]
pub(super) struct SlidingSyncStickyParameters {
    /// Room subscriptions, i.e. rooms that may be out-of-scope of all lists
    /// but one wants to receive updates.
    ///
    /// Each subscription is paired with its [`RoomSubscriptionState`], which
    /// tells whether it still has to be sent (`Pending`) or not (`Applied`).
    room_subscriptions:
        BTreeMap<OwnedRoomId, (RoomSubscriptionState, http::request::RoomSubscription)>,

    /// The intended state of the extensions being supplied to sliding /sync
    /// calls.
    extensions: http::request::Extensions,
}
921
922impl SlidingSyncStickyParameters {
923    /// Create a new set of sticky parameters.
924    pub fn new(
925        room_subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
926        extensions: http::request::Extensions,
927    ) -> Self {
928        Self {
929            room_subscriptions: room_subscriptions
930                .into_iter()
931                .map(|(room_id, room_subscription)| {
932                    (room_id, (RoomSubscriptionState::Pending, room_subscription))
933                })
934                .collect(),
935            extensions,
936        }
937    }
938}
939
940impl StickyData for SlidingSyncStickyParameters {
941    type Request = http::Request;
942
943    fn apply(&self, request: &mut Self::Request) {
944        request.room_subscriptions = self
945            .room_subscriptions
946            .iter()
947            .filter(|(_, (state, _))| matches!(state, RoomSubscriptionState::Pending))
948            .map(|(room_id, (_, room_subscription))| (room_id.clone(), room_subscription.clone()))
949            .collect();
950        request.extensions = self.extensions.clone();
951    }
952
953    fn on_commit(&mut self) {
954        // All room subscriptions are marked as `Applied`.
955        for (state, _room_subscription) in self.room_subscriptions.values_mut() {
956            if matches!(state, RoomSubscriptionState::Pending) {
957                *state = RoomSubscriptionState::Applied;
958            }
959        }
960    }
961}
962
963#[cfg(all(test, not(target_family = "wasm")))]
964#[allow(clippy::dbg_macro)]
965mod tests {
966    use std::{
967        collections::BTreeMap,
968        future::ready,
969        ops::Not,
970        sync::{Arc, Mutex},
971        time::Duration,
972    };
973
974    use assert_matches::assert_matches;
975    use event_listener::Listener;
976    use futures_util::{future::join_all, pin_mut, StreamExt};
977    use matrix_sdk_base::RequestedRequiredStates;
978    use matrix_sdk_test::async_test;
979    use ruma::{
980        api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
981        OwnedRoomId, TransactionId,
982    };
983    use serde::Deserialize;
984    use serde_json::json;
985    use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
986
987    use super::{
988        http,
989        sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
990        FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
991        SlidingSyncStickyParameters,
992    };
993    use crate::{
994        sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
995    };
996
    /// A `wiremock` matcher for sliding sync requests; the matching logic
    /// lives in its `Match` implementation.
    #[derive(Copy, Clone)]
    struct SlidingSyncMatcher;
999
1000    impl Match for SlidingSyncMatcher {
1001        fn matches(&self, request: &Request) -> bool {
1002            request.url.path() == "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
1003                && request.method == Method::POST
1004        }
1005    }
1006
1007    async fn new_sliding_sync(
1008        lists: Vec<SlidingSyncListBuilder>,
1009    ) -> Result<(MockServer, SlidingSync)> {
1010        let server = MockServer::start().await;
1011        let client = logged_in_client(Some(server.uri())).await;
1012
1013        let mut sliding_sync_builder = client.sliding_sync("test-slidingsync")?;
1014
1015        for list in lists {
1016            sliding_sync_builder = sliding_sync_builder.add_list(list);
1017        }
1018
1019        let sliding_sync = sliding_sync_builder.build().await?;
1020
1021        Ok((server, sliding_sync))
1022    }
1023
    // Checks that `subscribe_to_rooms` registers the rooms in the sticky
    // parameters, and how it affects the “members are synced” flag of a room.
    #[async_test]
    async fn test_subscribe_to_rooms() -> Result<()> {
        let (server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
        .await?;

        let stream = sliding_sync.sync();
        pin_mut!(stream);

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Run a first sync whose response contains the three rooms.
        {
            let _mock_guard = Mock::given(SlidingSyncMatcher)
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "pos": "1",
                    "lists": {},
                    "rooms": {
                        room_id_0: {
                            "name": "Room #0",
                            "initial": true,
                        },
                        room_id_1: {
                            "name": "Room #1",
                            "initial": true,
                        },
                        room_id_2: {
                            "name": "Room #2",
                            "initial": true,
                        },
                    }
                })))
                .mount_as_scoped(&server)
                .await;

            let _ = stream.next().await.unwrap()?;
        }

        let room0 = sliding_sync.inner.client.get_room(room_id_0).unwrap();

        // Members aren't synced yet.
        // We need to make them synced first, so that we can test that subscribing
        // to a room marks its members as not synced. That's a desired feature.
        assert!(room0.are_members_synced().not());

        // Request the members of `room0` against a mocked, empty `/members` response.
        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are now synced! We can start subscribing and see how it goes.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, true);

        // OK, we have subscribed to some rooms. Let's check on `room0` if members are
        // now marked as not synced.
        assert!(room0.are_members_synced().not());

        // Only the rooms we subscribed to are part of the sticky parameters.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(!room_subscriptions.contains_key(room_id_2));
        }

        // Subscribing again to the same room doesn't reset the member sync state.
        // First, make the members synced again.

        {
            struct MemberMatcher(OwnedRoomId);

            impl Match for MemberMatcher {
                fn matches(&self, request: &Request) -> bool {
                    request.url.path()
                        == format!("/_matrix/client/r0/rooms/{room_id}/members", room_id = self.0)
                        && request.method == Method::GET
                }
            }

            let _mock_guard = Mock::given(MemberMatcher(room_id_0.to_owned()))
                .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                    "chunk": [],
                })))
                .mount_as_scoped(&server)
                .await;

            assert_matches!(room0.request_members().await, Ok(()));
        }

        // Members are synced, good, good.
        assert!(room0.are_members_synced());

        sliding_sync.subscribe_to_rooms(&[room_id_0], None, false);

        // Members are still synced: because we have already subscribed to the
        // room, the members aren't marked as unsynced.
        assert!(room0.are_members_synced());

        Ok(())
    }
1143
    // Checks that `expire_session` clears all room subscriptions, and that new
    // subscriptions can be registered afterwards.
    #[async_test]
    async fn test_room_subscriptions_are_reset_when_session_expires() -> Result<()> {
        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
        .await?;

        let room_id_0 = room_id!("!r0:bar.org");
        let room_id_1 = room_id!("!r1:bar.org");
        let room_id_2 = room_id!("!r2:bar.org");

        // Subscribe to two rooms.
        sliding_sync.subscribe_to_rooms(&[room_id_0, room_id_1], None, false);

        // Only these two rooms are registered.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2).not());
        }

        // Subscribe to one more room.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // The three rooms are now registered.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0));
            assert!(room_subscriptions.contains_key(room_id_1));
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        // Suddenly, the session expires!
        sliding_sync.expire_session().await;

        // All room subscriptions are gone.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.is_empty());
        }

        // Subscribe to one room again.
        sliding_sync.subscribe_to_rooms(&[room_id_2], None, false);

        // Only the room subscribed after the expiration is registered.
        {
            let sticky = sliding_sync.inner.sticky.read().unwrap();
            let room_subscriptions = &sticky.data().room_subscriptions;

            assert!(room_subscriptions.contains_key(room_id_0).not());
            assert!(room_subscriptions.contains_key(room_id_1).not());
            assert!(room_subscriptions.contains_key(room_id_2));
        }

        Ok(())
    }
1202
1203    #[async_test]
1204    async fn test_to_device_token_properly_cached() -> Result<()> {
1205        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1206            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1207        .await?;
1208
1209        // FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
1210        // in the crypto store since PR #2323.
1211        let frozen = FrozenSlidingSync::new(&*sliding_sync.inner.rooms.read().await);
1212        assert!(frozen.to_device_since.is_none());
1213
1214        Ok(())
1215    }
1216
1217    #[async_test]
1218    async fn test_add_list() -> Result<()> {
1219        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
1220            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
1221        .await?;
1222
1223        let _stream = sliding_sync.sync();
1224        pin_mut!(_stream);
1225
1226        sliding_sync
1227            .add_list(
1228                SlidingSyncList::builder("bar")
1229                    .sync_mode(SlidingSyncMode::new_selective().add_range(50..=60)),
1230            )
1231            .await?;
1232
1233        let lists = sliding_sync.inner.lists.read().await;
1234
1235        assert!(lists.contains_key("foo"));
1236        assert!(lists.contains_key("bar"));
1237
1238        // this test also ensures that Tokio is not panicking when calling `add_list`.
1239
1240        Ok(())
1241    }
1242
1243    #[test]
1244    fn test_sticky_parameters_api_invalidated_flow() {
1245        let r0 = room_id!("!r0.matrix.org");
1246        let r1 = room_id!("!r1:matrix.org");
1247
1248        let mut room_subscriptions = BTreeMap::new();
1249        room_subscriptions.insert(r0.to_owned(), Default::default());
1250
1251        // At first it's invalidated.
1252        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1253            room_subscriptions,
1254            Default::default(),
1255        ));
1256        assert!(sticky.is_invalidated());
1257
1258        // Then when we create a request, the sticky parameters are applied.
1259        let txn_id: &TransactionId = "tid123".into();
1260
1261        let mut request = http::Request::default();
1262        request.txn_id = Some(txn_id.to_string());
1263
1264        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1265
1266        assert!(request.txn_id.is_some());
1267        assert_eq!(request.room_subscriptions.len(), 1);
1268        assert!(request.room_subscriptions.contains_key(r0));
1269
1270        let tid = request.txn_id.unwrap();
1271
1272        sticky.maybe_commit(tid.as_str().into());
1273        assert!(!sticky.is_invalidated());
1274
1275        // Applying new parameters will invalidate again.
1276        sticky
1277            .data_mut()
1278            .room_subscriptions
1279            .insert(r1.to_owned(), (Default::default(), Default::default()));
1280        assert!(sticky.is_invalidated());
1281
1282        // Committing with the wrong transaction id will keep it invalidated.
1283        sticky.maybe_commit("wrong tid today, my love has gone away 🎵".into());
1284        assert!(sticky.is_invalidated());
1285
1286        // Restarting a request will only remember the last generated transaction id.
1287        let txn_id1: &TransactionId = "tid456".into();
1288        let mut request1 = http::Request::default();
1289        request1.txn_id = Some(txn_id1.to_string());
1290        sticky.maybe_apply(&mut request1, &mut LazyTransactionId::from_owned(txn_id1.to_owned()));
1291
1292        assert!(sticky.is_invalidated());
1293        // The first room subscription has been applied to `request`, so it's not
1294        // reapplied here. It's a particular logic of `room_subscriptions`, it's not
1295        // related to the sticky design.
1296        assert_eq!(request1.room_subscriptions.len(), 1);
1297        assert!(request1.room_subscriptions.contains_key(r1));
1298
1299        let txn_id2: &TransactionId = "tid789".into();
1300        let mut request2 = http::Request::default();
1301        request2.txn_id = Some(txn_id2.to_string());
1302
1303        sticky.maybe_apply(&mut request2, &mut LazyTransactionId::from_owned(txn_id2.to_owned()));
1304        assert!(sticky.is_invalidated());
1305        // `request2` contains `r1` because the sticky parameters have not been
1306        // committed, so it's still marked as pending.
1307        assert_eq!(request2.room_subscriptions.len(), 1);
1308        assert!(request2.room_subscriptions.contains_key(r1));
1309
1310        // Here we commit with the not most-recent TID, so it keeps the invalidated
1311        // status.
1312        sticky.maybe_commit(txn_id1);
1313        assert!(sticky.is_invalidated());
1314
1315        // But here we use the latest TID, so the commit is effective.
1316        sticky.maybe_commit(txn_id2);
1317        assert!(!sticky.is_invalidated());
1318    }
1319
1320    #[test]
1321    fn test_room_subscriptions_are_sticky() {
1322        let r0 = room_id!("!r0.matrix.org");
1323        let r1 = room_id!("!r1:matrix.org");
1324
1325        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
1326            BTreeMap::new(),
1327            Default::default(),
1328        ));
1329
1330        // A room subscription is added, applied, and committed.
1331        {
1332            // Insert `r0`.
1333            sticky
1334                .data_mut()
1335                .room_subscriptions
1336                .insert(r0.to_owned(), (Default::default(), Default::default()));
1337
1338            // Then the sticky parameters are applied.
1339            let txn_id: &TransactionId = "tid0".into();
1340            let mut request = http::Request::default();
1341            request.txn_id = Some(txn_id.to_string());
1342
1343            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1344
1345            assert!(request.txn_id.is_some());
1346            assert_eq!(request.room_subscriptions.len(), 1);
1347            assert!(request.room_subscriptions.contains_key(r0));
1348
1349            // Then the sticky parameters are committed.
1350            let tid = request.txn_id.unwrap();
1351
1352            sticky.maybe_commit(tid.as_str().into());
1353        }
1354
1355        // A room subscription is added, applied, but NOT committed.
1356        {
1357            // Insert `r1`.
1358            sticky
1359                .data_mut()
1360                .room_subscriptions
1361                .insert(r1.to_owned(), (Default::default(), Default::default()));
1362
1363            // Then the sticky parameters are applied.
1364            let txn_id: &TransactionId = "tid1".into();
1365            let mut request = http::Request::default();
1366            request.txn_id = Some(txn_id.to_string());
1367
1368            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1369
1370            assert!(request.txn_id.is_some());
1371            assert_eq!(request.room_subscriptions.len(), 1);
1372            // `r0` is not present, it's only `r1`.
1373            assert!(request.room_subscriptions.contains_key(r1));
1374
1375            // Then the sticky parameters are NOT committed.
1376            // It can happen if the request has failed to be sent for example,
1377            // or if the response didn't match.
1378        }
1379
1380        // A previously added room subscription is re-added, applied, and committed.
1381        {
1382            // Then the sticky parameters are applied.
1383            let txn_id: &TransactionId = "tid2".into();
1384            let mut request = http::Request::default();
1385            request.txn_id = Some(txn_id.to_string());
1386
1387            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1388
1389            assert!(request.txn_id.is_some());
1390            assert_eq!(request.room_subscriptions.len(), 1);
1391            // `r0` is not present, it's only `r1`.
1392            assert!(request.room_subscriptions.contains_key(r1));
1393
1394            // Then the sticky parameters are committed.
1395            let tid = request.txn_id.unwrap();
1396
1397            sticky.maybe_commit(tid.as_str().into());
1398        }
1399
1400        // All room subscriptions have been committed.
1401        {
1402            // Then the sticky parameters are applied.
1403            let txn_id: &TransactionId = "tid3".into();
1404            let mut request = http::Request::default();
1405            request.txn_id = Some(txn_id.to_string());
1406
1407            sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
1408
1409            assert!(request.txn_id.is_some());
1410            // All room subscriptions have been sent.
1411            assert!(request.room_subscriptions.is_empty());
1412        }
1413    }
1414
    // Checks that the extensions configuration is part of the sticky parameters,
    // and that it is copied as-is into a request.
    #[test]
    fn test_extensions_are_sticky() {
        let mut extensions = http::request::Extensions::default();
        extensions.account_data.enabled = Some(true);

        // At first it's invalidated.
        let mut sticky = SlidingSyncStickyManager::new(SlidingSyncStickyParameters::new(
            Default::default(),
            extensions,
        ));

        assert!(sticky.is_invalidated(), "invalidated because of non default parameters");

        // `StickyParameters::new` follows its caller's intent when it comes to e2ee and
        // to-device.
        let extensions = &sticky.data().extensions;
        assert_eq!(extensions.e2ee.enabled, None);
        assert_eq!(extensions.to_device.enabled, None);
        assert_eq!(extensions.to_device.since, None);

        // What the user explicitly enabled is… enabled.
        assert_eq!(extensions.account_data.enabled, Some(true));

        // Applying the sticky parameters copies the extensions into the request. As
        // nothing is committed, the parameters stay invalidated.
        let txn_id: &TransactionId = "tid123".into();
        let mut request = http::Request::default();
        request.txn_id = Some(txn_id.to_string());
        sticky.maybe_apply(&mut request, &mut LazyTransactionId::from_owned(txn_id.to_owned()));
        assert!(sticky.is_invalidated());
        assert_eq!(request.extensions.to_device.enabled, None);
        assert_eq!(request.extensions.to_device.since, None);
        assert_eq!(request.extensions.e2ee.enabled, None);
        assert_eq!(request.extensions.account_data.enabled, Some(true));
    }
1448
    // Checks how the e2ee and to-device extensions are sent in generated
    // requests: with the sticky parameters until these are committed, then only
    // the to-device `since` token (when the `e2e-encryption` feature is enabled).
    #[async_test]
    async fn test_sticky_extensions_plus_since() -> Result<()> {
        let server = MockServer::start().await;
        let client = logged_in_client(Some(server.uri())).await;

        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .build()
            .await?;

        // No extensions have been explicitly enabled here.
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.to_device.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.e2ee.enabled, None);
        assert_eq!(sync.inner.sticky.read().unwrap().data().extensions.account_data.enabled, None);

        // Now enable e2ee and to-device.
        let sync = client
            .sliding_sync("test-slidingsync")?
            .add_list(SlidingSyncList::builder("new_list"))
            .with_to_device_extension(
                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
            )
            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
            .build()
            .await?;

        // Even without a since token, the first request will contain the extensions
        // configuration, at least.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        {
            // Committing with another transaction id doesn't validate anything.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(
                "hopefully the rng won't generate this very specific transaction id".into(),
            );
            assert!(sticky.is_invalidated());
        }

        // Regenerating a request will yield the same one.
        let txn_id2 = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id2.to_owned()))
            .await?;

        assert_eq!(request.extensions.e2ee.enabled, Some(true));
        assert_eq!(request.extensions.to_device.enabled, Some(true));
        assert!(request.extensions.to_device.since.is_none());

        assert!(txn_id != txn_id2, "the two requests must not share the same transaction id");

        {
            // Committing with the expected transaction id will validate it.
            let mut sticky = sync.inner.sticky.write().unwrap();
            assert!(sticky.is_invalidated());
            sticky.maybe_commit(txn_id2.as_str().into());
            assert!(!sticky.is_invalidated());
        }

        // The next request should contain no sticky parameters.
        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());
        assert!(request.extensions.to_device.since.is_none());

        // If there's a to-device `since` token, we make sure we put the token
        // into the extension config. The rest doesn't need to be re-enabled due to
        // stickiness.
        let _since_token = "since";

        // Store the token as the next batch token in the crypto store, if there is
        // an `OlmMachine`.
        #[cfg(feature = "e2e-encryption")]
        {
            use matrix_sdk_base::crypto::store::Changes;
            if let Some(olm_machine) = &*client.olm_machine().await {
                olm_machine
                    .store()
                    .save_changes(Changes {
                        next_batch_token: Some(_since_token.to_owned()),
                        ..Default::default()
                    })
                    .await?;
            }
        }

        let txn_id = TransactionId::new();
        let (request, _, _) = sync
            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
            .await?;

        // The sticky parameters have been committed already: they aren't sent again.
        assert!(request.extensions.e2ee.enabled.is_none());
        assert!(request.extensions.to_device.enabled.is_none());

        // The `since` token is sent though.
        #[cfg(feature = "e2e-encryption")]
        assert_eq!(request.extensions.to_device.since.as_deref(), Some(_since_token));

        Ok(())
    }
1558
1559    // With MSC4186, with the `e2ee` extension enabled, if a request has no `pos`,
1560    // all the tracked users by the `OlmMachine` must be marked as dirty, i.e.
1561    // `/key/query` requests must be sent. See the code to see the details.
1562    //
1563    // This test is asserting that.
1564    #[async_test]
1565    #[cfg(feature = "e2e-encryption")]
1566    async fn test_no_pos_with_e2ee_marks_all_tracked_users_as_dirty() -> anyhow::Result<()> {
1567        use matrix_sdk_base::crypto::types::requests::{AnyIncomingResponse, AnyOutgoingRequest};
1568        use matrix_sdk_test::ruma_response_from_json;
1569        use ruma::user_id;
1570
1571        let server = MockServer::start().await;
1572        let client = logged_in_client(Some(server.uri())).await;
1573
1574        let alice = user_id!("@alice:localhost");
1575        let bob = user_id!("@bob:localhost");
1576        let me = user_id!("@example:localhost");
1577
1578        // Track and mark users are not dirty, so that we can check they are “dirty”
1579        // after that. Dirty here means that a `/key/query` must be sent.
1580        {
1581            let olm_machine = client.olm_machine().await;
1582            let olm_machine = olm_machine.as_ref().unwrap();
1583
1584            olm_machine.update_tracked_users([alice, bob]).await?;
1585
1586            // Assert requests.
1587            let outgoing_requests = olm_machine.outgoing_requests().await?;
1588
1589            assert_eq!(outgoing_requests.len(), 2);
1590            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysUpload(_));
1591            assert_matches!(outgoing_requests[1].request(), AnyOutgoingRequest::KeysQuery(_));
1592
1593            // Fake responses.
1594            olm_machine
1595                .mark_request_as_sent(
1596                    outgoing_requests[0].request_id(),
1597                    AnyIncomingResponse::KeysUpload(&ruma_response_from_json(&json!({
1598                        "one_time_key_counts": {}
1599                    }))),
1600                )
1601                .await?;
1602
1603            olm_machine
1604                .mark_request_as_sent(
1605                    outgoing_requests[1].request_id(),
1606                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1607                        "device_keys": {
1608                            alice: {},
1609                            bob: {},
1610                        }
1611                    }))),
1612                )
1613                .await?;
1614
1615            // Once more.
1616            let outgoing_requests = olm_machine.outgoing_requests().await?;
1617
1618            assert_eq!(outgoing_requests.len(), 1);
1619            assert_matches!(outgoing_requests[0].request(), AnyOutgoingRequest::KeysQuery(_));
1620
1621            olm_machine
1622                .mark_request_as_sent(
1623                    outgoing_requests[0].request_id(),
1624                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1625                        "device_keys": {
1626                            me: {},
1627                        }
1628                    }))),
1629                )
1630                .await?;
1631
1632            // No more.
1633            let outgoing_requests = olm_machine.outgoing_requests().await?;
1634
1635            assert!(outgoing_requests.is_empty());
1636        }
1637
1638        let sync = client
1639            .sliding_sync("test-slidingsync")?
1640            .add_list(SlidingSyncList::builder("new_list"))
1641            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
1642            .build()
1643            .await?;
1644
1645        // First request: no `pos`.
1646        let txn_id = TransactionId::new();
1647        let (_request, _, _) = sync
1648            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1649            .await?;
1650
1651        // Now, tracked users must be dirty.
1652        {
1653            let olm_machine = client.olm_machine().await;
1654            let olm_machine = olm_machine.as_ref().unwrap();
1655
1656            // Assert requests.
1657            let outgoing_requests = olm_machine.outgoing_requests().await?;
1658
1659            assert_eq!(outgoing_requests.len(), 1);
1660            assert_matches!(
1661                outgoing_requests[0].request(),
1662                AnyOutgoingRequest::KeysQuery(request) => {
1663                    assert!(request.device_keys.contains_key(alice));
1664                    assert!(request.device_keys.contains_key(bob));
1665                    assert!(request.device_keys.contains_key(me));
1666                }
1667            );
1668
1669            // Fake responses.
1670            olm_machine
1671                .mark_request_as_sent(
1672                    outgoing_requests[0].request_id(),
1673                    AnyIncomingResponse::KeysQuery(&ruma_response_from_json(&json!({
1674                        "device_keys": {
1675                            alice: {},
1676                            bob: {},
1677                            me: {},
1678                        }
1679                    }))),
1680                )
1681                .await?;
1682        }
1683
1684        // Second request: with a `pos` this time.
1685        sync.set_pos("chocolat".to_owned()).await;
1686
1687        let txn_id = TransactionId::new();
1688        let (_request, _, _) = sync
1689            .generate_sync_request(&mut LazyTransactionId::from_owned(txn_id.to_owned()))
1690            .await?;
1691
1692        // Tracked users are not marked as dirty.
1693        {
1694            let olm_machine = client.olm_machine().await;
1695            let olm_machine = olm_machine.as_ref().unwrap();
1696
1697            // Assert requests.
1698            let outgoing_requests = olm_machine.outgoing_requests().await?;
1699
1700            assert!(outgoing_requests.is_empty());
1701        }
1702
1703        Ok(())
1704    }
1705
1706    #[async_test]
1707    async fn test_unknown_pos_resets_pos_and_sticky_parameters() -> Result<()> {
1708        let server = MockServer::start().await;
1709        let client = logged_in_client(Some(server.uri())).await;
1710
1711        let sliding_sync = client
1712            .sliding_sync("test-slidingsync")?
1713            .with_to_device_extension(
1714                assign!(http::request::ToDevice::default(), { enabled: Some(true) }),
1715            )
1716            .build()
1717            .await?;
1718
1719        // First request asks to enable the extension.
1720        let (request, _, _) =
1721            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1722        assert!(request.extensions.to_device.enabled.is_some());
1723
1724        let sync = sliding_sync.sync();
1725        pin_mut!(sync);
1726
1727        // `pos` is `None` to start with.
1728        assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1729
1730        #[derive(Deserialize)]
1731        struct PartialRequest {
1732            txn_id: Option<String>,
1733        }
1734
1735        {
1736            let _mock_guard = Mock::given(SlidingSyncMatcher)
1737                .respond_with(|request: &Request| {
1738                    // Repeat the txn_id in the response, if set.
1739                    let request: PartialRequest = request.body_json().unwrap();
1740
1741                    ResponseTemplate::new(200).set_body_json(json!({
1742                        "txn_id": request.txn_id,
1743                        "pos": "0",
1744                    }))
1745                })
1746                .mount_as_scoped(&server)
1747                .await;
1748
1749            let next = sync.next().await;
1750            assert_matches!(next, Some(Ok(_update_summary)));
1751
1752            // `pos` has been updated.
1753            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1754        }
1755
1756        // Next request doesn't ask to enable the extension.
1757        let (request, _, _) =
1758            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1759        assert!(request.extensions.to_device.enabled.is_none());
1760
1761        // Next request is successful.
1762        {
1763            let _mock_guard = Mock::given(SlidingSyncMatcher)
1764                .respond_with(|request: &Request| {
1765                    // Repeat the txn_id in the response, if set.
1766                    let request: PartialRequest = request.body_json().unwrap();
1767
1768                    ResponseTemplate::new(200).set_body_json(json!({
1769                        "txn_id": request.txn_id,
1770                        "pos": "1",
1771                    }))
1772                })
1773                .mount_as_scoped(&server)
1774                .await;
1775
1776            let next = sync.next().await;
1777            assert_matches!(next, Some(Ok(_update_summary)));
1778
1779            // `pos` has been updated.
1780            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
1781        }
1782
1783        // Next request is successful despite it receives an already
1784        // received `pos` from the server.
1785        {
1786            let _mock_guard = Mock::given(SlidingSyncMatcher)
1787                .respond_with(|request: &Request| {
1788                    // Repeat the txn_id in the response, if set.
1789                    let request: PartialRequest = request.body_json().unwrap();
1790
1791                    ResponseTemplate::new(200).set_body_json(json!({
1792                        "txn_id": request.txn_id,
1793                        "pos": "0", // <- already received!
1794                    }))
1795                })
1796                .up_to_n_times(1) // run this mock only once.
1797                .mount_as_scoped(&server)
1798                .await;
1799
1800            let next = sync.next().await;
1801            assert_matches!(next, Some(Ok(_update_summary)));
1802
1803            // `pos` has been updated.
1804            assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1805        }
1806
1807        // Stop responding with successful requests!
1808        //
1809        // When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
1810        // so they're reset. It also resets the `pos`.
1811        {
1812            let _mock_guard = Mock::given(SlidingSyncMatcher)
1813                .respond_with(ResponseTemplate::new(400).set_body_json(json!({
1814                    "error": "foo",
1815                    "errcode": "M_UNKNOWN_POS",
1816                })))
1817                .mount_as_scoped(&server)
1818                .await;
1819
1820            let next = sync.next().await;
1821
1822            // The expected error is returned.
1823            assert_matches!(next, Some(Err(err)) if err.client_api_error_kind() == Some(&ErrorKind::UnknownPos));
1824
1825            // `pos` has been reset.
1826            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1827
1828            // Next request asks to enable the extension again.
1829            let (request, _, _) =
1830                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1831
1832            assert!(request.extensions.to_device.enabled.is_some());
1833
1834            // `sync` has been stopped.
1835            assert!(sync.next().await.is_none());
1836        }
1837
1838        Ok(())
1839    }
1840
1841    #[cfg(feature = "e2e-encryption")]
1842    #[async_test]
1843    async fn test_sliding_sync_doesnt_remember_pos() -> Result<()> {
1844        let server = MockServer::start().await;
1845
1846        #[derive(Deserialize)]
1847        struct PartialRequest {
1848            txn_id: Option<String>,
1849        }
1850
1851        let server_pos = Arc::new(Mutex::new(0));
1852        let _mock_guard = Mock::given(SlidingSyncMatcher)
1853            .respond_with(move |request: &Request| {
1854                // Repeat the txn_id in the response, if set.
1855                let request: PartialRequest = request.body_json().unwrap();
1856                let pos = {
1857                    let mut pos = server_pos.lock().unwrap();
1858                    let prev = *pos;
1859                    *pos += 1;
1860                    prev
1861                };
1862
1863                ResponseTemplate::new(200).set_body_json(json!({
1864                    "txn_id": request.txn_id,
1865                    "pos": pos.to_string(),
1866                }))
1867            })
1868            .mount_as_scoped(&server)
1869            .await;
1870
1871        let client = logged_in_client(Some(server.uri())).await;
1872
1873        let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1874
1875        // `pos` is `None` to start with.
1876        {
1877            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1878
1879            let (request, _, _) =
1880                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1881            assert!(request.pos.is_none());
1882        }
1883
1884        let sync = sliding_sync.sync();
1885        pin_mut!(sync);
1886
1887        // Sync goes well, and then the position is saved both into the internal memory
1888        // and the database.
1889        let next = sync.next().await;
1890        assert_matches!(next, Some(Ok(_update_summary)));
1891
1892        assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1893
1894        let restored_fields = restore_sliding_sync_state(
1895            &client,
1896            &sliding_sync.inner.storage_key,
1897            &*sliding_sync.inner.lists.read().await,
1898        )
1899        .await?
1900        .expect("must have restored fields");
1901
1902        // While it has been saved into the database, it's not necessarily going to be
1903        // used later!
1904        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
1905
1906        // Now, even if we mess with the position stored in the database, the sliding
1907        // sync instance isn't configured to reload the stream position from the
1908        // database, so it won't be changed.
1909        {
1910            let other_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1911
1912            let mut position_guard = other_sync.inner.position.lock().await;
1913            position_guard.pos = Some("yolo".to_owned());
1914
1915            other_sync.cache_to_storage(&position_guard).await?;
1916        }
1917
1918        // It's still 0, not "yolo".
1919        {
1920            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("0"));
1921            let (request, _, _) =
1922                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1923            assert_eq!(request.pos.as_deref(), Some("0"));
1924        }
1925
1926        // Recreating a sliding sync with the same ID doesn't preload the pos, if not
1927        // asked to.
1928        {
1929            let sliding_sync = client.sliding_sync("forgetful-sync")?.build().await?;
1930            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1931        }
1932
1933        Ok(())
1934    }
1935
1936    #[cfg(feature = "e2e-encryption")]
1937    #[async_test]
1938    async fn test_sliding_sync_does_remember_pos() -> Result<()> {
1939        let server = MockServer::start().await;
1940
1941        #[derive(Deserialize)]
1942        struct PartialRequest {
1943            txn_id: Option<String>,
1944        }
1945
1946        let server_pos = Arc::new(Mutex::new(0));
1947        let _mock_guard = Mock::given(SlidingSyncMatcher)
1948            .respond_with(move |request: &Request| {
1949                // Repeat the txn_id in the response, if set.
1950                let request: PartialRequest = request.body_json().unwrap();
1951                let pos = {
1952                    let mut pos = server_pos.lock().unwrap();
1953                    let prev = *pos;
1954                    *pos += 1;
1955                    prev
1956                };
1957
1958                ResponseTemplate::new(200).set_body_json(json!({
1959                    "txn_id": request.txn_id,
1960                    "pos": pos.to_string(),
1961                }))
1962            })
1963            .mount_as_scoped(&server)
1964            .await;
1965
1966        let client = logged_in_client(Some(server.uri())).await;
1967
1968        let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
1969
1970        // `pos` is `None` to start with.
1971        {
1972            let (request, _, _) =
1973                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
1974
1975            assert!(request.pos.is_none());
1976            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
1977        }
1978
1979        let sync = sliding_sync.sync();
1980        pin_mut!(sync);
1981
1982        // Sync goes well, and then the position is saved both into the internal memory
1983        // and the database.
1984        let next = sync.next().await;
1985        assert_matches!(next, Some(Ok(_update_summary)));
1986
1987        assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
1988
1989        let restored_fields = restore_sliding_sync_state(
1990            &client,
1991            &sliding_sync.inner.storage_key,
1992            &*sliding_sync.inner.lists.read().await,
1993        )
1994        .await?
1995        .expect("must have restored fields");
1996
1997        // While it has been saved into the database, it's not necessarily going to be
1998        // used later!
1999        assert_eq!(restored_fields.pos.as_deref(), Some("0"));
2000
2001        // Another process modifies the stream position under our feet...
2002        {
2003            let other_sync = client.sliding_sync("elephant-sync")?.build().await?;
2004
2005            let mut position_guard = other_sync.inner.position.lock().await;
2006            position_guard.pos = Some("42".to_owned());
2007
2008            other_sync.cache_to_storage(&position_guard).await?;
2009        }
2010
2011        // It's alright, the next request will load it from the database.
2012        {
2013            let (request, _, _) =
2014                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2015            assert_eq!(request.pos.as_deref(), Some("42"));
2016            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2017        }
2018
2019        // Recreating a sliding sync with the same ID will reload it too.
2020        {
2021            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2022            assert_eq!(sliding_sync.inner.position.lock().await.pos.as_deref(), Some("42"));
2023
2024            let (request, _, _) =
2025                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2026            assert_eq!(request.pos.as_deref(), Some("42"));
2027        }
2028
2029        // Invalidating the session will remove the in-memory value AND the database
2030        // value.
2031        sliding_sync.expire_session().await;
2032
2033        {
2034            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2035
2036            let (request, _, _) =
2037                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2038            assert!(request.pos.is_none());
2039        }
2040
2041        // And new sliding syncs with the same ID won't find it either.
2042        {
2043            let sliding_sync = client.sliding_sync("elephant-sync")?.share_pos().build().await?;
2044            assert!(sliding_sync.inner.position.lock().await.pos.is_none());
2045
2046            let (request, _, _) =
2047                sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2048            assert!(request.pos.is_none());
2049        }
2050
2051        Ok(())
2052    }
2053
2054    #[async_test]
2055    async fn test_stop_sync_loop() -> Result<()> {
2056        let (_server, sliding_sync) = new_sliding_sync(vec![SlidingSyncList::builder("foo")
2057            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10))])
2058        .await?;
2059
2060        // Start the sync loop.
2061        let stream = sliding_sync.sync();
2062        pin_mut!(stream);
2063
2064        // The sync loop is actually running.
2065        assert!(stream.next().await.is_some());
2066
2067        // Stop the sync loop.
2068        sliding_sync.stop_sync()?;
2069
2070        // The sync loop is actually stopped.
2071        assert!(stream.next().await.is_none());
2072
2073        // Start a new sync loop.
2074        let stream = sliding_sync.sync();
2075        pin_mut!(stream);
2076
2077        // The sync loop is actually running.
2078        assert!(stream.next().await.is_some());
2079
2080        Ok(())
2081    }
2082
2083    #[async_test]
2084    async fn test_process_read_receipts() -> Result<()> {
2085        let room = owned_room_id!("!pony:example.org");
2086
2087        let server = MockServer::start().await;
2088        let client = logged_in_client(Some(server.uri())).await;
2089
2090        let sliding_sync = client
2091            .sliding_sync("test")?
2092            .with_receipt_extension(
2093                assign!(http::request::Receipts::default(), { enabled: Some(true) }),
2094            )
2095            .add_list(
2096                SlidingSyncList::builder("all")
2097                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2098            )
2099            .build()
2100            .await?;
2101
2102        // Initial state.
2103        {
2104            let server_response = assign!(http::Response::new("0".to_owned()), {
2105                rooms: BTreeMap::from([(
2106                    room.clone(),
2107                    http::response::Room::default(),
2108                )])
2109            });
2110
2111            let _summary = {
2112                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2113                sliding_sync
2114                    .handle_response(
2115                        server_response.clone(),
2116                        &mut pos_guard,
2117                        RequestedRequiredStates::default(),
2118                    )
2119                    .await?
2120            };
2121        }
2122
2123        let server_response = assign!(http::Response::new("1".to_owned()), {
2124            extensions: assign!(http::response::Extensions::default(), {
2125                receipts: assign!(http::response::Receipts::default(), {
2126                    rooms: BTreeMap::from([
2127                        (
2128                            room.clone(),
2129                            Raw::from_json_string(
2130                                json!({
2131                                    "room_id": room,
2132                                    "type": "m.receipt",
2133                                    "content": {
2134                                        "$event:bar.org": {
2135                                            "m.read": {
2136                                                client.user_id().unwrap(): {
2137                                                    "ts": 1436451550,
2138                                                }
2139                                            }
2140                                        }
2141                                    }
2142                                })
2143                                .to_string(),
2144                            ).unwrap()
2145                        )
2146                    ])
2147                })
2148            })
2149        });
2150
2151        let summary = {
2152            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2153            sliding_sync
2154                .handle_response(
2155                    server_response.clone(),
2156                    &mut pos_guard,
2157                    RequestedRequiredStates::default(),
2158                )
2159                .await?
2160        };
2161
2162        assert!(summary.rooms.contains(&room));
2163
2164        Ok(())
2165    }
2166
2167    #[async_test]
2168    async fn test_process_marked_unread_room_account_data() -> Result<()> {
2169        let room_id = owned_room_id!("!unicorn:example.org");
2170
2171        let server = MockServer::start().await;
2172        let client = logged_in_client(Some(server.uri())).await;
2173
2174        // Setup sliding sync with with one room and one list
2175
2176        let sliding_sync = client
2177            .sliding_sync("test")?
2178            .with_account_data_extension(
2179                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2180            )
2181            .add_list(
2182                SlidingSyncList::builder("all")
2183                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2184            )
2185            .build()
2186            .await?;
2187
2188        // Initial state.
2189        {
2190            let server_response = assign!(http::Response::new("0".to_owned()), {
2191                rooms: BTreeMap::from([(
2192                    room_id.clone(),
2193                    http::response::Room::default(),
2194                )])
2195            });
2196
2197            let _summary = {
2198                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2199                sliding_sync
2200                    .handle_response(
2201                        server_response.clone(),
2202                        &mut pos_guard,
2203                        RequestedRequiredStates::default(),
2204                    )
2205                    .await?
2206            };
2207        }
2208
2209        // Simulate a response that only changes the marked unread state of the room to
2210        // true
2211
2212        let server_response = make_mark_unread_response("1", room_id.clone(), true, false);
2213
2214        let update_summary = {
2215            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2216            sliding_sync
2217                .handle_response(
2218                    server_response.clone(),
2219                    &mut pos_guard,
2220                    RequestedRequiredStates::default(),
2221                )
2222                .await?
2223        };
2224
2225        // Check that the list list and entry received the update
2226
2227        assert!(update_summary.rooms.contains(&room_id));
2228
2229        let room = client.get_room(&room_id).unwrap();
2230
2231        // Check the actual room data, this powers RoomInfo
2232
2233        assert!(room.is_marked_unread());
2234
2235        // Change it back to false and check if it updates
2236
2237        let server_response = make_mark_unread_response("2", room_id.clone(), false, true);
2238
2239        let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2240        sliding_sync
2241            .handle_response(
2242                server_response.clone(),
2243                &mut pos_guard,
2244                RequestedRequiredStates::default(),
2245            )
2246            .await?;
2247
2248        let room = client.get_room(&room_id).unwrap();
2249
2250        assert!(!room.is_marked_unread());
2251
2252        Ok(())
2253    }
2254
2255    fn make_mark_unread_response(
2256        response_number: &str,
2257        room_id: OwnedRoomId,
2258        unread: bool,
2259        add_rooms_section: bool,
2260    ) -> http::Response {
2261        let rooms = if add_rooms_section {
2262            BTreeMap::from([(room_id.clone(), http::response::Room::default())])
2263        } else {
2264            BTreeMap::new()
2265        };
2266
2267        let extensions = assign!(http::response::Extensions::default(), {
2268            account_data: assign!(http::response::AccountData::default(), {
2269                rooms: BTreeMap::from([
2270                    (
2271                        room_id,
2272                        vec![
2273                            Raw::from_json_string(
2274                                json!({
2275                                    "content": {
2276                                        "unread": unread
2277                                    },
2278                                    "type": "com.famedly.marked_unread"
2279                                })
2280                                .to_string(),
2281                            ).unwrap()
2282                        ]
2283                    )
2284                ])
2285            })
2286        });
2287
2288        assign!(http::Response::new(response_number.to_owned()), { rooms: rooms, extensions: extensions })
2289    }
2290
2291    #[async_test]
2292    async fn test_process_rooms_account_data() -> Result<()> {
2293        let room = owned_room_id!("!pony:example.org");
2294
2295        let server = MockServer::start().await;
2296        let client = logged_in_client(Some(server.uri())).await;
2297
2298        let sliding_sync = client
2299            .sliding_sync("test")?
2300            .with_account_data_extension(
2301                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
2302            )
2303            .add_list(
2304                SlidingSyncList::builder("all")
2305                    .sync_mode(SlidingSyncMode::new_selective().add_range(0..=100)),
2306            )
2307            .build()
2308            .await?;
2309
2310        // Initial state.
2311        {
2312            let server_response = assign!(http::Response::new("0".to_owned()), {
2313                rooms: BTreeMap::from([(
2314                    room.clone(),
2315                    http::response::Room::default(),
2316                )])
2317            });
2318
2319            let _summary = {
2320                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2321                sliding_sync
2322                    .handle_response(
2323                        server_response.clone(),
2324                        &mut pos_guard,
2325                        RequestedRequiredStates::default(),
2326                    )
2327                    .await?
2328            };
2329        }
2330
2331        let server_response = assign!(http::Response::new("1".to_owned()), {
2332            extensions: assign!(http::response::Extensions::default(), {
2333                account_data: assign!(http::response::AccountData::default(), {
2334                    rooms: BTreeMap::from([
2335                        (
2336                            room.clone(),
2337                            vec![
2338                                Raw::from_json_string(
2339                                    json!({
2340                                        "content": {
2341                                            "tags": {
2342                                                "u.work": {
2343                                                    "order": 0.9
2344                                                }
2345                                            }
2346                                        },
2347                                        "type": "m.tag"
2348                                    })
2349                                    .to_string(),
2350                                ).unwrap()
2351                            ]
2352                        )
2353                    ])
2354                })
2355            })
2356        });
2357        let summary = {
2358            let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2359            sliding_sync
2360                .handle_response(
2361                    server_response.clone(),
2362                    &mut pos_guard,
2363                    RequestedRequiredStates::default(),
2364                )
2365                .await?
2366        };
2367
2368        assert!(summary.rooms.contains(&room));
2369
2370        Ok(())
2371    }
2372
2373    #[async_test]
2374    #[cfg(feature = "e2e-encryption")]
2375    async fn test_process_only_encryption_events() -> Result<()> {
2376        use ruma::OneTimeKeyAlgorithm;
2377
2378        let room = owned_room_id!("!croissant:example.org");
2379
2380        let server = MockServer::start().await;
2381        let client = logged_in_client(Some(server.uri())).await;
2382
2383        let server_response = assign!(http::Response::new("0".to_owned()), {
2384            rooms: BTreeMap::from([(
2385                room.clone(),
2386                assign!(http::response::Room::default(), {
2387                    name: Some("Croissants lovers".to_owned()),
2388                    timeline: Vec::new(),
2389                }),
2390            )]),
2391
2392            extensions: assign!(http::response::Extensions::default(), {
2393                e2ee: assign!(http::response::E2EE::default(), {
2394                    device_one_time_keys_count: BTreeMap::from([(OneTimeKeyAlgorithm::SignedCurve25519, uint!(42))])
2395                }),
2396                to_device: Some(assign!(http::response::ToDevice::default(), {
2397                    next_batch: "to-device-token".to_owned(),
2398                })),
2399            })
2400        });
2401
2402        // Don't process non-encryption events if the sliding sync is configured for
2403        // encryption only.
2404
2405        let sliding_sync = client
2406            .sliding_sync("test")?
2407            .with_to_device_extension(
2408                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2409            )
2410            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2411            .build()
2412            .await?;
2413
2414        {
2415            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2416
2417            sliding_sync
2418                .handle_response(
2419                    server_response.clone(),
2420                    &mut position_guard,
2421                    RequestedRequiredStates::default(),
2422                )
2423                .await?;
2424        }
2425
2426        // E2EE has been properly handled.
2427        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2428        assert_eq!(uploaded_key_count, 42);
2429
2430        {
2431            let olm_machine = &*client.olm_machine_for_testing().await;
2432            assert_eq!(
2433                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2434                Some("to-device-token")
2435            );
2436        }
2437
2438        // Room events haven't.
2439        assert!(client.get_room(&room).is_none());
2440
2441        // Conversely, only process room lists events if the sliding sync was configured
2442        // as so.
2443        let client = logged_in_client(Some(server.uri())).await;
2444
2445        let sliding_sync = client
2446            .sliding_sync("test")?
2447            .add_list(SlidingSyncList::builder("thelist"))
2448            .build()
2449            .await?;
2450
2451        {
2452            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2453
2454            sliding_sync
2455                .handle_response(
2456                    server_response.clone(),
2457                    &mut position_guard,
2458                    RequestedRequiredStates::default(),
2459                )
2460                .await?;
2461        }
2462
2463        // E2EE response has been ignored.
2464        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2465        assert_eq!(uploaded_key_count, 0);
2466
2467        {
2468            let olm_machine = &*client.olm_machine_for_testing().await;
2469            assert_eq!(
2470                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2471                None
2472            );
2473        }
2474
2475        // The room is now known.
2476        assert!(client.get_room(&room).is_some());
2477
2478        // And it's also possible to set up both.
2479        let client = logged_in_client(Some(server.uri())).await;
2480
2481        let sliding_sync = client
2482            .sliding_sync("test")?
2483            .add_list(SlidingSyncList::builder("thelist"))
2484            .with_to_device_extension(
2485                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2486            )
2487            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2488            .build()
2489            .await?;
2490
2491        {
2492            let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
2493
2494            sliding_sync
2495                .handle_response(
2496                    server_response.clone(),
2497                    &mut position_guard,
2498                    RequestedRequiredStates::default(),
2499                )
2500                .await?;
2501        }
2502
2503        // E2EE has been properly handled.
2504        let uploaded_key_count = client.encryption().uploaded_key_count().await?;
2505        assert_eq!(uploaded_key_count, 42);
2506
2507        {
2508            let olm_machine = &*client.olm_machine_for_testing().await;
2509            assert_eq!(
2510                olm_machine.as_ref().unwrap().store().next_batch_token().await?.as_deref(),
2511                Some("to-device-token")
2512            );
2513        }
2514
2515        // The room is now known.
2516        assert!(client.get_room(&room).is_some());
2517
2518        Ok(())
2519    }
2520
2521    #[async_test]
2522    async fn test_lock_multiple_requests() -> Result<()> {
2523        let server = MockServer::start().await;
2524        let client = logged_in_client(Some(server.uri())).await;
2525
2526        let pos = Arc::new(Mutex::new(0));
2527        let _mock_guard = Mock::given(SlidingSyncMatcher)
2528            .respond_with(move |_: &Request| {
2529                let mut pos = pos.lock().unwrap();
2530                *pos += 1;
2531                ResponseTemplate::new(200).set_body_json(json!({
2532                    "pos": pos.to_string(),
2533                    "lists": {},
2534                    "rooms": {}
2535                }))
2536            })
2537            .mount_as_scoped(&server)
2538            .await;
2539
2540        let sliding_sync = client
2541            .sliding_sync("test")?
2542            .with_to_device_extension(
2543                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2544            )
2545            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2546            .build()
2547            .await?;
2548
2549        // Spawn two requests in parallel. Before #2430, this lead to a deadlock and the
2550        // test would never terminate.
2551        let requests = join_all([sliding_sync.sync_once(), sliding_sync.sync_once()]);
2552
2553        for result in requests.await {
2554            result?;
2555        }
2556
2557        Ok(())
2558    }
2559
2560    #[async_test]
2561    async fn test_aborted_request_doesnt_update_future_requests() -> Result<()> {
2562        let server = MockServer::start().await;
2563        let client = logged_in_client(Some(server.uri())).await;
2564
2565        let pos = Arc::new(Mutex::new(0));
2566        let _mock_guard = Mock::given(SlidingSyncMatcher)
2567            .respond_with(move |_: &Request| {
2568                let mut pos = pos.lock().unwrap();
2569                *pos += 1;
2570                // Respond slowly enough that we can skip one iteration.
2571                ResponseTemplate::new(200)
2572                    .set_body_json(json!({
2573                        "pos": pos.to_string(),
2574                        "lists": {},
2575                        "rooms": {}
2576                    }))
2577                    .set_delay(Duration::from_secs(2))
2578            })
2579            .mount_as_scoped(&server)
2580            .await;
2581
2582        let sliding_sync =
2583            client
2584                .sliding_sync("test")?
2585                .add_list(SlidingSyncList::builder("room-list").sync_mode(
2586                    SlidingSyncMode::new_growing(10).maximum_number_of_rooms_to_fetch(100),
2587                ))
2588                .add_list(
2589                    SlidingSyncList::builder("another-list")
2590                        .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2591                )
2592                .build()
2593                .await?;
2594
2595        let stream = sliding_sync.sync();
2596        pin_mut!(stream);
2597
2598        let cloned_sync = sliding_sync.clone();
2599        tokio::spawn(async move {
2600            tokio::time::sleep(Duration::from_millis(100)).await;
2601
2602            cloned_sync
2603                .on_list("another-list", |list| {
2604                    list.set_sync_mode(SlidingSyncMode::new_selective().add_range(10..=20));
2605                    ready(())
2606                })
2607                .await;
2608        });
2609
2610        assert_matches!(stream.next().await, Some(Ok(_)));
2611
2612        sliding_sync.stop_sync().unwrap();
2613
2614        assert_matches!(stream.next().await, None);
2615
2616        let mut num_requests = 0;
2617
2618        for request in server.received_requests().await.unwrap() {
2619            if !SlidingSyncMatcher.matches(&request) {
2620                continue;
2621            }
2622
2623            let another_list_ranges = if num_requests == 0 {
2624                // First request
2625                json!([[0, 10]])
2626            } else {
2627                // Second request
2628                json!([[10, 20]])
2629            };
2630
2631            num_requests += 1;
2632            assert!(num_requests <= 2, "more than one request hit the server");
2633
2634            let json_value = serde_json::from_slice::<serde_json::Value>(&request.body).unwrap();
2635
2636            if let Err(err) = assert_json_diff::assert_json_matches_no_panic(
2637                &json_value,
2638                &json!({
2639                    "conn_id": "test",
2640                    "lists": {
2641                        "room-list": {
2642                            "ranges": [[0, 9]],
2643                            "required_state": [
2644                                ["m.room.encryption", ""],
2645                                ["m.room.tombstone", ""]
2646                            ],
2647                        },
2648                        "another-list": {
2649                            "ranges": another_list_ranges,
2650                            "required_state": [
2651                                ["m.room.encryption", ""],
2652                                ["m.room.tombstone", ""]
2653                            ],
2654                        },
2655                    }
2656                }),
2657                assert_json_diff::Config::new(assert_json_diff::CompareMode::Inclusive),
2658            ) {
2659                dbg!(json_value);
2660                panic!("json differ: {err}");
2661            }
2662        }
2663
2664        assert_eq!(num_requests, 2);
2665
2666        Ok(())
2667    }
2668
2669    #[async_test]
2670    async fn test_timeout_zero_list() -> Result<()> {
2671        let (_server, sliding_sync) = new_sliding_sync(vec![]).await?;
2672
2673        let (request, _, _) =
2674            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2675
2676        // Zero list means sliding sync is fully loaded, so there is a timeout to wait
2677        // on new update to pop.
2678        assert!(request.timeout.is_some());
2679
2680        Ok(())
2681    }
2682
2683    #[async_test]
2684    async fn test_timeout_one_list() -> Result<()> {
2685        let (_server, sliding_sync) = new_sliding_sync(vec![
2686            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10))
2687        ])
2688        .await?;
2689
2690        let (request, _, _) =
2691            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2692
2693        // The list does not require a timeout.
2694        assert!(request.timeout.is_none());
2695
2696        // Simulate a response.
2697        {
2698            let server_response = assign!(http::Response::new("0".to_owned()), {
2699                lists: BTreeMap::from([(
2700                    "foo".to_owned(),
2701                    assign!(http::response::List::default(), {
2702                        count: uint!(7),
2703                    })
2704                 )])
2705            });
2706
2707            let _summary = {
2708                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2709                sliding_sync
2710                    .handle_response(
2711                        server_response.clone(),
2712                        &mut pos_guard,
2713                        RequestedRequiredStates::default(),
2714                    )
2715                    .await?
2716            };
2717        }
2718
2719        let (request, _, _) =
2720            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2721
2722        // The list is now fully loaded, so it requires a timeout.
2723        assert!(request.timeout.is_some());
2724
2725        Ok(())
2726    }
2727
2728    #[async_test]
2729    async fn test_timeout_three_lists() -> Result<()> {
2730        let (_server, sliding_sync) = new_sliding_sync(vec![
2731            SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::new_growing(10)),
2732            SlidingSyncList::builder("bar").sync_mode(SlidingSyncMode::new_paging(10)),
2733            SlidingSyncList::builder("baz")
2734                .sync_mode(SlidingSyncMode::new_selective().add_range(0..=10)),
2735        ])
2736        .await?;
2737
2738        let (request, _, _) =
2739            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2740
2741        // Two lists don't require a timeout.
2742        assert!(request.timeout.is_none());
2743
2744        // Simulate a response.
2745        {
2746            let server_response = assign!(http::Response::new("0".to_owned()), {
2747                lists: BTreeMap::from([(
2748                    "foo".to_owned(),
2749                    assign!(http::response::List::default(), {
2750                        count: uint!(7),
2751                    })
2752                 )])
2753            });
2754
2755            let _summary = {
2756                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2757                sliding_sync
2758                    .handle_response(
2759                        server_response.clone(),
2760                        &mut pos_guard,
2761                        RequestedRequiredStates::default(),
2762                    )
2763                    .await?
2764            };
2765        }
2766
2767        let (request, _, _) =
2768            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2769
2770        // One don't require a timeout.
2771        assert!(request.timeout.is_none());
2772
2773        // Simulate a response.
2774        {
2775            let server_response = assign!(http::Response::new("1".to_owned()), {
2776                lists: BTreeMap::from([(
2777                    "bar".to_owned(),
2778                    assign!(http::response::List::default(), {
2779                        count: uint!(7),
2780                    })
2781                 )])
2782            });
2783
2784            let _summary = {
2785                let mut pos_guard = sliding_sync.inner.position.clone().lock_owned().await;
2786                sliding_sync
2787                    .handle_response(
2788                        server_response.clone(),
2789                        &mut pos_guard,
2790                        RequestedRequiredStates::default(),
2791                    )
2792                    .await?
2793            };
2794        }
2795
2796        let (request, _, _) =
2797            sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
2798
2799        // All lists require a timeout.
2800        assert!(request.timeout.is_some());
2801
2802        Ok(())
2803    }
2804
2805    #[async_test]
2806    async fn test_sync_beat_is_notified_on_sync_response() -> Result<()> {
2807        let server = MockServer::start().await;
2808        let client = logged_in_client(Some(server.uri())).await;
2809
2810        let _mock_guard = Mock::given(SlidingSyncMatcher)
2811            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
2812                "pos": "0",
2813                "lists": {},
2814                "rooms": {}
2815            })))
2816            .mount_as_scoped(&server)
2817            .await;
2818
2819        let sliding_sync = client
2820            .sliding_sync("test")?
2821            .with_to_device_extension(
2822                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2823            )
2824            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2825            .build()
2826            .await?;
2827
2828        let sliding_sync = Arc::new(sliding_sync);
2829
2830        // Create the listener and perform a sync request
2831        let sync_beat_listener = client.inner.sync_beat.listen();
2832        sliding_sync.sync_once().await?;
2833
2834        // The sync beat listener should be notified shortly after
2835        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_some());
2836        Ok(())
2837    }
2838
2839    #[async_test]
2840    async fn test_sync_beat_is_not_notified_on_sync_failure() -> Result<()> {
2841        let server = MockServer::start().await;
2842        let client = logged_in_client(Some(server.uri())).await;
2843
2844        let _mock_guard = Mock::given(SlidingSyncMatcher)
2845            .respond_with(ResponseTemplate::new(404))
2846            .mount_as_scoped(&server)
2847            .await;
2848
2849        let sliding_sync = client
2850            .sliding_sync("test")?
2851            .with_to_device_extension(
2852                assign!(http::request::ToDevice::default(), { enabled: Some(true)}),
2853            )
2854            .with_e2ee_extension(assign!(http::request::E2EE::default(), { enabled: Some(true)}))
2855            .build()
2856            .await?;
2857
2858        let sliding_sync = Arc::new(sliding_sync);
2859
2860        // Create the listener and perform a sync request
2861        let sync_beat_listener = client.inner.sync_beat.listen();
2862        let sync_result = sliding_sync.sync_once().await;
2863        assert!(sync_result.is_err());
2864
2865        // The sync beat listener won't be notified in this case
2866        assert!(sync_beat_listener.wait_timeout(Duration::from_secs(1)).is_none());
2867
2868        Ok(())
2869    }
2870}