matrix_sdk/sliding_sync/
builder.rs

1use std::{
2    collections::BTreeMap,
3    fmt::Debug,
4    sync::{Arc, RwLock as StdRwLock},
5    time::Duration,
6};
7
8use cfg_if::cfg_if;
9use matrix_sdk_common::timer;
10use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId};
11use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
12
13use super::{
14    cache::format_storage_key_prefix, sticky_parameters::SlidingSyncStickyManager, Error,
15    SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers, Version,
16};
17use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
18
19/// Configuration for a Sliding Sync instance.
20///
21/// Get a new builder with methods like [`crate::Client::sliding_sync`], or
22/// [`crate::SlidingSync::builder`].
23#[derive(Debug, Clone)]
24pub struct SlidingSyncBuilder {
25    id: String,
26    storage_key: String,
27    version: Option<Version>,
28    client: Client,
29    lists: Vec<SlidingSyncListBuilder>,
30    extensions: Option<http::request::Extensions>,
31    subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
32    poll_timeout: Duration,
33    network_timeout: Duration,
34    #[cfg(feature = "e2e-encryption")]
35    share_pos: bool,
36}
37
38impl SlidingSyncBuilder {
39    pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
40        if id.len() > 16 {
41            Err(Error::InvalidSlidingSyncIdentifier)
42        } else {
43            let storage_key =
44                format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
45
46            Ok(Self {
47                id,
48                storage_key,
49                version: None,
50                client,
51                lists: Vec::new(),
52                extensions: None,
53                subscriptions: BTreeMap::new(),
54                poll_timeout: Duration::from_secs(30),
55                network_timeout: Duration::from_secs(30),
56                #[cfg(feature = "e2e-encryption")]
57                share_pos: false,
58            })
59        }
60    }
61
62    /// Set a specific version that will override the one from the [`Client`].
63    pub fn version(mut self, version: Version) -> Self {
64        self.version = Some(version);
65        self
66    }
67
68    /// Add the given list to the lists.
69    ///
70    /// Replace any list with the same name.
71    pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
72        self.lists.push(list_builder);
73        self
74    }
75
76    /// Enroll the list in caching, reloads it from the cache if possible, and
77    /// adds it to the list of lists.
78    ///
79    /// This will raise an error if there was a I/O error reading from the
80    /// cache.
81    ///
82    /// Replace any list with the same name.
83    pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
84        let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
85
86        list.set_cached_and_reload(&self.client, &self.storage_key).await?;
87
88        Ok(self.add_list(list))
89    }
90
91    /// Activate e2ee, to-device-message, account data, typing and receipt
92    /// extensions if not yet configured.
93    ///
94    /// Will leave any extension configuration found untouched, so the order
95    /// does not matter.
96    pub fn with_all_extensions(mut self) -> Self {
97        {
98            let cfg = self.extensions.get_or_insert_with(Default::default);
99            if cfg.to_device.enabled.is_none() {
100                cfg.to_device.enabled = Some(true);
101            }
102
103            if cfg.e2ee.enabled.is_none() {
104                cfg.e2ee.enabled = Some(true);
105            }
106
107            if cfg.account_data.enabled.is_none() {
108                cfg.account_data.enabled = Some(true);
109            }
110
111            if cfg.receipts.enabled.is_none() {
112                cfg.receipts.enabled = Some(true);
113            }
114
115            if cfg.typing.enabled.is_none() {
116                cfg.typing.enabled = Some(true);
117            }
118        }
119        self
120    }
121
122    /// Set the E2EE extension configuration.
123    pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
124        self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
125        self
126    }
127
128    /// Unset the E2EE extension configuration.
129    pub fn without_e2ee_extension(mut self) -> Self {
130        self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
131        self
132    }
133
134    /// Set the ToDevice extension configuration.
135    pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
136        self.extensions.get_or_insert_with(Default::default).to_device = to_device;
137        self
138    }
139
140    /// Unset the ToDevice extension configuration.
141    pub fn without_to_device_extension(mut self) -> Self {
142        self.extensions.get_or_insert_with(Default::default).to_device =
143            http::request::ToDevice::default();
144        self
145    }
146
147    /// Set the account data extension configuration.
148    pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
149        self.extensions.get_or_insert_with(Default::default).account_data = account_data;
150        self
151    }
152
153    /// Unset the account data extension configuration.
154    pub fn without_account_data_extension(mut self) -> Self {
155        self.extensions.get_or_insert_with(Default::default).account_data =
156            http::request::AccountData::default();
157        self
158    }
159
160    /// Set the Typing extension configuration.
161    pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
162        self.extensions.get_or_insert_with(Default::default).typing = typing;
163        self
164    }
165
166    /// Unset the Typing extension configuration.
167    pub fn without_typing_extension(mut self) -> Self {
168        self.extensions.get_or_insert_with(Default::default).typing =
169            http::request::Typing::default();
170        self
171    }
172
173    /// Set the Receipt extension configuration.
174    pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
175        self.extensions.get_or_insert_with(Default::default).receipts = receipt;
176        self
177    }
178
179    /// Unset the Receipt extension configuration.
180    pub fn without_receipt_extension(mut self) -> Self {
181        self.extensions.get_or_insert_with(Default::default).receipts =
182            http::request::Receipts::default();
183        self
184    }
185
186    /// Set the Threads subscriptions extension configuration.
187    pub fn with_thread_subscriptions_extension(
188        mut self,
189        thread_subscriptions: http::request::ThreadSubscriptions,
190    ) -> Self {
191        self.extensions.get_or_insert_with(Default::default).thread_subscriptions =
192            thread_subscriptions;
193        self
194    }
195
196    /// Unset the Threads subscriptions extension configuration.
197    pub fn without_thread_subscriptions_extension(mut self) -> Self {
198        self.extensions.get_or_insert_with(Default::default).thread_subscriptions =
199            Default::default();
200        self
201    }
202
203    /// Sets a custom timeout duration for the sliding sync polling endpoint.
204    ///
205    /// This is the maximum time to wait before the sliding sync server returns
206    /// the long-polling request. If no events (or other data) become
207    /// available before this time elapses, the server will a return a
208    /// response with empty fields.
209    ///
210    /// There's an additional network timeout on top of that that can be
211    /// configured with [`Self::network_timeout`].
212    pub fn poll_timeout(mut self, timeout: Duration) -> Self {
213        self.poll_timeout = timeout;
214        self
215    }
216
217    /// Sets a custom network timeout for the sliding sync polling.
218    ///
219    /// This is not the polling timeout that can be configured with
220    /// [`Self::poll_timeout`], but an additional timeout that will be
221    /// added to the former.
222    pub fn network_timeout(mut self, timeout: Duration) -> Self {
223        self.network_timeout = timeout;
224        self
225    }
226
227    /// Should the sliding sync instance share its sync position through
228    /// storage?
229    ///
230    /// In general, sliding sync instances will cache the sync position (`pos`
231    /// field in the request) in internal memory. It can be useful, in
232    /// multi-process scenarios, to save it into some shared storage so that one
233    /// sliding sync instance running across two different processes can
234    /// continue with the same sync position it had before being stopped.
235    #[cfg(feature = "e2e-encryption")]
236    pub fn share_pos(mut self) -> Self {
237        self.share_pos = true;
238        self
239    }
240
241    /// Build the Sliding Sync.
242    #[allow(clippy::unused_async)] // Async is only used if the e2e-encryption feature is enabled.
243    pub async fn build(self) -> Result<SlidingSync> {
244        let client = self.client;
245
246        let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
247
248        if matches!(version, Version::None) {
249            return Err(crate::error::Error::SlidingSync(Box::new(Error::VersionIsMissing)));
250        }
251
252        let (internal_channel_sender, _internal_channel_receiver) = channel(8);
253
254        let mut lists = BTreeMap::new();
255
256        for list_builder in self.lists {
257            let list = list_builder.build(internal_channel_sender.clone());
258
259            lists.insert(list.name().to_owned(), list);
260        }
261
262        let (share_pos, pos) = {
263            cfg_if! {
264                if #[cfg(feature = "e2e-encryption")] {
265                    if self.share_pos {
266                        // If the sliding sync instance is configured to share its current sync
267                        // position, we will restore it from the cache.
268                        (true, super::cache::restore_sliding_sync_state(&client, &self.storage_key).await?.and_then(|fields| fields.pos))
269                    } else {
270                        (false, None)
271                    }
272                } else {
273                    (false, None)
274                }
275            }
276        };
277
278        let lists = AsyncRwLock::new(lists);
279
280        Ok(SlidingSync::new(SlidingSyncInner {
281            id: self.id,
282
283            client,
284            storage_key: self.storage_key,
285            share_pos,
286
287            lists,
288
289            position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
290
291            sticky: StdRwLock::new(SlidingSyncStickyManager::new(
292                SlidingSyncStickyParameters::new(
293                    self.subscriptions,
294                    self.extensions.unwrap_or_default(),
295                ),
296            )),
297
298            internal_channel: internal_channel_sender,
299
300            poll_timeout: self.poll_timeout,
301            network_timeout: self.network_timeout,
302        }))
303    }
304}