matrix_sdk/sliding_sync/
builder.rs

1use std::{
2    collections::BTreeMap,
3    fmt::Debug,
4    sync::{Arc, RwLock as StdRwLock},
5    time::Duration,
6};
7
8use matrix_sdk_common::timer;
9use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId};
10use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
11
12use super::{
13    cache::{format_storage_key_prefix, restore_sliding_sync_state},
14    sticky_parameters::SlidingSyncStickyManager,
15    Error, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers,
16    Version,
17};
18use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
19
20/// Configuration for a Sliding Sync instance.
21///
22/// Get a new builder with methods like [`crate::Client::sliding_sync`], or
23/// [`crate::SlidingSync::builder`].
24#[derive(Debug, Clone)]
25pub struct SlidingSyncBuilder {
26    id: String,
27    storage_key: String,
28    version: Option<Version>,
29    client: Client,
30    lists: Vec<SlidingSyncListBuilder>,
31    extensions: Option<http::request::Extensions>,
32    subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
33    poll_timeout: Duration,
34    network_timeout: Duration,
35    #[cfg(feature = "e2e-encryption")]
36    share_pos: bool,
37}
38
39impl SlidingSyncBuilder {
40    pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
41        if id.len() > 16 {
42            Err(Error::InvalidSlidingSyncIdentifier)
43        } else {
44            let storage_key =
45                format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
46
47            Ok(Self {
48                id,
49                storage_key,
50                version: None,
51                client,
52                lists: Vec::new(),
53                extensions: None,
54                subscriptions: BTreeMap::new(),
55                poll_timeout: Duration::from_secs(30),
56                network_timeout: Duration::from_secs(30),
57                #[cfg(feature = "e2e-encryption")]
58                share_pos: false,
59            })
60        }
61    }
62
63    /// Set a specific version that will override the one from the [`Client`].
64    pub fn version(mut self, version: Version) -> Self {
65        self.version = Some(version);
66        self
67    }
68
69    /// Add the given list to the lists.
70    ///
71    /// Replace any list with the same name.
72    pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
73        self.lists.push(list_builder);
74        self
75    }
76
77    /// Enroll the list in caching, reloads it from the cache if possible, and
78    /// adds it to the list of lists.
79    ///
80    /// This will raise an error if there was a I/O error reading from the
81    /// cache.
82    ///
83    /// Replace any list with the same name.
84    pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
85        let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
86
87        list.set_cached_and_reload(&self.client, &self.storage_key).await?;
88
89        Ok(self.add_list(list))
90    }
91
92    /// Activate e2ee, to-device-message, account data, typing and receipt
93    /// extensions if not yet configured.
94    ///
95    /// Will leave any extension configuration found untouched, so the order
96    /// does not matter.
97    pub fn with_all_extensions(mut self) -> Self {
98        {
99            let cfg = self.extensions.get_or_insert_with(Default::default);
100            if cfg.to_device.enabled.is_none() {
101                cfg.to_device.enabled = Some(true);
102            }
103
104            if cfg.e2ee.enabled.is_none() {
105                cfg.e2ee.enabled = Some(true);
106            }
107
108            if cfg.account_data.enabled.is_none() {
109                cfg.account_data.enabled = Some(true);
110            }
111
112            if cfg.receipts.enabled.is_none() {
113                cfg.receipts.enabled = Some(true);
114            }
115
116            if cfg.typing.enabled.is_none() {
117                cfg.typing.enabled = Some(true);
118            }
119        }
120        self
121    }
122
123    /// Set the E2EE extension configuration.
124    pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
125        self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
126        self
127    }
128
129    /// Unset the E2EE extension configuration.
130    pub fn without_e2ee_extension(mut self) -> Self {
131        self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
132        self
133    }
134
135    /// Set the ToDevice extension configuration.
136    pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
137        self.extensions.get_or_insert_with(Default::default).to_device = to_device;
138        self
139    }
140
141    /// Unset the ToDevice extension configuration.
142    pub fn without_to_device_extension(mut self) -> Self {
143        self.extensions.get_or_insert_with(Default::default).to_device =
144            http::request::ToDevice::default();
145        self
146    }
147
148    /// Set the account data extension configuration.
149    pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
150        self.extensions.get_or_insert_with(Default::default).account_data = account_data;
151        self
152    }
153
154    /// Unset the account data extension configuration.
155    pub fn without_account_data_extension(mut self) -> Self {
156        self.extensions.get_or_insert_with(Default::default).account_data =
157            http::request::AccountData::default();
158        self
159    }
160
161    /// Set the Typing extension configuration.
162    pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
163        self.extensions.get_or_insert_with(Default::default).typing = typing;
164        self
165    }
166
167    /// Unset the Typing extension configuration.
168    pub fn without_typing_extension(mut self) -> Self {
169        self.extensions.get_or_insert_with(Default::default).typing =
170            http::request::Typing::default();
171        self
172    }
173
174    /// Set the Receipt extension configuration.
175    pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
176        self.extensions.get_or_insert_with(Default::default).receipts = receipt;
177        self
178    }
179
180    /// Unset the Receipt extension configuration.
181    pub fn without_receipt_extension(mut self) -> Self {
182        self.extensions.get_or_insert_with(Default::default).receipts =
183            http::request::Receipts::default();
184        self
185    }
186
187    /// Sets a custom timeout duration for the sliding sync polling endpoint.
188    ///
189    /// This is the maximum time to wait before the sliding sync server returns
190    /// the long-polling request. If no events (or other data) become
191    /// available before this time elapses, the server will a return a
192    /// response with empty fields.
193    ///
194    /// There's an additional network timeout on top of that that can be
195    /// configured with [`Self::network_timeout`].
196    pub fn poll_timeout(mut self, timeout: Duration) -> Self {
197        self.poll_timeout = timeout;
198        self
199    }
200
201    /// Sets a custom network timeout for the sliding sync polling.
202    ///
203    /// This is not the polling timeout that can be configured with
204    /// [`Self::poll_timeout`], but an additional timeout that will be
205    /// added to the former.
206    pub fn network_timeout(mut self, timeout: Duration) -> Self {
207        self.network_timeout = timeout;
208        self
209    }
210
211    /// Should the sliding sync instance share its sync position through
212    /// storage?
213    ///
214    /// In general, sliding sync instances will cache the sync position (`pos`
215    /// field in the request) in internal memory. It can be useful, in
216    /// multi-process scenarios, to save it into some shared storage so that one
217    /// sliding sync instance running across two different processes can
218    /// continue with the same sync position it had before being stopped.
219    #[cfg(feature = "e2e-encryption")]
220    pub fn share_pos(mut self) -> Self {
221        self.share_pos = true;
222        self
223    }
224
225    /// Build the Sliding Sync.
226    ///
227    /// If `self.storage_key` is `Some(_)`, load the cached data from cold
228    /// storage.
229    pub async fn build(self) -> Result<SlidingSync> {
230        let client = self.client;
231
232        let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
233
234        if matches!(version, Version::None) {
235            return Err(crate::error::Error::SlidingSync(Error::VersionIsMissing));
236        }
237
238        let (internal_channel_sender, _internal_channel_receiver) = channel(8);
239
240        let mut lists = BTreeMap::new();
241
242        for list_builder in self.lists {
243            let list = list_builder.build(internal_channel_sender.clone());
244
245            lists.insert(list.name().to_owned(), list);
246        }
247
248        // Reload existing state from the cache.
249        let restored_fields =
250            restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
251
252        let (pos, rooms) = if let Some(fields) = restored_fields {
253            #[cfg(feature = "e2e-encryption")]
254            let pos = if self.share_pos { fields.pos } else { None };
255            #[cfg(not(feature = "e2e-encryption"))]
256            let pos = None;
257
258            (pos, fields.rooms)
259        } else {
260            (None, BTreeMap::new())
261        };
262
263        #[cfg(feature = "e2e-encryption")]
264        let share_pos = self.share_pos;
265        #[cfg(not(feature = "e2e-encryption"))]
266        let share_pos = false;
267
268        let rooms = AsyncRwLock::new(rooms);
269        let lists = AsyncRwLock::new(lists);
270
271        Ok(SlidingSync::new(SlidingSyncInner {
272            id: self.id,
273
274            client,
275            storage_key: self.storage_key,
276            share_pos,
277
278            lists,
279            rooms,
280
281            position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
282
283            sticky: StdRwLock::new(SlidingSyncStickyManager::new(
284                SlidingSyncStickyParameters::new(
285                    self.subscriptions,
286                    self.extensions.unwrap_or_default(),
287                ),
288            )),
289
290            internal_channel: internal_channel_sender,
291
292            poll_timeout: self.poll_timeout,
293            network_timeout: self.network_timeout,
294        }))
295    }
296}