matrix_sdk/
sync.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The SDK's representation of the result of a `/sync` request.
16
17use std::{
18    collections::{btree_map, BTreeMap},
19    fmt,
20    time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25    debug::{DebugInvitedRoom, DebugKnockedRoom, DebugListOfRawEventsNoId},
26    sleep::sleep,
27    sync::SyncResponse as BaseSyncResponse,
28};
29use ruma::{
30    api::client::sync::sync_events::{
31        self,
32        v3::{InvitedRoom, KnockedRoom},
33    },
34    events::{presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyToDeviceEvent},
35    serde::Raw,
36    time::Instant,
37    OwnedRoomId, RoomId,
38};
39use tracing::{debug, error, warn};
40
41use crate::{event_handler::HandlerKind, Client, Result, Room};
42
43/// The processed response of a `/sync` request.
44#[derive(Clone, Default)]
45pub struct SyncResponse {
46    /// The batch token to supply in the `since` param of the next `/sync`
47    /// request.
48    pub next_batch: String,
49    /// Updates to rooms.
50    pub rooms: RoomUpdates,
51    /// Updates to the presence status of other users.
52    pub presence: Vec<Raw<PresenceEvent>>,
53    /// The global private data created by this user.
54    pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
55    /// Messages sent directly between devices.
56    pub to_device: Vec<Raw<AnyToDeviceEvent>>,
57    /// New notifications per room.
58    pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
59}
60
61impl SyncResponse {
62    pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
63        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
64            base_response;
65
66        Self { next_batch, rooms, presence, account_data, to_device, notifications }
67    }
68}
69
70#[cfg(not(tarpaulin_include))]
71impl fmt::Debug for SyncResponse {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        f.debug_struct("SyncResponse")
74            .field("next_batch", &self.next_batch)
75            .field("rooms", &self.rooms)
76            .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
77            .field("to_device", &DebugListOfRawEventsNoId(&self.to_device))
78            .field("notifications", &self.notifications)
79            .finish_non_exhaustive()
80    }
81}
82
83/// A batch of updates to a room.
84#[derive(Clone)]
85pub enum RoomUpdate {
86    /// Updates to a room the user is no longer in.
87    Left {
88        /// Room object with general information on the room.
89        room: Room,
90        /// Updates to the room.
91        updates: LeftRoomUpdate,
92    },
93    /// Updates to a room the user is currently in.
94    Joined {
95        /// Room object with general information on the room.
96        room: Room,
97        /// Updates to the room.
98        updates: JoinedRoomUpdate,
99    },
100    /// Updates to a room the user is invited to.
101    Invited {
102        /// Room object with general information on the room.
103        room: Room,
104        /// Updates to the room.
105        updates: InvitedRoom,
106    },
107    /// Updates to a room the user knocked on.
108    Knocked {
109        /// Room object with general information on the room.
110        room: Room,
111        /// Updates to the room.
112        updates: KnockedRoom,
113    },
114}
115
116#[cfg(not(tarpaulin_include))]
117impl fmt::Debug for RoomUpdate {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        match self {
120            Self::Left { room, updates } => {
121                f.debug_struct("Left").field("room", room).field("updates", updates).finish()
122            }
123            Self::Joined { room, updates } => {
124                f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
125            }
126            Self::Invited { room, updates } => f
127                .debug_struct("Invited")
128                .field("room", room)
129                .field("updates", &DebugInvitedRoom(updates))
130                .finish(),
131            Self::Knocked { room, updates } => f
132                .debug_struct("Knocked")
133                .field("room", room)
134                .field("updates", &DebugKnockedRoom(updates))
135                .finish(),
136        }
137    }
138}
139
140/// Internal functionality related to getting events from the server
141/// (`sync_events` endpoint)
142impl Client {
143    /// Receive a sync response, compute extra information out of it and store
144    /// the interesting bits in the database, then call all the handlers.
145    pub(crate) async fn process_sync(
146        &self,
147        response: sync_events::v3::Response,
148    ) -> Result<BaseSyncResponse> {
149        let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
150
151        // Some new keys might have been received, so trigger a backup if needed.
152        #[cfg(feature = "e2e-encryption")]
153        self.encryption().backups().maybe_trigger_backup();
154
155        self.call_sync_response_handlers(&response).await?;
156
157        Ok(response)
158    }
159
160    /// Calls event handlers and notification handlers after a sync response has
161    /// been processed.
162    ///
163    /// At this point, the sync response's data has been taken into account and
164    /// persisted in the store, if needs be. This function is only calling
165    /// the event, room update and notification handlers.
166    #[tracing::instrument(skip(self, response))]
167    pub(crate) async fn call_sync_response_handlers(
168        &self,
169        response: &BaseSyncResponse,
170    ) -> Result<()> {
171        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
172
173        let now = Instant::now();
174        self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
175        self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
176        self.handle_sync_events(HandlerKind::ToDevice, None, to_device).await?;
177
178        // Ignore errors when there are no receivers.
179        let _ = self.inner.room_updates_sender.send(rooms.clone());
180
181        for (room_id, room_info) in &rooms.join {
182            let Some(room) = self.get_room(room_id) else {
183                error!(?room_id, "Can't call event handler, room not found");
184                continue;
185            };
186
187            self.send_room_update(room_id, || RoomUpdate::Joined {
188                room: room.clone(),
189                updates: room_info.clone(),
190            });
191
192            let JoinedRoomUpdate {
193                unread_notifications: _,
194                timeline,
195                state,
196                account_data,
197                ephemeral,
198                ambiguity_changes: _,
199            } = room_info;
200
201            let room = Some(&room);
202            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
203            self.handle_sync_state_events(room, state).await?;
204            self.handle_sync_timeline_events(room, &timeline.events).await?;
205            // Handle ephemeral events after timeline, read receipts in here
206            // could refer to timeline events from the same response.
207            self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
208        }
209
210        for (room_id, room_info) in &rooms.leave {
211            let Some(room) = self.get_room(room_id) else {
212                error!(?room_id, "Can't call event handler, room not found");
213                continue;
214            };
215
216            self.send_room_update(room_id, || RoomUpdate::Left {
217                room: room.clone(),
218                updates: room_info.clone(),
219            });
220
221            let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
222
223            let room = Some(&room);
224            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
225            self.handle_sync_state_events(room, state).await?;
226            self.handle_sync_timeline_events(room, &timeline.events).await?;
227        }
228
229        for (room_id, room_info) in &rooms.invite {
230            let Some(room) = self.get_room(room_id) else {
231                error!(?room_id, "Can't call event handler, room not found");
232                continue;
233            };
234
235            self.send_room_update(room_id, || RoomUpdate::Invited {
236                room: room.clone(),
237                updates: room_info.clone(),
238            });
239
240            let invite_state = &room_info.invite_state.events;
241            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
242        }
243
244        for (room_id, room_info) in &rooms.knocked {
245            let Some(room) = self.get_room(room_id) else {
246                error!(?room_id, "Can't call event handler, room not found");
247                continue;
248            };
249
250            self.send_room_update(room_id, || RoomUpdate::Knocked {
251                room: room.clone(),
252                updates: room_info.clone(),
253            });
254
255            let knock_state = &room_info.knock_state.events;
256            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
257        }
258
259        debug!("Ran event handlers in {:?}", now.elapsed());
260
261        let now = Instant::now();
262
263        // Construct notification event handler futures
264        let mut futures = Vec::new();
265        for handler in &*self.notification_handlers().await {
266            for (room_id, room_notifications) in notifications {
267                let Some(room) = self.get_room(room_id) else {
268                    warn!(?room_id, "Can't call notification handler, room not found");
269                    continue;
270                };
271
272                futures.extend(room_notifications.iter().map(|notification| {
273                    (handler)(notification.clone(), room.clone(), self.clone())
274                }));
275            }
276        }
277
278        // Run the notification handler futures with the
279        // `self.notification_handlers` lock no longer being held, in order.
280        for fut in futures {
281            fut.await;
282        }
283
284        debug!("Ran notification handlers in {:?}", now.elapsed());
285
286        Ok(())
287    }
288
289    fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
290        if let btree_map::Entry::Occupied(entry) =
291            self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
292        {
293            let tx = entry.get();
294            if tx.receiver_count() == 0 {
295                entry.remove();
296            } else {
297                _ = tx.send(make_msg());
298            }
299        }
300    }
301
302    async fn sleep() {
303        sleep(Duration::from_secs(1)).await;
304    }
305
306    pub(crate) async fn sync_loop_helper(
307        &self,
308        sync_settings: &mut crate::config::SyncSettings,
309    ) -> Result<SyncResponse> {
310        let response = self.sync_once(sync_settings.clone()).await;
311
312        match response {
313            Ok(r) => {
314                sync_settings.token = Some(r.next_batch.clone());
315                Ok(r)
316            }
317            Err(e) => {
318                error!("Received an invalid response: {e}");
319                Err(e)
320            }
321        }
322    }
323
324    pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
325        let now = Instant::now();
326
327        // If the last sync happened less than a second ago, sleep for a
328        // while to not hammer out requests if the server doesn't respect
329        // the sync timeout.
330        if let Some(t) = last_sync_time {
331            if now - *t <= Duration::from_secs(1) {
332                Self::sleep().await;
333            }
334        }
335
336        *last_sync_time = Some(now);
337    }
338}