matrix_sdk_base/response_processors/room/
sync_v2.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ruma::{
18    api::client::sync::sync_events::v3::{InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom},
19    OwnedRoomId, OwnedUserId, RoomId,
20};
21use tokio::sync::broadcast::Sender;
22
23#[cfg(feature = "e2e-encryption")]
24use super::super::e2ee;
25use super::{
26    super::{account_data, ephemeral_events, notification, state_events, timeline, Context},
27    RoomCreationData,
28};
29use crate::{
30    sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate},
31    Result, RoomInfoNotableUpdate, RoomState,
32};
33
/// Process updates of a joined room.
///
/// The room is fetched from (or created in) the state store in the
/// [`RoomState::Joined`] state, and a clone of its `RoomInfo` is updated from
/// the sync response: the summary, the `prev_batch` token, the state events
/// (from both the `state` section and the timeline), the ephemeral events,
/// the timeline itself, the room account data and the unread notification
/// counts. The updated `RoomInfo` is saved in `context.state_changes`.
///
/// The user IDs returned by the two state event dispatches are merged and
/// stored in `updated_members_in_room` under `room_id`.
///
/// # Errors
///
/// Returns the error of a state event dispatch, of the timeline build or,
/// when `e2e-encryption` is enabled, of the tracked users update.
///
/// # Panics
///
/// Panics if no `RoomInfo` for `room_id` is found in `context.state_changes`
/// after the room account data has been processed.
#[allow(clippy::too_many_arguments)]
pub async fn update_joined_room(
    context: &mut Context,
    room_creation_data: RoomCreationData<'_>,
    joined_room: JoinedRoom,
    updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
    notification: notification::Notification<'_>,
    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
) -> Result<JoinedRoomUpdate> {
    let RoomCreationData {
        room_id,
        room_info_notable_update_sender,
        requested_required_states,
        ambiguity_cache,
    } = room_creation_data;

    // `notification` is moved into `timeline::build` below; keep a handle on
    // the state store for the calls that come after it.
    let state_store = notification.state_store;

    let room =
        state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);

    // Work on a clone of the room's `RoomInfo`; it is saved through
    // `context.state_changes.add_room` further down.
    let mut room_info = room.clone_info();

    room_info.mark_as_joined();
    room_info.update_from_ruma_summary(&joined_room.summary);
    room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
    room_info.mark_state_fully_synced();
    room_info.handle_encryption_state(requested_required_states.for_room(room_id));

    // First pass: the state events from the `state` section of the response.
    let (raw_state_events, state_events) =
        state_events::sync::collect(context, &joined_room.state.events);

    let mut new_user_ids = state_events::sync::dispatch_and_get_new_users(
        context,
        (&raw_state_events, &state_events),
        &mut room_info,
        ambiguity_cache,
    )
    .await?;

    ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);

    // NOTE(review): presumably a limited timeline means membership events may
    // have been skipped, hence the members are flagged as missing — confirm.
    if joined_room.timeline.limited {
        room_info.mark_members_missing();
    }

    // Second pass: the state events found in the timeline.
    let (raw_state_events_from_timeline, state_events_from_timeline) =
        state_events::sync::collect_from_timeline(context, &joined_room.timeline.events);

    let mut other_new_user_ids = state_events::sync::dispatch_and_get_new_users(
        context,
        (&raw_state_events_from_timeline, &state_events_from_timeline),
        &mut room_info,
        ambiguity_cache,
    )
    .await?;
    // Merge the users of both passes into a single set.
    new_user_ids.append(&mut other_new_user_ids);
    // Cloned because `new_user_ids` is used again below when `e2e-encryption`
    // is enabled.
    updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());

    // Read the Olm machine now: `e2ee` is moved into `timeline::build` right
    // after.
    #[cfg(feature = "e2e-encryption")]
    let olm_machine = e2ee.olm_machine;

    let timeline = timeline::build(
        context,
        &room,
        &mut room_info,
        timeline::builder::Timeline::from(joined_room.timeline),
        notification,
        #[cfg(feature = "e2e-encryption")]
        e2ee,
    )
    .await?;

    // Save the new `RoomInfo`.
    context.state_changes.add_room(room_info);

    account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await;

    // `account_data::for_room` might have updated the `RoomInfo`.
    // Let's fetch it again.
    //
    // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
    // above.
    let mut room_info = context
        .state_changes
        .room_infos
        .get(room_id)
        .expect("`RoomInfo` must exist in `StateChanges` at this point")
        .clone();

    // Both the encryption state of the updated `RoomInfo` and the one of
    // `room` (as fetched at the top of this function) are passed along.
    // NOTE(review): presumably compared by the callee to detect a room that
    // just became encrypted — confirm against its implementation.
    #[cfg(feature = "e2e-encryption")]
    e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
        context,
        olm_machine,
        &new_user_ids,
        room_info.encryption_state(),
        room.encryption_state(),
        room_id,
        state_store,
    )
    .await?;

    let notification_count = joined_room.unread_notifications.into();
    room_info.update_notification_count(notification_count);

    // Save the `RoomInfo` again, now with the notification counts.
    context.state_changes.add_room(room_info);

    Ok(JoinedRoomUpdate::new(
        timeline,
        joined_room.state.events,
        joined_room.account_data.events,
        joined_room.ephemeral.events,
        notification_count,
        // Take this room's ambiguity changes out of the cache, falling back to
        // the default value when there are none.
        ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
    ))
}
151
/// Process historical updates of a left room.
///
/// The room is fetched from (or created in) the state store in the
/// [`RoomState::Left`] state, and a clone of its `RoomInfo` is marked as left
/// and as partially synced. The state events (from both the `state` section
/// and the timeline) are dispatched, the timeline is built, the `RoomInfo` is
/// saved in `context.state_changes` and the room account data is processed.
///
/// Unlike for a joined room, the new user IDs returned by the state event
/// dispatches are discarded.
///
/// # Errors
///
/// Returns the error of a state event dispatch or of the timeline build.
#[allow(clippy::too_many_arguments)]
pub async fn update_left_room(
    context: &mut Context,
    room_creation_data: RoomCreationData<'_>,
    left_room: LeftRoom,
    notification: notification::Notification<'_>,
    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
) -> Result<LeftRoomUpdate> {
    let RoomCreationData {
        room_id,
        room_info_notable_update_sender,
        requested_required_states,
        ambiguity_cache,
    } = room_creation_data;

    // `notification` is moved into `timeline::build` below; keep a handle on
    // the state store for the call that comes after it.
    let state_store = notification.state_store;

    let room =
        state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender);

    // Work on a clone of the room's `RoomInfo`; it is saved through
    // `context.state_changes.add_room` further down.
    let mut room_info = room.clone_info();
    room_info.mark_as_left();
    room_info.mark_state_partially_synced();
    room_info.handle_encryption_state(requested_required_states.for_room(room_id));

    // First pass: the state events from the `state` section of the response.
    let (raw_state_events, state_events) =
        state_events::sync::collect(context, &left_room.state.events);

    // The returned set of new users is intentionally dropped.
    let _ = state_events::sync::dispatch_and_get_new_users(
        context,
        (&raw_state_events, &state_events),
        &mut room_info,
        ambiguity_cache,
    )
    .await?;

    // Second pass: the state events found in the timeline.
    let (raw_state_events_from_timeline, state_events_from_timeline) =
        state_events::sync::collect_from_timeline(context, &left_room.timeline.events);

    // The returned set of new users is intentionally dropped here too.
    let _ = state_events::sync::dispatch_and_get_new_users(
        context,
        (&raw_state_events_from_timeline, &state_events_from_timeline),
        &mut room_info,
        ambiguity_cache,
    )
    .await?;

    let timeline = timeline::build(
        context,
        &room,
        &mut room_info,
        timeline::builder::Timeline::from(left_room.timeline),
        notification,
        #[cfg(feature = "e2e-encryption")]
        e2ee,
    )
    .await?;

    // Save the new `RoomInfo`.
    context.state_changes.add_room(room_info);

    account_data::for_room(context, room_id, &left_room.account_data.events, state_store).await;

    // Take this room's ambiguity changes out of the cache, falling back to the
    // default value when there are none.
    let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();

    Ok(LeftRoomUpdate::new(
        timeline,
        left_room.state.events,
        left_room.account_data.events,
        ambiguity_changes,
    ))
}
225
226/// Process updates of an invited room.
227pub async fn update_invited_room(
228    context: &mut Context,
229    room_id: &RoomId,
230    invited_room: InvitedRoom,
231    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
232    notification: notification::Notification<'_>,
233) -> Result<InvitedRoomUpdate> {
234    let state_store = notification.state_store;
235
236    let room = state_store.get_or_create_room(
237        room_id,
238        RoomState::Invited,
239        room_info_notable_update_sender,
240    );
241
242    let (raw_events, events) =
243        state_events::stripped::collect(context, &invited_room.invite_state.events);
244
245    let mut room_info = room.clone_info();
246    room_info.mark_as_invited();
247    room_info.mark_state_fully_synced();
248
249    state_events::stripped::dispatch_invite_or_knock(
250        context,
251        (&raw_events, &events),
252        &room,
253        &mut room_info,
254        notification,
255    )
256    .await?;
257
258    context.state_changes.add_room(room_info);
259
260    Ok(invited_room)
261}
262
263/// Process updates of a knocked room.
264pub async fn update_knocked_room(
265    context: &mut Context,
266    room_id: &RoomId,
267    knocked_room: KnockedRoom,
268    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
269    notification: notification::Notification<'_>,
270) -> Result<KnockedRoomUpdate> {
271    let state_store = notification.state_store;
272
273    let room = state_store.get_or_create_room(
274        room_id,
275        RoomState::Knocked,
276        room_info_notable_update_sender,
277    );
278
279    let (raw_events, events) =
280        state_events::stripped::collect(context, &knocked_room.knock_state.events);
281
282    let mut room_info = room.clone_info();
283    room_info.mark_as_knocked();
284    room_info.mark_state_fully_synced();
285
286    state_events::stripped::dispatch_invite_or_knock(
287        context,
288        (&raw_events, &events),
289        &room,
290        &mut room_info,
291        notification,
292    )
293    .await?;
294
295    context.state_changes.add_room(room_info);
296
297    Ok(knocked_room)
298}