1use std::{
18 collections::{BTreeMap, btree_map},
19 fmt,
20 time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25 debug::{
26 DebugInvitedRoom, DebugKnockedRoom, DebugListOfProcessedToDeviceEvents,
27 DebugListOfRawEventsNoId,
28 },
29 sleep::sleep,
30 sync::SyncResponse as BaseSyncResponse,
31 timer,
32};
33use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
34use ruma::{
35 OwnedRoomId, RoomId,
36 api::client::sync::sync_events::{
37 self,
38 v3::{InvitedRoom, KnockedRoom},
39 },
40 events::{AnyGlobalAccountDataEvent, presence::PresenceEvent},
41 serde::Raw,
42 time::Instant,
43};
44use tracing::{debug, error, instrument, warn};
45
46use crate::{Client, Result, Room, event_handler::HandlerKind};
47
48#[derive(Clone, Default)]
50pub struct SyncResponse {
51 pub next_batch: String,
54 pub rooms: RoomUpdates,
56 pub presence: Vec<Raw<PresenceEvent>>,
58 pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
60 pub to_device: Vec<ProcessedToDeviceEvent>,
62 pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
64}
65
66impl SyncResponse {
67 pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
68 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
69 base_response;
70
71 Self { next_batch, rooms, presence, account_data, to_device, notifications }
72 }
73}
74
75#[cfg(not(tarpaulin_include))]
76impl fmt::Debug for SyncResponse {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("SyncResponse")
79 .field("next_batch", &self.next_batch)
80 .field("rooms", &self.rooms)
81 .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
82 .field("to_device", &DebugListOfProcessedToDeviceEvents(&self.to_device))
83 .field("notifications", &self.notifications)
84 .finish_non_exhaustive()
85 }
86}
87
88#[derive(Clone)]
90pub enum RoomUpdate {
91 Left {
93 room: Room,
95 updates: LeftRoomUpdate,
97 },
98 Joined {
100 room: Room,
102 updates: JoinedRoomUpdate,
104 },
105 Invited {
107 room: Room,
109 updates: InvitedRoom,
111 },
112 Knocked {
114 room: Room,
116 updates: KnockedRoom,
118 },
119}
120
121#[cfg(not(tarpaulin_include))]
122impl fmt::Debug for RoomUpdate {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 match self {
125 Self::Left { room, updates } => {
126 f.debug_struct("Left").field("room", room).field("updates", updates).finish()
127 }
128 Self::Joined { room, updates } => {
129 f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
130 }
131 Self::Invited { room, updates } => f
132 .debug_struct("Invited")
133 .field("room", room)
134 .field("updates", &DebugInvitedRoom(updates))
135 .finish(),
136 Self::Knocked { room, updates } => f
137 .debug_struct("Knocked")
138 .field("room", room)
139 .field("updates", &DebugKnockedRoom(updates))
140 .finish(),
141 }
142 }
143}
144
145impl Client {
148 pub(crate) async fn process_sync(
151 &self,
152 response: sync_events::v3::Response,
153 ) -> Result<BaseSyncResponse> {
154 subscribe_to_room_latest_events(
155 self,
156 response.rooms.join.keys().chain(response.rooms.leave.keys()),
157 )
158 .await;
159
160 let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
161
162 #[cfg(feature = "e2e-encryption")]
164 self.encryption().backups().maybe_trigger_backup();
165
166 self.call_sync_response_handlers(&response).await?;
167
168 Ok(response)
169 }
170
171 #[tracing::instrument(skip(self, response))]
178 pub(crate) async fn call_sync_response_handlers(
179 &self,
180 response: &BaseSyncResponse,
181 ) -> Result<()> {
182 let _timer = timer!(tracing::Level::TRACE, "_method");
183
184 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
185
186 let now = Instant::now();
187 self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
188 self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
189 self.handle_sync_to_device_events(to_device).await?;
190
191 let _ = self.inner.room_updates_sender.send(rooms.clone());
193
194 for (room_id, room_info) in &rooms.joined {
195 let Some(room) = self.get_room(room_id) else {
196 error!(?room_id, "Can't call event handler, room not found");
197 continue;
198 };
199
200 self.send_room_update(room_id, || RoomUpdate::Joined {
201 room: room.clone(),
202 updates: room_info.clone(),
203 });
204
205 let JoinedRoomUpdate {
206 unread_notifications: _,
207 timeline,
208 state,
209 account_data,
210 ephemeral,
211 ambiguity_changes: _,
212 avatar_changes: _,
213 } = room_info;
214
215 let room = Some(&room);
216 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
217 self.handle_sync_state_events(room, state).await?;
218 self.handle_sync_timeline_events(room, &timeline.events).await?;
219 self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
222 }
223
224 for (room_id, room_info) in &rooms.left {
225 let Some(room) = self.get_room(room_id) else {
226 error!(?room_id, "Can't call event handler, room not found");
227 continue;
228 };
229
230 self.send_room_update(room_id, || RoomUpdate::Left {
231 room: room.clone(),
232 updates: room_info.clone(),
233 });
234
235 let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
236
237 let room = Some(&room);
238 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
239 self.handle_sync_state_events(room, state).await?;
240 self.handle_sync_timeline_events(room, &timeline.events).await?;
241 }
242
243 for (room_id, room_info) in &rooms.invited {
244 let Some(room) = self.get_room(room_id) else {
245 error!(?room_id, "Can't call event handler, room not found");
246 continue;
247 };
248
249 self.send_room_update(room_id, || RoomUpdate::Invited {
250 room: room.clone(),
251 updates: room_info.clone(),
252 });
253
254 let invite_state = &room_info.invite_state.events;
255 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
256 }
257
258 for (room_id, room_info) in &rooms.knocked {
259 let Some(room) = self.get_room(room_id) else {
260 error!(?room_id, "Can't call event handler, room not found");
261 continue;
262 };
263
264 self.send_room_update(room_id, || RoomUpdate::Knocked {
265 room: room.clone(),
266 updates: room_info.clone(),
267 });
268
269 let knock_state = &room_info.knock_state.events;
270 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
271 }
272
273 debug!("Ran event handlers in {:?}", now.elapsed());
274
275 let now = Instant::now();
276
277 let mut futures = Vec::new();
279 for handler in &*self.notification_handlers().await {
280 for (room_id, room_notifications) in notifications {
281 let Some(room) = self.get_room(room_id) else {
282 warn!(?room_id, "Can't call notification handler, room not found");
283 continue;
284 };
285
286 futures.extend(room_notifications.iter().map(|notification| {
287 (handler)(notification.clone(), room.clone(), self.clone())
288 }));
289 }
290 }
291
292 for fut in futures {
295 fut.await;
296 }
297
298 debug!("Ran notification handlers in {:?}", now.elapsed());
299
300 Ok(())
301 }
302
303 fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
304 if let btree_map::Entry::Occupied(entry) =
305 self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
306 {
307 let tx = entry.get();
308 if tx.receiver_count() == 0 {
309 entry.remove();
310 } else {
311 _ = tx.send(make_msg());
312 }
313 }
314 }
315
316 async fn sleep() {
317 sleep(Duration::from_secs(1)).await;
318 }
319
320 pub(crate) async fn sync_loop_helper(
321 &self,
322 sync_settings: &mut crate::config::SyncSettings,
323 ) -> Result<SyncResponse> {
324 let response = self.sync_once(sync_settings.clone()).await;
325
326 match response {
327 Ok(r) => {
328 sync_settings.token = r.next_batch.clone().into();
329 Ok(r)
330 }
331 Err(e) => {
332 error!("Received an invalid response: {e}");
333 Err(e)
334 }
335 }
336 }
337
338 pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
339 let now = Instant::now();
340
341 if let Some(t) = last_sync_time
345 && now - *t <= Duration::from_secs(1)
346 {
347 Self::sleep().await;
348 }
349
350 *last_sync_time = Some(now);
351 }
352}
353
354#[instrument(skip_all)]
359pub(crate) async fn subscribe_to_room_latest_events<'a, R>(client: &'a Client, room_ids: R)
360where
361 R: Iterator<Item = &'a OwnedRoomId>,
362{
363 if !client.event_cache().has_subscribed() {
364 return;
365 }
366
367 let latest_events = client.latest_events().await;
368
369 for room_id in room_ids {
370 if let Err(error) = latest_events.listen_to_room(room_id).await {
371 error!(?error, ?room_id, "Failed to listen to the latest event for this room");
372 }
373 }
374}