Skip to main content

matrix_sdk/
sync.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The SDK's representation of the result of a `/sync` request.
16
17use std::{
18    collections::{BTreeMap, btree_map},
19    fmt,
20    time::Duration,
21};
22
23pub use matrix_sdk_base::sync::*;
24use matrix_sdk_base::{
25    debug::{
26        DebugInvitedRoom, DebugKnockedRoom, DebugListOfProcessedToDeviceEvents,
27        DebugListOfRawEventsNoId,
28    },
29    sleep::sleep,
30    sync::SyncResponse as BaseSyncResponse,
31    timer,
32};
33use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
34use ruma::{
35    OwnedRoomId, RoomId,
36    api::client::sync::sync_events::{
37        self,
38        v3::{InvitedRoom, KnockedRoom},
39    },
40    events::{AnyGlobalAccountDataEvent, presence::PresenceEvent},
41    serde::Raw,
42    time::Instant,
43};
44use tracing::{debug, error, instrument, warn};
45
46use crate::{Client, Result, Room, event_handler::HandlerKind};
47
/// The processed response of a `/sync` request.
///
/// This is the `next_batch` token of the request together with the fields of
/// a [`BaseSyncResponse`]; see `SyncResponse::new`.
#[derive(Clone, Default)]
pub struct SyncResponse {
    /// The batch token to supply in the `since` param of the next `/sync`
    /// request.
    pub next_batch: String,
    /// Updates to rooms.
    pub rooms: RoomUpdates,
    /// Updates to the presence status of other users.
    ///
    /// Note that this field is not printed by the `Debug` implementation of
    /// this type.
    pub presence: Vec<Raw<PresenceEvent>>,
    /// The global private data created by this user.
    pub account_data: Vec<Raw<AnyGlobalAccountDataEvent>>,
    /// Messages sent directly between devices.
    pub to_device: Vec<ProcessedToDeviceEvent>,
    /// New notifications per room, keyed by the ID of the room they belong
    /// to.
    pub notifications: BTreeMap<OwnedRoomId, Vec<Notification>>,
}
65
66impl SyncResponse {
67    pub(crate) fn new(next_batch: String, base_response: BaseSyncResponse) -> Self {
68        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } =
69            base_response;
70
71        Self { next_batch, rooms, presence, account_data, to_device, notifications }
72    }
73}
74
75#[cfg(not(tarpaulin_include))]
76impl fmt::Debug for SyncResponse {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("SyncResponse")
79            .field("next_batch", &self.next_batch)
80            .field("rooms", &self.rooms)
81            .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
82            .field("to_device", &DebugListOfProcessedToDeviceEvents(&self.to_device))
83            .field("notifications", &self.notifications)
84            .finish_non_exhaustive()
85    }
86}
87
/// A batch of updates to a room.
///
/// Every variant pairs the [`Room`] the update is about with the update
/// payload for that kind of room. Values of this type are built by
/// `Client::call_sync_response_handlers` and sent over the per-room update
/// channel of the room, if one exists.
#[derive(Clone)]
pub enum RoomUpdate {
    /// Updates to a room the user is no longer in.
    Left {
        /// Room object with general information on the room.
        room: Room,
        /// Updates to the room.
        updates: LeftRoomUpdate,
    },
    /// Updates to a room the user is currently in.
    Joined {
        /// Room object with general information on the room.
        room: Room,
        /// Updates to the room.
        updates: JoinedRoomUpdate,
    },
    /// Updates to a room the user is invited to.
    Invited {
        /// Room object with general information on the room.
        room: Room,
        /// Updates to the room.
        updates: InvitedRoom,
    },
    /// Updates to a room the user knocked on.
    Knocked {
        /// Room object with general information on the room.
        room: Room,
        /// Updates to the room.
        updates: KnockedRoom,
    },
}
120
121#[cfg(not(tarpaulin_include))]
122impl fmt::Debug for RoomUpdate {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        match self {
125            Self::Left { room, updates } => {
126                f.debug_struct("Left").field("room", room).field("updates", updates).finish()
127            }
128            Self::Joined { room, updates } => {
129                f.debug_struct("Joined").field("room", room).field("updates", updates).finish()
130            }
131            Self::Invited { room, updates } => f
132                .debug_struct("Invited")
133                .field("room", room)
134                .field("updates", &DebugInvitedRoom(updates))
135                .finish(),
136            Self::Knocked { room, updates } => f
137                .debug_struct("Knocked")
138                .field("room", room)
139                .field("updates", &DebugKnockedRoom(updates))
140                .finish(),
141        }
142    }
143}
144
145/// Internal functionality related to getting events from the server
146/// (`sync_events` endpoint)
147impl Client {
148    /// Receive a sync response, compute extra information out of it and store
149    /// the interesting bits in the database, then call all the handlers.
150    pub(crate) async fn process_sync(
151        &self,
152        response: sync_events::v3::Response,
153    ) -> Result<BaseSyncResponse> {
154        subscribe_to_room_latest_events(
155            self,
156            response.rooms.join.keys().chain(response.rooms.leave.keys()),
157        )
158        .await;
159
160        let response = Box::pin(self.base_client().receive_sync_response(response)).await?;
161
162        // Some new keys might have been received, so trigger a backup if needed.
163        #[cfg(feature = "e2e-encryption")]
164        self.encryption().backups().maybe_trigger_backup();
165
166        self.call_sync_response_handlers(&response).await?;
167
168        Ok(response)
169    }
170
171    /// Calls event handlers and notification handlers after a sync response has
172    /// been processed.
173    ///
174    /// At this point, the sync response's data has been taken into account and
175    /// persisted in the store, if needs be. This function is only calling
176    /// the event, room update and notification handlers.
177    #[tracing::instrument(skip(self, response))]
178    pub(crate) async fn call_sync_response_handlers(
179        &self,
180        response: &BaseSyncResponse,
181    ) -> Result<()> {
182        let _timer = timer!(tracing::Level::TRACE, "_method");
183
184        let BaseSyncResponse { rooms, presence, account_data, to_device, notifications } = response;
185
186        let now = Instant::now();
187        self.handle_sync_events(HandlerKind::GlobalAccountData, None, account_data).await?;
188        self.handle_sync_events(HandlerKind::Presence, None, presence).await?;
189        self.handle_sync_to_device_events(to_device).await?;
190
191        // Ignore errors when there are no receivers.
192        let _ = self.inner.room_updates_sender.send(rooms.clone());
193
194        for (room_id, room_info) in &rooms.joined {
195            let Some(room) = self.get_room(room_id) else {
196                error!(?room_id, "Can't call event handler, room not found");
197                continue;
198            };
199
200            self.send_room_update(room_id, || RoomUpdate::Joined {
201                room: room.clone(),
202                updates: room_info.clone(),
203            });
204
205            let JoinedRoomUpdate {
206                unread_notifications: _,
207                timeline,
208                state,
209                account_data,
210                ephemeral,
211                ambiguity_changes: _,
212                avatar_changes: _,
213            } = room_info;
214
215            let room = Some(&room);
216            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
217            self.handle_sync_state_events(room, state).await?;
218            self.handle_sync_timeline_events(room, &timeline.events).await?;
219            // Handle ephemeral events after timeline, read receipts in here
220            // could refer to timeline events from the same response.
221            self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
222        }
223
224        for (room_id, room_info) in &rooms.left {
225            let Some(room) = self.get_room(room_id) else {
226                error!(?room_id, "Can't call event handler, room not found");
227                continue;
228            };
229
230            self.send_room_update(room_id, || RoomUpdate::Left {
231                room: room.clone(),
232                updates: room_info.clone(),
233            });
234
235            let LeftRoomUpdate { timeline, state, account_data, ambiguity_changes: _ } = room_info;
236
237            let room = Some(&room);
238            self.handle_sync_events(HandlerKind::RoomAccountData, room, account_data).await?;
239            self.handle_sync_state_events(room, state).await?;
240            self.handle_sync_timeline_events(room, &timeline.events).await?;
241        }
242
243        for (room_id, room_info) in &rooms.invited {
244            let Some(room) = self.get_room(room_id) else {
245                error!(?room_id, "Can't call event handler, room not found");
246                continue;
247            };
248
249            self.send_room_update(room_id, || RoomUpdate::Invited {
250                room: room.clone(),
251                updates: room_info.clone(),
252            });
253
254            let invite_state = &room_info.invite_state.events;
255            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), invite_state).await?;
256        }
257
258        for (room_id, room_info) in &rooms.knocked {
259            let Some(room) = self.get_room(room_id) else {
260                error!(?room_id, "Can't call event handler, room not found");
261                continue;
262            };
263
264            self.send_room_update(room_id, || RoomUpdate::Knocked {
265                room: room.clone(),
266                updates: room_info.clone(),
267            });
268
269            let knock_state = &room_info.knock_state.events;
270            self.handle_sync_events(HandlerKind::StrippedState, Some(&room), knock_state).await?;
271        }
272
273        debug!("Ran event handlers in {:?}", now.elapsed());
274
275        let now = Instant::now();
276
277        // Construct notification event handler futures
278        let mut futures = Vec::new();
279        for handler in &*self.notification_handlers().await {
280            for (room_id, room_notifications) in notifications {
281                let Some(room) = self.get_room(room_id) else {
282                    warn!(?room_id, "Can't call notification handler, room not found");
283                    continue;
284                };
285
286                futures.extend(room_notifications.iter().map(|notification| {
287                    (handler)(notification.clone(), room.clone(), self.clone())
288                }));
289            }
290        }
291
292        // Run the notification handler futures with the
293        // `self.notification_handlers` lock no longer being held, in order.
294        for fut in futures {
295            fut.await;
296        }
297
298        debug!("Ran notification handlers in {:?}", now.elapsed());
299
300        Ok(())
301    }
302
303    fn send_room_update(&self, room_id: &RoomId, make_msg: impl FnOnce() -> RoomUpdate) {
304        if let btree_map::Entry::Occupied(entry) =
305            self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned())
306        {
307            let tx = entry.get();
308            if tx.receiver_count() == 0 {
309                entry.remove();
310            } else {
311                _ = tx.send(make_msg());
312            }
313        }
314    }
315
316    async fn sleep() {
317        sleep(Duration::from_secs(1)).await;
318    }
319
320    pub(crate) async fn sync_loop_helper(
321        &self,
322        sync_settings: &mut crate::config::SyncSettings,
323    ) -> Result<SyncResponse> {
324        let response = self.sync_once(sync_settings.clone()).await;
325
326        match response {
327            Ok(r) => {
328                sync_settings.token = r.next_batch.clone().into();
329                Ok(r)
330            }
331            Err(e) => {
332                error!("Received an invalid response: {e}");
333                Err(e)
334            }
335        }
336    }
337
338    pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
339        let now = Instant::now();
340
341        // If the last sync happened less than a second ago, sleep for a
342        // while to not hammer out requests if the server doesn't respect
343        // the sync timeout.
344        if let Some(t) = last_sync_time
345            && now - *t <= Duration::from_secs(1)
346        {
347            Self::sleep().await;
348        }
349
350        *last_sync_time = Some(now);
351    }
352}
353
354/// Call `LatestEvents::listen_to_room` for rooms in `response`.
355///
356/// That way, the latest event is computed and updated for all rooms receiving
357/// an update from the sync.
358#[instrument(skip_all)]
359pub(crate) async fn subscribe_to_room_latest_events<'a, R>(client: &'a Client, room_ids: R)
360where
361    R: Iterator<Item = &'a OwnedRoomId>,
362{
363    if !client.event_cache().has_subscribed() {
364        return;
365    }
366
367    let latest_events = client.latest_events().await;
368
369    for room_id in room_ids {
370        if let Err(error) = latest_events.listen_to_room(room_id).await {
371            error!(?error, ?room_id, "Failed to listen to the latest event for this room");
372        }
373    }
374}