1use std::{
18 collections::{btree_map, BTreeMap},
19 fmt,
20 time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25 debug::{DebugInvitedRoom, DebugKnockedRoom, DebugListOfRawEventsNoId},
26 sleep::sleep,
27 sync::SyncResponse as BaseSyncResponse,
28};
29use ruma::{
30 api::client::sync::sync_events::{
31 self,
32 v3::{InvitedRoom, KnockedRoom},
33 },
34 events::{presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyToDeviceEvent},
35 serde::Raw,
36 time::Instant,
37 OwnedRoomId, RoomId,
38};
39use tracing::{debug, error, warn};
40
41use crate::{event_handler::HandlerKind, Client, Result, Room};
42
43#[derive(Clone, Default)]
45pub struct SyncResponse {
46 pub next_batch: String,
49 pub rooms: RoomUpdates,
51 pub presence: Vec<Raw<PresenceEvent>>,
53 pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
55 pub to_device: Vec<Raw<AnyToDeviceEvent>>,
57 pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
59}
60
61impl SyncResponse {
62 pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
63 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
64 base_response;
65
66 Self { next_batch, rooms, presence, account_data, to_device, notifications }
67 }
68}
69
70#[cfg(not(tarpaulin_include))]
71impl fmt::Debug for SyncResponse {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 f.debug_struct("SyncResponse")
74 .field("next_batch", &self.next_batch)
75 .field("rooms", &self.rooms)
76 .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
77 .field("to_device", &DebugListOfRawEventsNoId(&self.to_device))
78 .field("notifications", &self.notifications)
79 .finish_non_exhaustive()
80 }
81}
82
83#[derive(Clone)]
85pub enum RoomUpdate {
86 Left {
88 room: Room,
90 updates: LeftRoomUpdate,
92 },
93 Joined {
95 room: Room,
97 updates: JoinedRoomUpdate,
99 },
100 Invited {
102 room: Room,
104 updates: InvitedRoom,
106 },
107 Knocked {
109 room: Room,
111 updates: KnockedRoom,
113 },
114}
115
116#[cfg(not(tarpaulin_include))]
117impl fmt::Debug for RoomUpdate {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 match self {
120 Self::Left { room, updates } => {
121 f.debug_struct("Left").field("room", room).field("updates", updates).finish()
122 }
123 Self::Joined { room, updates } => {
124 f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
125 }
126 Self::Invited { room, updates } => f
127 .debug_struct("Invited")
128 .field("room", room)
129 .field("updates", &DebugInvitedRoom(updates))
130 .finish(),
131 Self::Knocked { room, updates } => f
132 .debug_struct("Knocked")
133 .field("room", room)
134 .field("updates", &DebugKnockedRoom(updates))
135 .finish(),
136 }
137 }
138}
139
140impl Client {
143 pub(crate) async fn process_sync(
146 &self,
147 response: sync_events::v3::Response,
148 ) -> Result<BaseSyncResponse> {
149 let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
150
151 #[cfg(feature = "e2e-encryption")]
153 self.encryption().backups().maybe_trigger_backup();
154
155 self.call_sync_response_handlers(&response).await?;
156
157 Ok(response)
158 }
159
160 #[tracing::instrument(skip(self, response))]
167 pub(crate) async fn call_sync_response_handlers(
168 &self,
169 response: &BaseSyncResponse,
170 ) -> Result<()> {
171 let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
172
173 let now = Instant::now();
174 self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
175 self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
176 self.handle_sync_events(HandlerKind::ToDevice, None, to_device).await?;
177
178 let _ = self.inner.room_updates_sender.send(rooms.clone());
180
181 for (room_id, room_info) in &rooms.join {
182 let Some(room) = self.get_room(room_id) else {
183 error!(?room_id, "Can't call event handler, room not found");
184 continue;
185 };
186
187 self.send_room_update(room_id, || RoomUpdate::Joined {
188 room: room.clone(),
189 updates: room_info.clone(),
190 });
191
192 let JoinedRoomUpdate {
193 unread_notifications: _,
194 timeline,
195 state,
196 account_data,
197 ephemeral,
198 ambiguity_changes: _,
199 } = room_info;
200
201 let room = Some(&room);
202 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
203 self.handle_sync_state_events(room, state).await?;
204 self.handle_sync_timeline_events(room, &timeline.events).await?;
205 self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
208 }
209
210 for (room_id, room_info) in &rooms.leave {
211 let Some(room) = self.get_room(room_id) else {
212 error!(?room_id, "Can't call event handler, room not found");
213 continue;
214 };
215
216 self.send_room_update(room_id, || RoomUpdate::Left {
217 room: room.clone(),
218 updates: room_info.clone(),
219 });
220
221 let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
222
223 let room = Some(&room);
224 self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
225 self.handle_sync_state_events(room, state).await?;
226 self.handle_sync_timeline_events(room, &timeline.events).await?;
227 }
228
229 for (room_id, room_info) in &rooms.invite {
230 let Some(room) = self.get_room(room_id) else {
231 error!(?room_id, "Can't call event handler, room not found");
232 continue;
233 };
234
235 self.send_room_update(room_id, || RoomUpdate::Invited {
236 room: room.clone(),
237 updates: room_info.clone(),
238 });
239
240 let invite_state = &room_info.invite_state.events;
241 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
242 }
243
244 for (room_id, room_info) in &rooms.knocked {
245 let Some(room) = self.get_room(room_id) else {
246 error!(?room_id, "Can't call event handler, room not found");
247 continue;
248 };
249
250 self.send_room_update(room_id, || RoomUpdate::Knocked {
251 room: room.clone(),
252 updates: room_info.clone(),
253 });
254
255 let knock_state = &room_info.knock_state.events;
256 self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
257 }
258
259 debug!("Ran event handlers in {:?}", now.elapsed());
260
261 let now = Instant::now();
262
263 let mut futures = Vec::new();
265 for handler in &*self.notification_handlers().await {
266 for (room_id, room_notifications) in notifications {
267 let Some(room) = self.get_room(room_id) else {
268 warn!(?room_id, "Can't call notification handler, room not found");
269 continue;
270 };
271
272 futures.extend(room_notifications.iter().map(|notification| {
273 (handler)(notification.clone(), room.clone(), self.clone())
274 }));
275 }
276 }
277
278 for fut in futures {
281 fut.await;
282 }
283
284 debug!("Ran notification handlers in {:?}", now.elapsed());
285
286 Ok(())
287 }
288
289 fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
290 if let btree_map::Entry::Occupied(entry) =
291 self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
292 {
293 let tx = entry.get();
294 if tx.receiver_count() == 0 {
295 entry.remove();
296 } else {
297 _ = tx.send(make_msg());
298 }
299 }
300 }
301
302 async fn sleep() {
303 sleep(Duration::from_secs(1)).await;
304 }
305
306 pub(crate) async fn sync_loop_helper(
307 &self,
308 sync_settings: &mut crate::config::SyncSettings,
309 ) -> Result<SyncResponse> {
310 let response = self.sync_once(sync_settings.clone()).await;
311
312 match response {
313 Ok(r) => {
314 sync_settings.token = Some(r.next_batch.clone());
315 Ok(r)
316 }
317 Err(e) => {
318 error!("Received an invalid response: {e}");
319 Err(e)
320 }
321 }
322 }
323
324 pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
325 let now = Instant::now();
326
327 if let Some(t) = last_sync_time {
331 if now - *t <= Duration::from_secs(1) {
332 Self::sleep().await;
333 }
334 }
335
336 *last_sync_time = Some(now);
337 }
338}