matrix_sdk/sliding_sync/
builder.rs1use std::{
2 collections::BTreeMap,
3 fmt::Debug,
4 sync::{Arc, RwLock as StdRwLock},
5 time::Duration,
6};
7
8use cfg_if::cfg_if;
9use matrix_sdk_common::timer;
10use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId};
11use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
12
13use super::{
14 cache::format_storage_key_prefix, sticky_parameters::SlidingSyncStickyManager, Error,
15 SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers, Version,
16};
17use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
18
19#[derive(Debug, Clone)]
24pub struct SlidingSyncBuilder {
25 id: String,
26 storage_key: String,
27 version: Option<Version>,
28 client: Client,
29 lists: Vec<SlidingSyncListBuilder>,
30 extensions: Option<http::request::Extensions>,
31 subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
32 poll_timeout: Duration,
33 network_timeout: Duration,
34 #[cfg(feature = "e2e-encryption")]
35 share_pos: bool,
36}
37
38impl SlidingSyncBuilder {
39 pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
40 if id.len() > 16 {
41 Err(Error::InvalidSlidingSyncIdentifier)
42 } else {
43 let storage_key =
44 format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
45
46 Ok(Self {
47 id,
48 storage_key,
49 version: None,
50 client,
51 lists: Vec::new(),
52 extensions: None,
53 subscriptions: BTreeMap::new(),
54 poll_timeout: Duration::from_secs(30),
55 network_timeout: Duration::from_secs(30),
56 #[cfg(feature = "e2e-encryption")]
57 share_pos: false,
58 })
59 }
60 }
61
62 pub fn version(mut self, version: Version) -> Self {
64 self.version = Some(version);
65 self
66 }
67
68 pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
72 self.lists.push(list_builder);
73 self
74 }
75
76 pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
84 let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
85
86 list.set_cached_and_reload(&self.client, &self.storage_key).await?;
87
88 Ok(self.add_list(list))
89 }
90
91 pub fn with_all_extensions(mut self) -> Self {
97 {
98 let cfg = self.extensions.get_or_insert_with(Default::default);
99 if cfg.to_device.enabled.is_none() {
100 cfg.to_device.enabled = Some(true);
101 }
102
103 if cfg.e2ee.enabled.is_none() {
104 cfg.e2ee.enabled = Some(true);
105 }
106
107 if cfg.account_data.enabled.is_none() {
108 cfg.account_data.enabled = Some(true);
109 }
110
111 if cfg.receipts.enabled.is_none() {
112 cfg.receipts.enabled = Some(true);
113 }
114
115 if cfg.typing.enabled.is_none() {
116 cfg.typing.enabled = Some(true);
117 }
118 }
119 self
120 }
121
122 pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
124 self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
125 self
126 }
127
128 pub fn without_e2ee_extension(mut self) -> Self {
130 self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
131 self
132 }
133
134 pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
136 self.extensions.get_or_insert_with(Default::default).to_device = to_device;
137 self
138 }
139
140 pub fn without_to_device_extension(mut self) -> Self {
142 self.extensions.get_or_insert_with(Default::default).to_device =
143 http::request::ToDevice::default();
144 self
145 }
146
147 pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
149 self.extensions.get_or_insert_with(Default::default).account_data = account_data;
150 self
151 }
152
153 pub fn without_account_data_extension(mut self) -> Self {
155 self.extensions.get_or_insert_with(Default::default).account_data =
156 http::request::AccountData::default();
157 self
158 }
159
160 pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
162 self.extensions.get_or_insert_with(Default::default).typing = typing;
163 self
164 }
165
166 pub fn without_typing_extension(mut self) -> Self {
168 self.extensions.get_or_insert_with(Default::default).typing =
169 http::request::Typing::default();
170 self
171 }
172
173 pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
175 self.extensions.get_or_insert_with(Default::default).receipts = receipt;
176 self
177 }
178
179 pub fn without_receipt_extension(mut self) -> Self {
181 self.extensions.get_or_insert_with(Default::default).receipts =
182 http::request::Receipts::default();
183 self
184 }
185
186 pub fn with_thread_subscriptions_extension(
188 mut self,
189 thread_subscriptions: http::request::ThreadSubscriptions,
190 ) -> Self {
191 self.extensions.get_or_insert_with(Default::default).thread_subscriptions =
192 thread_subscriptions;
193 self
194 }
195
196 pub fn without_thread_subscriptions_extension(mut self) -> Self {
198 self.extensions.get_or_insert_with(Default::default).thread_subscriptions =
199 Default::default();
200 self
201 }
202
203 pub fn poll_timeout(mut self, timeout: Duration) -> Self {
213 self.poll_timeout = timeout;
214 self
215 }
216
217 pub fn network_timeout(mut self, timeout: Duration) -> Self {
223 self.network_timeout = timeout;
224 self
225 }
226
227 #[cfg(feature = "e2e-encryption")]
236 pub fn share_pos(mut self) -> Self {
237 self.share_pos = true;
238 self
239 }
240
241 #[allow(clippy::unused_async)] pub async fn build(self) -> Result<SlidingSync> {
244 let client = self.client;
245
246 let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
247
248 if matches!(version, Version::None) {
249 return Err(crate::error::Error::SlidingSync(Box::new(Error::VersionIsMissing)));
250 }
251
252 let (internal_channel_sender, _internal_channel_receiver) = channel(8);
253
254 let mut lists = BTreeMap::new();
255
256 for list_builder in self.lists {
257 let list = list_builder.build(internal_channel_sender.clone());
258
259 lists.insert(list.name().to_owned(), list);
260 }
261
262 let (share_pos, pos) = {
263 cfg_if! {
264 if #[cfg(feature = "e2e-encryption")] {
265 if self.share_pos {
266 (true, super::cache::restore_sliding_sync_state(&client, &self.storage_key).await?.and_then(|fields| fields.pos))
269 } else {
270 (false, None)
271 }
272 } else {
273 (false, None)
274 }
275 }
276 };
277
278 let lists = AsyncRwLock::new(lists);
279
280 Ok(SlidingSync::new(SlidingSyncInner {
281 id: self.id,
282
283 client,
284 storage_key: self.storage_key,
285 share_pos,
286
287 lists,
288
289 position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
290
291 sticky: StdRwLock::new(SlidingSyncStickyManager::new(
292 SlidingSyncStickyParameters::new(
293 self.subscriptions,
294 self.extensions.unwrap_or_default(),
295 ),
296 )),
297
298 internal_channel: internal_channel_sender,
299
300 poll_timeout: self.poll_timeout,
301 network_timeout: self.network_timeout,
302 }))
303 }
304}