matrix_sdk_base/response_processors/
state_events.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ruma::{events::AnySyncStateEvent, serde::Raw};
16use serde::Deserialize;
17use tracing::warn;
18
19use super::Context;
20
21/// Collect [`AnySyncStateEvent`].
22pub mod sync {
23    use std::{collections::BTreeSet, iter};
24
25    use ruma::{
26        events::{room::member::MembershipState, AnySyncTimelineEvent},
27        OwnedUserId,
28    };
29    use tracing::instrument;
30
31    use super::{super::profiles, AnySyncStateEvent, Context, Raw};
32    use crate::{
33        store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
34        RoomInfo,
35    };
36
37    /// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`].
38    pub fn collect(
39        _context: &mut Context,
40        raw_events: &[Raw<AnySyncStateEvent>],
41    ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
42        super::collect(raw_events)
43    }
44
45    /// Collect [`AnySyncTimelineEvent`] to [`AnySyncStateEvent`].
46    ///
47    /// A [`AnySyncTimelineEvent`] can represent either message-like events or
48    /// state events. The message-like events are filtered out.
49    pub fn collect_from_timeline(
50        _context: &mut Context,
51        raw_events: &[Raw<AnySyncTimelineEvent>],
52    ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
53        super::collect(raw_events.iter().filter_map(|raw_event| {
54            // Only state events have a `state_key` field.
55            match raw_event.get_field::<&str>("state_key") {
56                Ok(Some(_)) => Some(raw_event.cast_ref()),
57                _ => None,
58            }
59        }))
60    }
61
62    /// Dispatch the sync state events and return the new users for this room.
63    ///
64    /// `raw_events` and `events` must be generated from [`collect`].
65    /// Events must be exactly the same list of events that are in
66    /// `raw_events`, but deserialised. We demand them here to avoid
67    /// deserialising multiple times.
68    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
69    pub async fn dispatch_and_get_new_users(
70        context: &mut Context,
71        (raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
72        room_info: &mut RoomInfo,
73        ambiguity_cache: &mut AmbiguityCache,
74    ) -> StoreResult<BTreeSet<OwnedUserId>> {
75        let mut user_ids = BTreeSet::new();
76
77        if raw_events.is_empty() {
78            return Ok(user_ids);
79        }
80
81        for (raw_event, event) in iter::zip(raw_events, events) {
82            room_info.handle_state_event(event);
83
84            if let AnySyncStateEvent::RoomMember(member) = event {
85                ambiguity_cache
86                    .handle_event(&context.state_changes, &room_info.room_id, member)
87                    .await?;
88
89                match member.membership() {
90                    MembershipState::Join | MembershipState::Invite => {
91                        user_ids.insert(member.state_key().to_owned());
92                    }
93                    _ => (),
94                }
95
96                profiles::upsert_or_delete(context, &room_info.room_id, member);
97            }
98
99            context
100                .state_changes
101                .state
102                .entry(room_info.room_id.to_owned())
103                .or_default()
104                .entry(event.event_type())
105                .or_default()
106                .insert(event.state_key().to_owned(), raw_event.clone());
107        }
108
109        Ok(user_ids)
110    }
111}
112
113/// Collect [`AnyStrippedStateEvent`].
114pub mod stripped {
115    use std::{collections::BTreeMap, iter};
116
117    use ruma::{events::AnyStrippedStateEvent, push::Action};
118    use tracing::instrument;
119
120    use super::{
121        super::{notification, timeline},
122        Context, Raw,
123    };
124    use crate::{Result, Room, RoomInfo};
125
126    /// Collect [`AnyStrippedStateEvent`] to [`AnyStrippedStateEvent`].
127    pub fn collect(
128        _context: &mut Context,
129        raw_events: &[Raw<AnyStrippedStateEvent>],
130    ) -> (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>) {
131        super::collect(raw_events)
132    }
133
134    /// Dispatch the stripped state events.
135    ///
136    /// `raw_events` and `events` must be generated from [`collect`].
137    /// Events must be exactly the same list of events that are in
138    /// `raw_events`, but deserialised. We demand them here to avoid
139    /// deserialising multiple times.
140    ///
141    /// Dispatch the stripped state events in `invite_state` or `knock_state`,
142    /// modifying the room's info and posting notifications as needed.
143    ///
144    /// * `raw_events` and `events` - The contents of `invite_state` in the form
145    ///   of list of pairs of raw stripped state events with their deserialized
146    ///   counterpart.
147    /// * `room` - The [`Room`] to modify.
148    /// * `room_info` - The current room's info.
149    /// * `push_rules` - The push rules for this room.
150    /// * `changes` - The accumulated list of changes to apply once the
151    ///   processing is finished.
152    /// * `notifications` - Notifications to post for the current room.
153    /// * `state_store` — The state store.
154    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
155    pub(crate) async fn dispatch_invite_or_knock(
156        context: &mut Context,
157        (raw_events, events): (&[Raw<AnyStrippedStateEvent>], &[AnyStrippedStateEvent]),
158        room: &Room,
159        room_info: &mut RoomInfo,
160        mut notification: notification::Notification<'_>,
161    ) -> Result<()> {
162        let mut state_events = BTreeMap::new();
163
164        for (raw_event, event) in iter::zip(raw_events, events) {
165            room_info.handle_stripped_state_event(event);
166            state_events
167                .entry(event.event_type())
168                .or_insert_with(BTreeMap::new)
169                .insert(event.state_key().to_owned(), raw_event.clone());
170        }
171
172        context
173            .state_changes
174            .stripped_state
175            .insert(room_info.room_id().to_owned(), state_events.clone());
176
177        // We need to check for notifications after we have handled all state
178        // events, to make sure we have the full push context.
179        if let Some(push_condition_room_ctx) =
180            timeline::get_push_room_context(context, room, room_info, notification.state_store)
181                .await?
182        {
183            let room_id = room.room_id();
184
185            // Check every event again for notification.
186            for event in state_events.values().flat_map(|map| map.values()) {
187                notification.push_notification_from_event_if(
188                    room_id,
189                    &push_condition_room_ctx,
190                    event,
191                    Action::should_notify,
192                );
193            }
194        }
195
196        Ok(())
197    }
198}
199
200fn collect<'a, I, T>(raw_events: I) -> (Vec<Raw<T>>, Vec<T>)
201where
202    I: IntoIterator<Item = &'a Raw<T>>,
203    T: Deserialize<'a> + 'a,
204{
205    raw_events
206        .into_iter()
207        .filter_map(|raw_event| match raw_event.deserialize() {
208            Ok(event) => Some((raw_event.clone(), event)),
209            Err(e) => {
210                warn!("Couldn't deserialize stripped state event: {e}");
211                None
212            }
213        })
214        .unzip()
215}
216
217#[cfg(test)]
218mod tests {
219    use matrix_sdk_test::{
220        async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
221        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
222    };
223    use ruma::{event_id, user_id};
224
225    use crate::test_utils::logged_in_base_client;
226
227    #[async_test]
228    async fn test_state_events_after_sync() {
229        // Given a room
230        let user_id = user_id!("@u:u.to");
231
232        let client = logged_in_base_client(Some(user_id)).await;
233        let mut sync_builder = SyncResponseBuilder::new();
234
235        let room_name = EventFactory::new()
236            .sender(user_id)
237            .room_topic("this is the test topic in the timeline")
238            .event_id(event_id!("$2"))
239            .into_raw_sync();
240
241        let response = sync_builder
242            .add_joined_room(
243                JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
244                    .add_timeline_event(room_name)
245                    .add_state_event(StateTestEvent::PowerLevels),
246            )
247            .build_sync_response();
248        client.receive_sync_response(response).await.unwrap();
249
250        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Just-created room not found!");
251
252        // ensure that we have the power levels
253        assert!(room.power_levels().await.is_ok());
254
255        // ensure that we have the topic
256        assert_eq!(room.topic().unwrap(), "this is the test topic in the timeline");
257    }
258}