matrix_sdk/sliding_sync/
builder.rs1use std::{
2 collections::BTreeMap,
3 fmt::Debug,
4 sync::{Arc, RwLock as StdRwLock},
5 time::Duration,
6};
7
8use matrix_sdk_common::timer;
9use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId};
10use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
11
12use super::{
13 cache::{format_storage_key_prefix, restore_sliding_sync_state},
14 sticky_parameters::SlidingSyncStickyManager,
15 Error, SlidingSync, SlidingSyncInner, SlidingSyncListBuilder, SlidingSyncPositionMarkers,
16 Version,
17};
18use crate::{sliding_sync::SlidingSyncStickyParameters, Client, Result};
19
20#[derive(Debug, Clone)]
25pub struct SlidingSyncBuilder {
26 id: String,
27 storage_key: String,
28 version: Option<Version>,
29 client: Client,
30 lists: Vec<SlidingSyncListBuilder>,
31 extensions: Option<http::request::Extensions>,
32 subscriptions: BTreeMap<OwnedRoomId, http::request::RoomSubscription>,
33 poll_timeout: Duration,
34 network_timeout: Duration,
35 #[cfg(feature = "e2e-encryption")]
36 share_pos: bool,
37}
38
39impl SlidingSyncBuilder {
40 pub(super) fn new(id: String, client: Client) -> Result<Self, Error> {
41 if id.len() > 16 {
42 Err(Error::InvalidSlidingSyncIdentifier)
43 } else {
44 let storage_key =
45 format_storage_key_prefix(&id, client.user_id().ok_or(Error::UnauthenticatedUser)?);
46
47 Ok(Self {
48 id,
49 storage_key,
50 version: None,
51 client,
52 lists: Vec::new(),
53 extensions: None,
54 subscriptions: BTreeMap::new(),
55 poll_timeout: Duration::from_secs(30),
56 network_timeout: Duration::from_secs(30),
57 #[cfg(feature = "e2e-encryption")]
58 share_pos: false,
59 })
60 }
61 }
62
63 pub fn version(mut self, version: Version) -> Self {
65 self.version = Some(version);
66 self
67 }
68
69 pub fn add_list(mut self, list_builder: SlidingSyncListBuilder) -> Self {
73 self.lists.push(list_builder);
74 self
75 }
76
77 pub async fn add_cached_list(self, mut list: SlidingSyncListBuilder) -> Result<Self> {
85 let _timer = timer!(format!("restoring (loading+processing) list {}", list.name));
86
87 list.set_cached_and_reload(&self.client, &self.storage_key).await?;
88
89 Ok(self.add_list(list))
90 }
91
92 pub fn with_all_extensions(mut self) -> Self {
98 {
99 let cfg = self.extensions.get_or_insert_with(Default::default);
100 if cfg.to_device.enabled.is_none() {
101 cfg.to_device.enabled = Some(true);
102 }
103
104 if cfg.e2ee.enabled.is_none() {
105 cfg.e2ee.enabled = Some(true);
106 }
107
108 if cfg.account_data.enabled.is_none() {
109 cfg.account_data.enabled = Some(true);
110 }
111
112 if cfg.receipts.enabled.is_none() {
113 cfg.receipts.enabled = Some(true);
114 }
115
116 if cfg.typing.enabled.is_none() {
117 cfg.typing.enabled = Some(true);
118 }
119 }
120 self
121 }
122
123 pub fn with_e2ee_extension(mut self, e2ee: http::request::E2EE) -> Self {
125 self.extensions.get_or_insert_with(Default::default).e2ee = e2ee;
126 self
127 }
128
129 pub fn without_e2ee_extension(mut self) -> Self {
131 self.extensions.get_or_insert_with(Default::default).e2ee = http::request::E2EE::default();
132 self
133 }
134
135 pub fn with_to_device_extension(mut self, to_device: http::request::ToDevice) -> Self {
137 self.extensions.get_or_insert_with(Default::default).to_device = to_device;
138 self
139 }
140
141 pub fn without_to_device_extension(mut self) -> Self {
143 self.extensions.get_or_insert_with(Default::default).to_device =
144 http::request::ToDevice::default();
145 self
146 }
147
148 pub fn with_account_data_extension(mut self, account_data: http::request::AccountData) -> Self {
150 self.extensions.get_or_insert_with(Default::default).account_data = account_data;
151 self
152 }
153
154 pub fn without_account_data_extension(mut self) -> Self {
156 self.extensions.get_or_insert_with(Default::default).account_data =
157 http::request::AccountData::default();
158 self
159 }
160
161 pub fn with_typing_extension(mut self, typing: http::request::Typing) -> Self {
163 self.extensions.get_or_insert_with(Default::default).typing = typing;
164 self
165 }
166
167 pub fn without_typing_extension(mut self) -> Self {
169 self.extensions.get_or_insert_with(Default::default).typing =
170 http::request::Typing::default();
171 self
172 }
173
174 pub fn with_receipt_extension(mut self, receipt: http::request::Receipts) -> Self {
176 self.extensions.get_or_insert_with(Default::default).receipts = receipt;
177 self
178 }
179
180 pub fn without_receipt_extension(mut self) -> Self {
182 self.extensions.get_or_insert_with(Default::default).receipts =
183 http::request::Receipts::default();
184 self
185 }
186
187 pub fn poll_timeout(mut self, timeout: Duration) -> Self {
197 self.poll_timeout = timeout;
198 self
199 }
200
201 pub fn network_timeout(mut self, timeout: Duration) -> Self {
207 self.network_timeout = timeout;
208 self
209 }
210
211 #[cfg(feature = "e2e-encryption")]
220 pub fn share_pos(mut self) -> Self {
221 self.share_pos = true;
222 self
223 }
224
225 pub async fn build(self) -> Result<SlidingSync> {
230 let client = self.client;
231
232 let version = self.version.unwrap_or_else(|| client.sliding_sync_version());
233
234 if matches!(version, Version::None) {
235 return Err(crate::error::Error::SlidingSync(Error::VersionIsMissing));
236 }
237
238 let (internal_channel_sender, _internal_channel_receiver) = channel(8);
239
240 let mut lists = BTreeMap::new();
241
242 for list_builder in self.lists {
243 let list = list_builder.build(internal_channel_sender.clone());
244
245 lists.insert(list.name().to_owned(), list);
246 }
247
248 let restored_fields =
250 restore_sliding_sync_state(&client, &self.storage_key, &lists).await?;
251
252 let (pos, rooms) = if let Some(fields) = restored_fields {
253 #[cfg(feature = "e2e-encryption")]
254 let pos = if self.share_pos { fields.pos } else { None };
255 #[cfg(not(feature = "e2e-encryption"))]
256 let pos = None;
257
258 (pos, fields.rooms)
259 } else {
260 (None, BTreeMap::new())
261 };
262
263 #[cfg(feature = "e2e-encryption")]
264 let share_pos = self.share_pos;
265 #[cfg(not(feature = "e2e-encryption"))]
266 let share_pos = false;
267
268 let rooms = AsyncRwLock::new(rooms);
269 let lists = AsyncRwLock::new(lists);
270
271 Ok(SlidingSync::new(SlidingSyncInner {
272 id: self.id,
273
274 client,
275 storage_key: self.storage_key,
276 share_pos,
277
278 lists,
279 rooms,
280
281 position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
282
283 sticky: StdRwLock::new(SlidingSyncStickyManager::new(
284 SlidingSyncStickyParameters::new(
285 self.subscriptions,
286 self.extensions.unwrap_or_default(),
287 ),
288 )),
289
290 internal_channel: internal_channel_sender,
291
292 poll_timeout: self.poll_timeout,
293 network_timeout: self.network_timeout,
294 }))
295 }
296}