1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
1415use ruma::{events::AnySyncStateEvent, serde::Raw};
16use serde::Deserialize;
17use tracing::warn;
1819use super::Context;
2021/// Collect [`AnySyncStateEvent`].
22pub mod sync {
23use std::{collections::BTreeSet, iter};
2425use ruma::{
26 events::{room::member::MembershipState, AnySyncTimelineEvent},
27 OwnedUserId,
28 };
29use tracing::instrument;
3031use super::{super::profiles, AnySyncStateEvent, Context, Raw};
32use crate::{
33 store::{ambiguity_map::AmbiguityCache, Resultas StoreResult},
34 RoomInfo,
35 };
3637/// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`].
38pub fn collect(
39 _context: &mut Context,
40 raw_events: &[Raw<AnySyncStateEvent>],
41 ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
42super::collect(raw_events)
43 }
4445/// Collect [`AnySyncTimelineEvent`] to [`AnySyncStateEvent`].
46 ///
47 /// A [`AnySyncTimelineEvent`] can represent either message-like events or
48 /// state events. The message-like events are filtered out.
49pub fn collect_from_timeline(
50 _context: &mut Context,
51 raw_events: &[Raw<AnySyncTimelineEvent>],
52 ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
53super::collect(raw_events.iter().filter_map(|raw_event| {
54// Only state events have a `state_key` field.
55match raw_event.get_field::<&str>("state_key") {
56Ok(Some(_)) => Some(raw_event.cast_ref()),
57_ => None,
58 }
59 }))
60 }
6162/// Dispatch the sync state events and return the new users for this room.
63 ///
64 /// `raw_events` and `events` must be generated from [`collect`].
65 /// Events must be exactly the same list of events that are in
66 /// `raw_events`, but deserialised. We demand them here to avoid
67 /// deserialising multiple times.
68#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
69pub async fn dispatch_and_get_new_users(
70 context: &mut Context,
71 (raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
72 room_info: &mut RoomInfo,
73 ambiguity_cache: &mut AmbiguityCache,
74 ) -> StoreResult<BTreeSet<OwnedUserId>> {
75let mut user_ids = BTreeSet::new();
7677if raw_events.is_empty() {
78return Ok(user_ids);
79 }
8081for (raw_event, event) in iter::zip(raw_events, events) {
82 room_info.handle_state_event(event);
8384if let AnySyncStateEvent::RoomMember(member) = event {
85 ambiguity_cache
86 .handle_event(&context.state_changes, &room_info.room_id, member)
87 .await?;
8889match member.membership() {
90 MembershipState::Join | MembershipState::Invite => {
91 user_ids.insert(member.state_key().to_owned());
92 }
93_ => (),
94 }
9596 profiles::upsert_or_delete(context, &room_info.room_id, member);
97 }
9899 context
100 .state_changes
101 .state
102 .entry(room_info.room_id.to_owned())
103 .or_default()
104 .entry(event.event_type())
105 .or_default()
106 .insert(event.state_key().to_owned(), raw_event.clone());
107 }
108109Ok(user_ids)
110 }
111}
112113/// Collect [`AnyStrippedStateEvent`].
114pub mod stripped {
115use std::{collections::BTreeMap, iter};
116117use ruma::{events::AnyStrippedStateEvent, push::Action};
118use tracing::instrument;
119120use super::{
121super::{notification, timeline},
122 Context, Raw,
123 };
124use crate::{Result, Room, RoomInfo};
125126/// Collect [`AnyStrippedStateEvent`] to [`AnyStrippedStateEvent`].
127pub fn collect(
128 _context: &mut Context,
129 raw_events: &[Raw<AnyStrippedStateEvent>],
130 ) -> (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>) {
131super::collect(raw_events)
132 }
133134/// Dispatch the stripped state events.
135 ///
136 /// `raw_events` and `events` must be generated from [`collect`].
137 /// Events must be exactly the same list of events that are in
138 /// `raw_events`, but deserialised. We demand them here to avoid
139 /// deserialising multiple times.
140 ///
141 /// Dispatch the stripped state events in `invite_state` or `knock_state`,
142 /// modifying the room's info and posting notifications as needed.
143 ///
144 /// * `raw_events` and `events` - The contents of `invite_state` in the form
145 /// of list of pairs of raw stripped state events with their deserialized
146 /// counterpart.
147 /// * `room` - The [`Room`] to modify.
148 /// * `room_info` - The current room's info.
149 /// * `push_rules` - The push rules for this room.
150 /// * `changes` - The accumulated list of changes to apply once the
151 /// processing is finished.
152 /// * `notifications` - Notifications to post for the current room.
153 /// * `state_store` — The state store.
154#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
155pub(crate) async fn dispatch_invite_or_knock(
156 context: &mut Context,
157 (raw_events, events): (&[Raw<AnyStrippedStateEvent>], &[AnyStrippedStateEvent]),
158 room: &Room,
159 room_info: &mut RoomInfo,
160mut notification: notification::Notification<'_>,
161 ) -> Result<()> {
162let mut state_events = BTreeMap::new();
163164for (raw_event, event) in iter::zip(raw_events, events) {
165 room_info.handle_stripped_state_event(event);
166 state_events
167 .entry(event.event_type())
168 .or_insert_with(BTreeMap::new)
169 .insert(event.state_key().to_owned(), raw_event.clone());
170 }
171172 context
173 .state_changes
174 .stripped_state
175 .insert(room_info.room_id().to_owned(), state_events.clone());
176177// We need to check for notifications after we have handled all state
178 // events, to make sure we have the full push context.
179if let Some(push_condition_room_ctx) =
180 timeline::get_push_room_context(context, room, room_info, notification.state_store)
181 .await?
182{
183let room_id = room.room_id();
184185// Check every event again for notification.
186for event in state_events.values().flat_map(|map| map.values()) {
187 notification.push_notification_from_event_if(
188 room_id,
189&push_condition_room_ctx,
190 event,
191 Action::should_notify,
192 );
193 }
194 }
195196Ok(())
197 }
198}
199200fn collect<'a, I, T>(raw_events: I) -> (Vec<Raw<T>>, Vec<T>)
201where
202I: IntoIterator<Item = &'a Raw<T>>,
203 T: Deserialize<'a> + 'a,
204{
205 raw_events
206 .into_iter()
207 .filter_map(|raw_event| match raw_event.deserialize() {
208Ok(event) => Some((raw_event.clone(), event)),
209Err(e) => {
210warn!("Couldn't deserialize stripped state event: {e}");
211None
212}
213 })
214 .unzip()
215}
216217#[cfg(test)]
218mod tests {
219use matrix_sdk_test::{
220 async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
221 SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
222 };
223use ruma::{event_id, user_id};
224225use crate::test_utils::logged_in_base_client;
226227#[async_test]
228async fn test_state_events_after_sync() {
229// Given a room
230let user_id = user_id!("@u:u.to");
231232let client = logged_in_base_client(Some(user_id)).await;
233let mut sync_builder = SyncResponseBuilder::new();
234235let room_name = EventFactory::new()
236 .sender(user_id)
237 .room_topic("this is the test topic in the timeline")
238 .event_id(event_id!("$2"))
239 .into_raw_sync();
240241let response = sync_builder
242 .add_joined_room(
243 JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
244 .add_timeline_event(room_name)
245 .add_state_event(StateTestEvent::PowerLevels),
246 )
247 .build_sync_response();
248 client.receive_sync_response(response).await.unwrap();
249250let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Just-created room not found!");
251252// ensure that we have the power levels
253assert!(room.power_levels().await.is_ok());
254255// ensure that we have the topic
256assert_eq!(room.topic().unwrap(), "this is the test topic in the timeline");
257 }
258}