Skip to main content

matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, timer};
28#[cfg(feature = "e2e-encryption")]
29use matrix_sdk_crypto::{
30    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
31    TrustRequirement, store::DynCryptoStore, store::types::RoomPendingKeyBundleDetails,
32    types::requests::ToDeviceRequest,
33};
34#[cfg(doc)]
35use ruma::DeviceId;
36#[cfg(feature = "e2e-encryption")]
37use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
38use ruma::{
39    OwnedRoomId, OwnedUserId, RoomId, UserId,
40    api::client::{self as api, sync::sync_events::v5},
41    events::{
42        StateEvent, StateEventType,
43        ignored_user_list::IgnoredUserListEventContent,
44        push_rules::{PushRulesEvent, PushRulesEventContent},
45        room::member::SyncRoomMemberEvent,
46    },
47    push::Ruleset,
48    time::Instant,
49};
50use tokio::sync::{Mutex, broadcast};
51#[cfg(feature = "e2e-encryption")]
52use tokio::sync::{RwLock, RwLockReadGuard};
53use tracing::{Level, debug, enabled, info, instrument, warn};
54
55#[cfg(feature = "e2e-encryption")]
56use crate::RoomMemberships;
57use crate::{
58    RoomStateFilter, SessionMeta,
59    deserialized_responses::DisplayName,
60    error::{Error, Result},
61    event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
62    media::store::MediaStoreLock,
63    response_processors::{self as processors, Context},
64    room::{
65        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
66    },
67    store::{
68        BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult, RoomLoadSettings,
69        StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig,
70        ambiguity_map::AmbiguityCache,
71    },
72    sync::{RoomUpdates, SyncResponse},
73};
74
75/// A no (network) IO client implementation.
76///
77/// This client is a state machine that receives responses and events and
78/// accordingly updates its state. It is not designed to be used directly, but
79/// rather through `matrix_sdk::Client`.
80///
81/// ```rust
82/// use matrix_sdk_base::{BaseClient, ThreadingSupport, store::StoreConfig};
83/// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
84///
85/// let client = BaseClient::new(
86///     StoreConfig::new(CrossProcessLockConfig::multi_process(
87///         "cross-process-holder-name".to_owned(),
88///     )),
89///     ThreadingSupport::Disabled,
90/// );
91/// ```
92#[derive(Clone)]
93pub struct BaseClient {
94    /// The state store.
95    pub(crate) state_store: BaseStateStore,
96
97    /// The store used by the event cache.
98    event_cache_store: EventCacheStoreLock,
99
100    /// The store used by the media cache.
101    media_store: MediaStoreLock,
102
103    /// The store used for encryption.
104    ///
105    /// This field is only meant to be used for `OlmMachine` initialization.
106    /// All operations on it happen inside the `OlmMachine`.
107    #[cfg(feature = "e2e-encryption")]
108    crypto_store: Arc<DynCryptoStore>,
109
110    /// The olm-machine that is created once the
111    /// [`SessionMeta`][crate::session::SessionMeta] is set via
112    /// [`BaseClient::activate`]
113    #[cfg(feature = "e2e-encryption")]
114    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
115
116    /// Observable of when a user is ignored/unignored.
117    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
118
119    /// The strategy to use for picking recipient devices, when sending an
120    /// encrypted message.
121    #[cfg(feature = "e2e-encryption")]
122    pub room_key_recipient_strategy: CollectStrategy,
123
124    /// The settings to use for decrypting events.
125    #[cfg(feature = "e2e-encryption")]
126    pub decryption_settings: DecryptionSettings,
127
128    /// If the client should handle verification events received when syncing.
129    #[cfg(feature = "e2e-encryption")]
130    pub handle_verification_events: bool,
131
132    /// Whether the client supports threads or not.
133    pub threading_support: ThreadingSupport,
134}
135
136#[cfg(not(tarpaulin_include))]
137impl fmt::Debug for BaseClient {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        f.debug_struct("BaseClient")
140            .field("session_meta", &self.state_store.session_meta())
141            .field("sync_token", &self.state_store.sync_token)
142            .finish_non_exhaustive()
143    }
144}
145
/// Tells whether a client instance supports threading. At the moment, it
/// drives how read receipts and unread count computations are handled at the
/// base SDK level.
///
/// Timelines, on the other hand, carry their own `TimelineFocus`
/// `hide_threaded_events` associated value, which can be used to hide
/// threaded events as well as to enable sending threaded read receipts. The
/// reason is that some timeline instances must ignore threading regardless
/// of the client-level setting. Media filtered timelines are one such
/// example: they should contain all of the room's media no matter which
/// thread it is in (unless explicitly opted into).
#[derive(Debug, Clone, Copy)]
pub enum ThreadingSupport {
    /// Threading is enabled.
    Enabled {
        /// Turn on client-wide support for thread subscriptions (MSC4306 /
        /// MSC4308).
        ///
        /// When the room list service is in use, this may lead to thread
        /// subscriptions being filtered out, and to thread subscriptions
        /// being loaded through the sliding sync extension.
        with_subscriptions: bool,
    },
    /// Threading is disabled.
    Disabled,
}
171
172impl BaseClient {
173    /// Create a new client.
174    ///
175    /// # Arguments
176    ///
177    /// * `config` - the configuration for the stores (state store, event cache
178    ///   store and crypto store).
179    pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
180        let store = BaseStateStore::new(config.state_store);
181
182        BaseClient {
183            state_store: store,
184            event_cache_store: config.event_cache_store,
185            media_store: config.media_store,
186            #[cfg(feature = "e2e-encryption")]
187            crypto_store: config.crypto_store,
188            #[cfg(feature = "e2e-encryption")]
189            olm_machine: Default::default(),
190            ignore_user_list_changes: Default::default(),
191            #[cfg(feature = "e2e-encryption")]
192            room_key_recipient_strategy: Default::default(),
193            #[cfg(feature = "e2e-encryption")]
194            decryption_settings: DecryptionSettings {
195                sender_device_trust_requirement: TrustRequirement::Untrusted,
196            },
197            #[cfg(feature = "e2e-encryption")]
198            handle_verification_events: true,
199            threading_support,
200        }
201    }
202
203    /// Clones the current base client to use the same crypto store but a
204    /// different, in-memory store config, and resets transient state.
205    #[cfg(feature = "e2e-encryption")]
206    pub async fn clone_with_in_memory_state_store(
207        &self,
208        cross_process_mode: CrossProcessLockConfig,
209        handle_verification_events: bool,
210    ) -> Result<Self> {
211        let config = StoreConfig::new(cross_process_mode).state_store(MemoryStore::new());
212        let config = config.crypto_store(self.crypto_store.clone());
213
214        let copy = Self {
215            state_store: BaseStateStore::new(config.state_store),
216            event_cache_store: config.event_cache_store,
217            media_store: config.media_store,
218            // We copy the crypto store as well as the `OlmMachine` for two reasons:
219            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
220            // 2. We need to ensure that the parent and child use the same data and caches inside
221            //    the `OlmMachine` so the various ratchets and places where new randomness gets
222            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
223            //    or Olm sessions when they encrypt or decrypt messages.
224            crypto_store: self.crypto_store.clone(),
225            olm_machine: self.olm_machine.clone(),
226            ignore_user_list_changes: Default::default(),
227            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
228            decryption_settings: self.decryption_settings.clone(),
229            handle_verification_events,
230            threading_support: self.threading_support,
231        };
232
233        copy.state_store.derive_from_other(&self.state_store).await?;
234
235        Ok(copy)
236    }
237
238    /// Clones the current base client to use the same crypto store but a
239    /// different, in-memory store config, and resets transient state.
240    #[cfg(not(feature = "e2e-encryption"))]
241    #[allow(clippy::unused_async)]
242    pub async fn clone_with_in_memory_state_store(
243        &self,
244        cross_process_store_config: CrossProcessLockConfig,
245        _handle_verification_events: bool,
246    ) -> Result<Self> {
247        let config = StoreConfig::new(cross_process_store_config).state_store(MemoryStore::new());
248        Ok(Self::new(config, ThreadingSupport::Disabled))
249    }
250
251    /// Get the session meta information.
252    ///
253    /// If the client is currently logged in, this will return a
254    /// [`SessionMeta`] object which contains the user ID and device ID.
255    /// Otherwise it returns `None`.
256    pub fn session_meta(&self) -> Option<&SessionMeta> {
257        self.state_store.session_meta()
258    }
259
260    /// Get all the rooms this client knows about.
261    pub fn rooms(&self) -> Vec<Room> {
262        self.state_store.rooms()
263    }
264
265    /// Get all the rooms this client knows about, filtered by room state.
266    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
267        self.state_store.rooms_filtered(filter)
268    }
269
270    /// Get a stream of all the rooms changes, in addition to the existing
271    /// rooms.
272    pub fn rooms_stream(
273        &self,
274    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
275        self.state_store.rooms_stream()
276    }
277
278    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
279    /// yet in the store
280    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
281        self.state_store.get_or_create_room(room_id, room_state)
282    }
283
284    /// Get a reference to the state store.
285    pub fn state_store(&self) -> &DynStateStore {
286        self.state_store.deref()
287    }
288
289    /// Get a reference to the event cache store.
290    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
291        &self.event_cache_store
292    }
293
294    /// Get a reference to the media store.
295    pub fn media_store(&self) -> &MediaStoreLock {
296        &self.media_store
297    }
298
299    /// Check whether the client has been activated.
300    ///
301    /// See [`BaseClient::activate`] to know what it means.
302    pub fn is_active(&self) -> bool {
303        self.state_store.session_meta().is_some()
304    }
305
306    /// Activate the client.
307    ///
308    /// A client is considered active when:
309    ///
310    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
311    /// 2. Has loaded cached data from storage,
312    /// 3. If encryption is enabled, it also initialized or restored its
313    ///    `OlmMachine`.
314    ///
315    /// # Arguments
316    ///
317    /// * `session_meta` - The meta of a session that the user already has from
318    ///   a previous login call.
319    ///
320    /// * `custom_account` - A custom
321    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
322    ///   identity and one-time keys of this [`BaseClient`]. If no account is
323    ///   provided, a new default one or one from the store will be used. If an
324    ///   account is provided and one already exists in the store for this
325    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
326    ///   useful if one wishes to create identity keys before knowing the
327    ///   user/device IDs, e.g., to use the identity key as the device ID.
328    ///
329    /// * `room_load_settings` — Specify how many rooms must be restored; use
330    ///   `::default()` if you don't know which value to pick.
331    ///
332    /// # Panics
333    ///
334    /// This method panics if it is called twice.
335    ///
336    /// [`UserId`]: ruma::UserId
337    pub async fn activate(
338        &self,
339        session_meta: SessionMeta,
340        room_load_settings: RoomLoadSettings,
341        #[cfg(feature = "e2e-encryption")] custom_account: Option<
342            crate::crypto::vodozemac::olm::Account,
343        >,
344    ) -> Result<()> {
345        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
346
347        self.state_store.load_rooms(&session_meta.user_id, room_load_settings).await?;
348        self.state_store.load_sync_token().await?;
349        self.state_store.set_session_meta(session_meta);
350
351        #[cfg(feature = "e2e-encryption")]
352        self.regenerate_olm(custom_account).await?;
353
354        Ok(())
355    }
356
357    /// Recreate an `OlmMachine` from scratch.
358    ///
359    /// In particular, this will clear all its caches.
360    #[cfg(feature = "e2e-encryption")]
361    pub async fn regenerate_olm(
362        &self,
363        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
364    ) -> Result<()> {
365        tracing::debug!("regenerating OlmMachine");
366        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
367
368        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
369        // because we suspect it has stale data.
370        let olm_machine = OlmMachine::with_store(
371            &session_meta.user_id,
372            &session_meta.device_id,
373            self.crypto_store.clone(),
374            custom_account,
375        )
376        .await
377        .map_err(OlmError::from)?;
378
379        *self.olm_machine.write().await = Some(olm_machine);
380        Ok(())
381    }
382
383    /// Get the current, if any, sync token of the client.
384    /// This will be None if the client didn't sync at least once.
385    pub async fn sync_token(&self) -> Option<String> {
386        self.state_store.sync_token.read().await.clone()
387    }
388
389    /// User has knocked on a room.
390    ///
391    /// Update the internal and cached state accordingly. Return the final Room.
392    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
393        let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked);
394
395        if room.state() != RoomState::Knocked {
396            let _state_store_lock = self.state_store_lock().lock().await;
397
398            let mut room_info = room.clone_info();
399            room_info.mark_as_knocked();
400            room_info.mark_state_partially_synced();
401            room_info.mark_members_missing(); // the own member event changed
402
403            // We are no longer joined to the room, so the invite acceptance details are no
404            // longer relevant.
405            #[cfg(feature = "e2e-encryption")]
406            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
407                olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
408            }
409
410            let mut changes = StateChanges::default();
411            changes.add_room(room_info.clone());
412            self.state_store.save_changes(&changes).await?; // Update the store
413            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
414        }
415
416        Ok(room)
417    }
418
419    /// The user has joined a room using this specific client.
420    ///
421    /// This method should be called if the user accepts an invite or if they
422    /// join a public room.
423    ///
424    /// The method will create a [`Room`] object if one does not exist yet and
425    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
426    /// object will be persisted in the cache. Please note that the [`Room`]
427    /// will be a stub until a sync has been received with the full room
428    /// state using [`BaseClient::receive_sync_response`].
429    ///
430    /// Update the internal and cached state accordingly. Return the final Room.
431    ///
432    /// # Arguments
433    ///
434    /// * `room_id` - The unique ID identifying the joined room.
435    /// * `inviter` - When joining this room in response to an invitation, the
436    ///   inviter should be recorded before sending the join request to the
437    ///   server. Providing the inviter here ensures that the
438    ///   [`RoomPendingKeyBundleDetails`] are stored for this room.
439    ///
440    /// # Examples
441    ///
442    /// ```rust
443    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport};
444    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
445    /// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
446    /// # async {
447    /// # let client = BaseClient::new(StoreConfig::new(CrossProcessLockConfig::multi_process("example")), ThreadingSupport::Disabled);
448    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
449    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
450    /// # let room_id: &RoomId = todo!();
451    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
452    /// let room_id = send_join_request().await?;
453    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
454    ///
455    /// assert_eq!(room.state(), RoomState::Joined);
456    /// # matrix_sdk_test::TestResult::Ok(()) };
457    /// ```
458    pub async fn room_joined(
459        &self,
460        room_id: &RoomId,
461        inviter: Option<OwnedUserId>,
462    ) -> Result<Room> {
463        let room = self.state_store.get_or_create_room(room_id, RoomState::Joined);
464
465        // If the state isn't `RoomState::Joined` then this means that we knew about
466        // this room before. Let's modify the existing state now.
467        if room.state() != RoomState::Joined {
468            let _state_store_lock = self.state_store_lock().lock().await;
469
470            let mut room_info = room.clone_info();
471
472            room_info.mark_as_joined();
473            room_info.mark_state_partially_synced();
474            room_info.mark_members_missing(); // the own member event changed
475
476            #[cfg(feature = "e2e-encryption")]
477            {
478                // If our previous state was an invite and we're now in the joined state, this
479                // means that the user has explicitly accepted an invite. Let's
480                // remember some details about the invite.
481                //
482                // This is somewhat of a workaround for our lack of cryptographic membership.
483                // Later on we will decide if historic room keys should be accepted
484                // based on this info. If a user has accepted an invite and we receive a room
485                // key bundle shortly after, we might accept it. If we don't do
486                // this, the homeserver could trick us into accepting any historic room key
487                // bundle.
488                let previous_state = room.state();
489                if previous_state == RoomState::Invited
490                    && let Some(inviter) = inviter
491                    && let Some(olm_machine) = self.olm_machine().await.as_ref()
492                {
493                    olm_machine.store().store_room_pending_key_bundle(room_id, &inviter).await?
494                }
495            }
496            #[cfg(not(feature = "e2e-encryption"))]
497            {
498                // suppress unused argument warning
499                let _ = inviter;
500            }
501
502            let mut changes = StateChanges::default();
503            changes.add_room(room_info.clone());
504
505            self.state_store.save_changes(&changes).await?; // Update the store
506
507            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
508        }
509
510        Ok(room)
511    }
512
513    /// User has left a room.
514    ///
515    /// Update the internal and cached state accordingly.
516    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
517        let room = self.state_store.get_or_create_room(room_id, RoomState::Left);
518
519        if room.state() != RoomState::Left {
520            let _state_store_lock = self.state_store_lock().lock().await;
521
522            let mut room_info = room.clone_info();
523            room_info.mark_as_left();
524            room_info.mark_state_partially_synced();
525            room_info.mark_members_missing(); // the own member event changed
526
527            // We are no longer joined to the room, so the invite acceptance details are no
528            // longer relevant.
529            #[cfg(feature = "e2e-encryption")]
530            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
531                olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
532            }
533
534            let mut changes = StateChanges::default();
535            changes.add_room(room_info.clone());
536            self.state_store.save_changes(&changes).await?; // Update the store
537            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
538        }
539
540        Ok(())
541    }
542
543    /// Get a lock to the state store, with an exclusive access.
544    ///
545    /// It doesn't give an access to the state store itself. It's rather a lock
546    /// to synchronise all accesses to the state store.
547    pub fn state_store_lock(&self) -> &Mutex<()> {
548        self.state_store.lock()
549    }
550
551    /// Receive a response from a sync call.
552    ///
553    /// # Arguments
554    ///
555    /// * `response` - The response that we received after a successful sync.
556    #[instrument(skip_all)]
557    pub async fn receive_sync_response(
558        &self,
559        response: api::sync::sync_events::v3::Response,
560    ) -> Result<SyncResponse> {
561        self.receive_sync_response_with_requested_required_states(
562            response,
563            &RequestedRequiredStates::default(),
564        )
565        .await
566    }
567
568    /// Receive a response from a sync call, with the requested required state
569    /// events.
570    ///
571    /// # Arguments
572    ///
573    /// * `response` - The response that we received after a successful sync.
574    /// * `requested_required_states` - The requested required state events.
575    pub async fn receive_sync_response_with_requested_required_states(
576        &self,
577        response: api::sync::sync_events::v3::Response,
578        requested_required_states: &RequestedRequiredStates,
579    ) -> Result<SyncResponse> {
580        // The server might respond multiple times with the same sync token, in
581        // that case we already received this response and there's nothing to
582        // do.
583        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
584            info!("Got the same sync response twice");
585            return Ok(SyncResponse::default());
586        }
587
588        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
589
590        let user_id = self
591            .session_meta()
592            .expect("Sync shouldn't run without an authenticated user")
593            .user_id
594            .to_owned();
595
596        #[cfg(feature = "e2e-encryption")]
597        let olm_machine = self.olm_machine().await;
598
599        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
600
601        #[cfg(feature = "e2e-encryption")]
602        let processors::e2ee::to_device::Output { processed_to_device_events: to_device } =
603            processors::e2ee::to_device::from_sync_v2(
604                &response,
605                olm_machine.as_ref(),
606                &self.decryption_settings,
607            )
608            .await?;
609
610        #[cfg(not(feature = "e2e-encryption"))]
611        let to_device = response
612            .to_device
613            .events
614            .into_iter()
615            .map(|raw| {
616                use matrix_sdk_common::deserialized_responses::{
617                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
618                    ToDeviceUnableToDecryptReason,
619                };
620
621                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
622                    if event_type == "m.room.encrypted" {
623                        ProcessedToDeviceEvent::UnableToDecrypt {
624                            encrypted_event: raw,
625                            utd_info: ToDeviceUnableToDecryptInfo {
626                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
627                            },
628                        }
629                    } else {
630                        ProcessedToDeviceEvent::PlainText(raw)
631                    }
632                } else {
633                    // Exclude events with no type
634                    ProcessedToDeviceEvent::Invalid(raw)
635                }
636            })
637            .collect();
638
639        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
640
641        let global_account_data_processor =
642            processors::account_data::global(&response.account_data.events);
643
644        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
645
646        let mut room_updates = RoomUpdates::default();
647        let mut notifications = Default::default();
648
649        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
650            BTreeMap::new();
651
652        #[cfg(feature = "e2e-encryption")]
653        let e2ee_context = processors::e2ee::E2EE::new(
654            olm_machine.as_ref(),
655            &self.decryption_settings,
656            self.handle_verification_events,
657        );
658
659        for (room_id, joined_room) in response.rooms.join {
660            let joined_room_update = processors::room::sync_v2::update_joined_room(
661                &mut context,
662                processors::room::RoomCreationData::new(
663                    &room_id,
664                    requested_required_states,
665                    &mut ambiguity_cache,
666                ),
667                joined_room,
668                &mut updated_members_in_room,
669                processors::notification::Notification::new(
670                    &push_rules,
671                    &mut notifications,
672                    &self.state_store,
673                ),
674                #[cfg(feature = "e2e-encryption")]
675                &e2ee_context,
676            )
677            .await?;
678
679            room_updates.joined.insert(room_id, joined_room_update);
680        }
681
682        for (room_id, left_room) in response.rooms.leave {
683            let left_room_update = processors::room::sync_v2::update_left_room(
684                &mut context,
685                processors::room::RoomCreationData::new(
686                    &room_id,
687                    requested_required_states,
688                    &mut ambiguity_cache,
689                ),
690                left_room,
691                processors::notification::Notification::new(
692                    &push_rules,
693                    &mut notifications,
694                    &self.state_store,
695                ),
696                #[cfg(feature = "e2e-encryption")]
697                &e2ee_context,
698            )
699            .await?;
700
701            room_updates.left.insert(room_id, left_room_update);
702        }
703
704        for (room_id, invited_room) in response.rooms.invite {
705            let invited_room_update = processors::room::sync_v2::update_invited_room(
706                &mut context,
707                &room_id,
708                &user_id,
709                invited_room,
710                processors::notification::Notification::new(
711                    &push_rules,
712                    &mut notifications,
713                    &self.state_store,
714                ),
715                #[cfg(feature = "e2e-encryption")]
716                &e2ee_context,
717            )
718            .await?;
719
720            room_updates.invited.insert(room_id, invited_room_update);
721        }
722
723        for (room_id, knocked_room) in response.rooms.knock {
724            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
725                &mut context,
726                &room_id,
727                &user_id,
728                knocked_room,
729                processors::notification::Notification::new(
730                    &push_rules,
731                    &mut notifications,
732                    &self.state_store,
733                ),
734                #[cfg(feature = "e2e-encryption")]
735                &e2ee_context,
736            )
737            .await?;
738
739            room_updates.knocked.insert(room_id, knocked_room_update);
740        }
741
742        global_account_data_processor.apply(&mut context, &self.state_store).await;
743
744        context.state_changes.presence = response
745            .presence
746            .events
747            .iter()
748            .filter_map(|e| {
749                let event = e.deserialize().ok()?;
750                Some((event.sender, e.clone()))
751            })
752            .collect();
753
754        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
755
756        {
757            let _state_store_lock = self.state_store_lock().lock().await;
758
759            processors::changes::save_and_apply(
760                context,
761                &self.state_store,
762                &self.ignore_user_list_changes,
763                Some(response.next_batch.clone()),
764            )
765            .await?;
766        }
767
768        let mut context = Context::default();
769
770        // Now that all the rooms information have been saved, update the display name
771        // of the updated rooms (which relies on information stored in the database).
772        processors::room::display_name::update_for_rooms(
773            &mut context,
774            &room_updates,
775            &self.state_store,
776        )
777        .await;
778
779        // Save the new display name updates if any.
780        {
781            let _state_store_lock = self.state_store_lock().lock().await;
782
783            processors::changes::save_only(context, &self.state_store).await?;
784        }
785
786        for (room_id, member_ids) in updated_members_in_room {
787            if let Some(room) = self.get_room(&room_id) {
788                let _ =
789                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
790            }
791        }
792
793        if enabled!(Level::INFO) {
794            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
795        }
796
797        let response = SyncResponse {
798            rooms: room_updates,
799            presence: response.presence.events,
800            account_data: response.account_data.events,
801            to_device,
802            notifications,
803        };
804
805        Ok(response)
806    }
807
808    /// Receive a get member events response and convert it to a deserialized
809    /// `MembersResponse`
810    ///
811    /// This client-server request must be made without filters to make sure all
812    /// members are received. Otherwise, an error is returned.
813    ///
814    /// # Arguments
815    ///
816    /// * `room_id` - The room id this response belongs to.
817    ///
818    /// * `response` - The raw response that was received from the server.
819    #[instrument(skip_all, fields(?room_id))]
820    pub async fn receive_all_members(
821        &self,
822        room_id: &RoomId,
823        request: &api::membership::get_member_events::v3::Request,
824        response: &api::membership::get_member_events::v3::Response,
825    ) -> Result<()> {
826        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
827        {
828            // This function assumes all members are loaded at once to optimise how display
829            // name disambiguation works. Using it with partial member list results
830            // would produce incorrect disambiguated display name entries
831            return Err(Error::InvalidReceiveMembersParameters);
832        }
833
834        let Some(room) = self.state_store.room(room_id) else {
835            // The room is unknown to us: leave early.
836            return Ok(());
837        };
838
839        let mut chunk = Vec::with_capacity(response.chunk.len());
840        let mut context = Context::default();
841
842        #[cfg(feature = "e2e-encryption")]
843        let mut user_ids = BTreeSet::new();
844
845        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
846
847        for raw_event in &response.chunk {
848            let member = match raw_event.deserialize() {
849                Ok(ev) => ev,
850                Err(e) => {
851                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
852                    debug!(event_id, "Failed to deserialize member event: {e}");
853                    continue;
854                }
855            };
856
857            // TODO: All the actions in this loop used to be done only when the membership
858            // event was not in the store before. This was changed with the new room API,
859            // because e.g. leaving a room makes members events outdated and they need to be
860            // fetched by `members`. Therefore, they need to be overwritten here, even
861            // if they exist.
862            // However, this makes a new problem occur where setting the member events here
863            // potentially races with the sync.
864            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
865
866            #[cfg(feature = "e2e-encryption")]
867            match member.membership() {
868                MembershipState::Join | MembershipState::Invite => {
869                    user_ids.insert(member.state_key().to_owned());
870                }
871                _ => (),
872            }
873
874            if let StateEvent::Original(e) = &member
875                && let Some(d) = &e.content.displayname
876            {
877                let display_name = DisplayName::new(d);
878                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
879            }
880
881            let sync_member: SyncRoomMemberEvent = member.clone().into();
882            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
883
884            context
885                .state_changes
886                .state
887                .entry(room_id.to_owned())
888                .or_default()
889                .entry(member.event_type())
890                .or_default()
891                .insert(member.state_key().to_string(), raw_event.clone().cast());
892            chunk.push(member);
893        }
894
895        #[cfg(feature = "e2e-encryption")]
896        processors::e2ee::tracked_users::update(
897            self.olm_machine().await.as_ref(),
898            room.encryption_state(),
899            &user_ids,
900        )
901        .await?;
902
903        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
904
905        {
906            let _state_store_lock = self.state_store_lock().lock().await;
907
908            let mut room_info = room.clone_info();
909            room_info.mark_members_synced();
910            context.state_changes.add_room(room_info);
911
912            processors::changes::save_and_apply(
913                context,
914                &self.state_store,
915                &self.ignore_user_list_changes,
916                None,
917            )
918            .await?;
919        }
920
921        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
922
923        #[cfg(feature = "e2e-encryption")]
924        if let Some(olm) = self.olm_machine().await.as_ref() {
925            // With the introduction of MSC4268, it is no longer sufficient to check for
926            // changes to session recipients when we send a message, since we may miss
927            // join/leave pairs in our view of the room state. Instead, we should rotate
928            // the room key whenever we fully reload the member list as a precaution.
929            tracing::debug!("Rotating room key due to full member list reload");
930            if let Err(e) = olm.discard_room_key(room_id).await {
931                tracing::warn!("Error discarding room key: {e:?}");
932            }
933        }
934
935        Ok(())
936    }
937
938    /// Receive a successful filter upload response, the filter id will be
939    /// stored under the given name in the store.
940    ///
941    /// The filter id can later be retrieved with the [`get_filter`] method.
942    ///
943    ///
944    /// # Arguments
945    ///
946    /// * `filter_name` - The name that should be used to persist the filter id
947    ///   in the store.
948    ///
949    /// * `response` - The successful filter upload response containing the
950    ///   filter id.
951    ///
952    /// [`get_filter`]: #method.get_filter
953    pub async fn receive_filter_upload(
954        &self,
955        filter_name: &str,
956        response: &api::filter::create_filter::v3::Response,
957    ) -> Result<()> {
958        Ok(self
959            .state_store
960            .set_kv_data(
961                StateStoreDataKey::Filter(filter_name),
962                StateStoreDataValue::Filter(response.filter_id.clone()),
963            )
964            .await?)
965    }
966
967    /// Get the filter id of a previously uploaded filter.
968    ///
969    /// *Note*: A filter will first need to be uploaded and persisted using
970    /// [`receive_filter_upload`].
971    ///
972    /// # Arguments
973    ///
974    /// * `filter_name` - The name of the filter that was previously used to
975    ///   persist the filter.
976    ///
977    /// [`receive_filter_upload`]: #method.receive_filter_upload
978    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
979        let filter = self
980            .state_store
981            .get_kv_data(StateStoreDataKey::Filter(filter_name))
982            .await?
983            .map(|d| d.into_filter().expect("State store data not a filter"));
984
985        Ok(filter)
986    }
987
988    /// Get a to-device request that will share a room key with users in a room.
989    #[cfg(feature = "e2e-encryption")]
990    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
991        match self.olm_machine().await.as_ref() {
992            Some(o) => {
993                let Some(room) = self.get_room(room_id) else {
994                    return Err(Error::InsufficientData);
995                };
996
997                let history_visibility = room.history_visibility_or_default();
998                let Some(room_encryption_event) = room.encryption_settings() else {
999                    return Err(Error::EncryptionNotEnabled);
1000                };
1001
1002                // Don't share the group session with members that are invited
1003                // if the history visibility is set to `Joined`
1004                let filter = if history_visibility == HistoryVisibility::Joined {
1005                    RoomMemberships::JOIN
1006                } else {
1007                    RoomMemberships::ACTIVE
1008                };
1009
1010                let members = self.state_store.get_user_ids(room_id, filter).await?;
1011
1012                let Some(settings) = EncryptionSettings::from_possibly_redacted(
1013                    room_encryption_event,
1014                    history_visibility,
1015                    self.room_key_recipient_strategy.clone(),
1016                ) else {
1017                    return Err(Error::EncryptionNotEnabled);
1018                };
1019
1020                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1021            }
1022            None => panic!("Olm machine wasn't started"),
1023        }
1024    }
1025
1026    /// Get the room with the given room id.
1027    ///
1028    /// # Arguments
1029    ///
1030    /// * `room_id` - The id of the room that should be fetched.
1031    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1032        self.state_store.room(room_id)
1033    }
1034
1035    /// Forget the room with the given room ID.
1036    ///
1037    /// The room will be dropped from the room list and the store.
1038    ///
1039    /// # Arguments
1040    ///
1041    /// * `room_id` - The id of the room that should be forgotten.
1042    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1043        // Forget the room in the state store.
1044        self.state_store.forget_room(room_id).await?;
1045
1046        // Remove the room in the event cache store too.
1047        match self.event_cache_store().lock().await? {
1048            // If the lock is clear, we can do the operation as expected.
1049            // If the lock is dirty, we can ignore to refresh the state, we just need to remove a
1050            // room. Also, we must not mark the lock as non-dirty because other operations may be
1051            // critical and may need to refresh the `EventCache`' state.
1052            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
1053                guard.remove_room(room_id).await?
1054            }
1055        }
1056
1057        Ok(())
1058    }
1059
1060    /// Get the olm machine.
1061    #[cfg(feature = "e2e-encryption")]
1062    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1063        self.olm_machine.read().await
1064    }
1065
1066    /// Get the push rules.
1067    ///
1068    /// Gets the push rules previously processed, otherwise get them from the
1069    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1070    /// is logged in.
1071    pub(crate) async fn get_push_rules(
1072        &self,
1073        global_account_data_processor: &processors::account_data::Global,
1074    ) -> Result<Ruleset> {
1075        let _timer = timer!(Level::TRACE, "get_push_rules");
1076        if let Some(event) = global_account_data_processor
1077            .push_rules()
1078            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1079        {
1080            Ok(event.content.global)
1081        } else if let Some(event) = self
1082            .state_store
1083            .get_account_data_event_static::<PushRulesEventContent>()
1084            .await?
1085            .and_then(|ev| ev.deserialize().ok())
1086        {
1087            Ok(event.content.global)
1088        } else if let Some(session_meta) = self.state_store.session_meta() {
1089            Ok(Ruleset::server_default(&session_meta.user_id))
1090        } else {
1091            Ok(Ruleset::new())
1092        }
1093    }
1094
1095    /// Returns a subscriber that publishes an event every time the ignore user
1096    /// list changes
1097    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1098        self.ignore_user_list_changes.subscribe()
1099    }
1100
1101    /// Returns a new receiver that gets future room info notable updates.
1102    ///
1103    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1104    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1105        self.state_store.room_info_notable_update_sender.subscribe()
1106    }
1107
1108    /// Checks whether the provided `user_id` belongs to an ignored user.
1109    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1110        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1111        {
1112            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1113                Ok(current_ignored_user_list) => {
1114                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1115                }
1116                Err(error) => {
1117                    warn!(?error, "Failed to deserialize the ignored user list event");
1118                    false
1119                }
1120            },
1121            Ok(None) => false,
1122            Err(error) => {
1123                warn!(?error, "Could not get the ignored user list from the state store");
1124                false
1125            }
1126        }
1127    }
1128
1129    /// Check the record of whether we are waiting for an [MSC4268] key bundle
1130    /// for the given room.
1131    ///
1132    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1133    #[cfg(feature = "e2e-encryption")]
1134    pub async fn get_pending_key_bundle_details_for_room(
1135        &self,
1136        room_id: &RoomId,
1137    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
1138        let result = match self.olm_machine().await.as_ref() {
1139            Some(machine) => {
1140                machine.store().get_pending_key_bundle_details_for_room(room_id).await?
1141            }
1142            None => None,
1143        };
1144        Ok(result)
1145    }
1146}
1147
1148/// Represent the `required_state` values sent by a sync request.
1149///
1150/// This is useful to track what state events have been requested when handling
1151/// a response.
1152///
1153/// For example, if a sync requests the `m.room.encryption` state event, and the
1154/// server replies with nothing, if means the room **is not** encrypted. Without
1155/// knowing which state event was required by the sync, it is impossible to
1156/// interpret the absence of state event from the server as _the room's
1157/// encryption state is **not encrypted**_ or _the room's encryption state is
1158/// **unknown**_.
1159#[derive(Debug, Default)]
1160pub struct RequestedRequiredStates {
1161    default: Vec<(StateEventType, String)>,
1162    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1163}
1164
1165impl RequestedRequiredStates {
1166    /// Create a new `RequestedRequiredStates`.
1167    ///
1168    /// `default` represents the `required_state` value for all rooms.
1169    /// `for_rooms` is the `required_state` per room.
1170    pub fn new(
1171        default: Vec<(StateEventType, String)>,
1172        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1173    ) -> Self {
1174        Self { default, for_rooms }
1175    }
1176
1177    /// Get the `required_state` value for a specific room.
1178    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1179        self.for_rooms.get(room_id).unwrap_or(&self.default)
1180    }
1181}
1182
1183impl From<&v5::Request> for RequestedRequiredStates {
1184    fn from(request: &v5::Request) -> Self {
1185        // The following information is missing in the MSC4186 at the time of writing
1186        // (2025-03-12) but: the `required_state`s from all lists and from all room
1187        // subscriptions are combined by doing an union.
1188        //
1189        // Thus, we can do the same here, put the union in `default` and keep
1190        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1191        let mut default = BTreeSet::new();
1192
1193        for list in request.lists.values() {
1194            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1195        }
1196
1197        for room_subscription in request.room_subscriptions.values() {
1198            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1199        }
1200
1201        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use std::collections::HashMap;
1208
1209    use assert_matches2::assert_let;
1210    #[cfg(feature = "e2e-encryption")]
1211    use assert_matches2::assert_matches;
1212    use futures_util::FutureExt as _;
1213    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1214    use matrix_sdk_test::{
1215        BOB, InvitedRoomBuilder, LeftRoomBuilder, SyncResponseBuilder, async_test,
1216        event_factory::EventFactory, ruma_response_from_json,
1217    };
1218    use ruma::{
1219        api::client::{self as api, sync::sync_events::v5},
1220        event_id,
1221        events::{StateEventType, room::member::MembershipState},
1222        room_id,
1223        serde::Raw,
1224        user_id,
1225    };
1226    use serde_json::{json, value::to_raw_value};
1227
1228    use super::{BaseClient, RequestedRequiredStates};
1229    use crate::{
1230        RoomDisplayName, RoomState, SessionMeta,
1231        client::ThreadingSupport,
1232        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1233        test_utils::logged_in_base_client,
1234    };
1235
1236    #[test]
1237    fn test_requested_required_states() {
1238        let room_id_0 = room_id!("!r0");
1239        let room_id_1 = room_id!("!r1");
1240
1241        let requested_required_states = RequestedRequiredStates::new(
1242            vec![(StateEventType::RoomAvatar, "".to_owned())],
1243            HashMap::from([(
1244                room_id_0.to_owned(),
1245                vec![
1246                    (StateEventType::RoomMember, "foo".to_owned()),
1247                    (StateEventType::RoomEncryption, "".to_owned()),
1248                ],
1249            )]),
1250        );
1251
1252        // A special set of state events exists for `room_id_0`.
1253        assert_eq!(
1254            requested_required_states.for_room(room_id_0),
1255            &[
1256                (StateEventType::RoomMember, "foo".to_owned()),
1257                (StateEventType::RoomEncryption, "".to_owned()),
1258            ]
1259        );
1260
1261        // No special list for `room_id_1`, it should return the defaults.
1262        assert_eq!(
1263            requested_required_states.for_room(room_id_1),
1264            &[(StateEventType::RoomAvatar, "".to_owned()),]
1265        );
1266    }
1267
1268    #[test]
1269    fn test_requested_required_states_from_sync_v5_request() {
1270        let room_id_0 = room_id!("!r0");
1271        let room_id_1 = room_id!("!r1");
1272
1273        // Empty request.
1274        let mut request = v5::Request::new();
1275
1276        {
1277            let requested_required_states = RequestedRequiredStates::from(&request);
1278
1279            assert!(requested_required_states.default.is_empty());
1280            assert!(requested_required_states.for_rooms.is_empty());
1281        }
1282
1283        // One list.
1284        request.lists.insert("foo".to_owned(), {
1285            let mut list = v5::request::List::default();
1286            list.room_details.required_state = vec![
1287                (StateEventType::RoomAvatar, "".to_owned()),
1288                (StateEventType::RoomEncryption, "".to_owned()),
1289            ];
1290
1291            list
1292        });
1293
1294        {
1295            let requested_required_states = RequestedRequiredStates::from(&request);
1296
1297            assert_eq!(
1298                requested_required_states.default,
1299                &[
1300                    (StateEventType::RoomAvatar, "".to_owned()),
1301                    (StateEventType::RoomEncryption, "".to_owned())
1302                ]
1303            );
1304            assert!(requested_required_states.for_rooms.is_empty());
1305        }
1306
1307        // Two lists.
1308        request.lists.insert("bar".to_owned(), {
1309            let mut list = v5::request::List::default();
1310            list.room_details.required_state = vec![
1311                (StateEventType::RoomEncryption, "".to_owned()),
1312                (StateEventType::RoomName, "".to_owned()),
1313            ];
1314
1315            list
1316        });
1317
1318        {
1319            let requested_required_states = RequestedRequiredStates::from(&request);
1320
1321            // Union of the state events.
1322            assert_eq!(
1323                requested_required_states.default,
1324                &[
1325                    (StateEventType::RoomAvatar, "".to_owned()),
1326                    (StateEventType::RoomEncryption, "".to_owned()),
1327                    (StateEventType::RoomName, "".to_owned()),
1328                ]
1329            );
1330            assert!(requested_required_states.for_rooms.is_empty());
1331        }
1332
1333        // One room subscription.
1334        request.room_subscriptions.insert(room_id_0.to_owned(), {
1335            let mut room_subscription = v5::request::RoomSubscription::default();
1336
1337            room_subscription.required_state = vec![
1338                (StateEventType::RoomJoinRules, "".to_owned()),
1339                (StateEventType::RoomEncryption, "".to_owned()),
1340            ];
1341
1342            room_subscription
1343        });
1344
1345        {
1346            let requested_required_states = RequestedRequiredStates::from(&request);
1347
1348            // Union of state events, all in `default`, still nothing in `for_rooms`.
1349            assert_eq!(
1350                requested_required_states.default,
1351                &[
1352                    (StateEventType::RoomAvatar, "".to_owned()),
1353                    (StateEventType::RoomEncryption, "".to_owned()),
1354                    (StateEventType::RoomJoinRules, "".to_owned()),
1355                    (StateEventType::RoomName, "".to_owned()),
1356                ]
1357            );
1358            assert!(requested_required_states.for_rooms.is_empty());
1359        }
1360
1361        // Two room subscriptions.
1362        request.room_subscriptions.insert(room_id_1.to_owned(), {
1363            let mut room_subscription = v5::request::RoomSubscription::default();
1364
1365            room_subscription.required_state = vec![
1366                (StateEventType::RoomName, "".to_owned()),
1367                (StateEventType::RoomTopic, "".to_owned()),
1368            ];
1369
1370            room_subscription
1371        });
1372
1373        {
1374            let requested_required_states = RequestedRequiredStates::from(&request);
1375
1376            // Union of state events, all in `default`, still nothing in `for_rooms`.
1377            assert_eq!(
1378                requested_required_states.default,
1379                &[
1380                    (StateEventType::RoomAvatar, "".to_owned()),
1381                    (StateEventType::RoomEncryption, "".to_owned()),
1382                    (StateEventType::RoomJoinRules, "".to_owned()),
1383                    (StateEventType::RoomName, "".to_owned()),
1384                    (StateEventType::RoomTopic, "".to_owned()),
1385                ]
1386            );
1387        }
1388    }
1389
1390    #[async_test]
1391    async fn test_invite_after_leaving() {
1392        let user_id = user_id!("@alice:example.org");
1393        let room_id = room_id!("!test:example.org");
1394
1395        let client = logged_in_base_client(Some(user_id)).await;
1396        let f = EventFactory::new();
1397
1398        let mut sync_builder = SyncResponseBuilder::new();
1399
1400        let response = sync_builder
1401            .add_left_room(
1402                LeftRoomBuilder::new(room_id).add_timeline_event(
1403                    EventFactory::new()
1404                        .member(user_id)
1405                        .membership(MembershipState::Leave)
1406                        .display_name("Alice")
1407                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1408                ),
1409            )
1410            .build_sync_response();
1411        client.receive_sync_response(response).await.unwrap();
1412        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1413
1414        let response = sync_builder
1415            .add_invited_room(
1416                InvitedRoomBuilder::new(room_id).add_state_event(
1417                    f.member(user_id)
1418                        .sender(user_id!("@example:example.org"))
1419                        .membership(MembershipState::Invite)
1420                        .display_name("Alice"),
1421                ),
1422            )
1423            .build_sync_response();
1424        client.receive_sync_response(response).await.unwrap();
1425        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1426    }
1427
1428    #[async_test]
1429    async fn test_invite_displayname() {
1430        let user_id = user_id!("@alice:example.org");
1431        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1432
1433        let client = logged_in_base_client(Some(user_id)).await;
1434
1435        let response = ruma_response_from_json(&json!({
1436            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1437            "device_one_time_keys_count": {
1438                "signed_curve25519": 50u64
1439            },
1440            "device_unused_fallback_key_types": [
1441                "signed_curve25519"
1442            ],
1443            "rooms": {
1444                "invite": {
1445                    "!ithpyNKDtmhneaTQja:example.org": {
1446                        "invite_state": {
1447                            "events": [
1448                                {
1449                                    "content": {
1450                                        "creator": "@test:example.org",
1451                                        "room_version": "9"
1452                                    },
1453                                    "sender": "@test:example.org",
1454                                    "state_key": "",
1455                                    "type": "m.room.create"
1456                                },
1457                                {
1458                                    "content": {
1459                                        "join_rule": "invite"
1460                                    },
1461                                    "sender": "@test:example.org",
1462                                    "state_key": "",
1463                                    "type": "m.room.join_rules"
1464                                },
1465                                {
1466                                    "content": {
1467                                        "algorithm": "m.megolm.v1.aes-sha2"
1468                                    },
1469                                    "sender": "@test:example.org",
1470                                    "state_key": "",
1471                                    "type": "m.room.encryption"
1472                                },
1473                                {
1474                                    "content": {
1475                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1476                                        "displayname": "Kyra",
1477                                        "membership": "join"
1478                                    },
1479                                    "sender": "@test:example.org",
1480                                    "state_key": "@test:example.org",
1481                                    "type": "m.room.member"
1482                                },
1483                                {
1484                                    "content": {
1485                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1486                                        "displayname": "alice",
1487                                        "is_direct": true,
1488                                        "membership": "invite"
1489                                    },
1490                                    "origin_server_ts": 1650878657984u64,
1491                                    "sender": "@test:example.org",
1492                                    "state_key": "@alice:example.org",
1493                                    "type": "m.room.member",
1494                                    "unsigned": {
1495                                        "age": 14u64
1496                                    },
1497                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1498                                }
1499                            ]
1500                        }
1501                    }
1502                }
1503            }
1504        }));
1505
1506        client.receive_sync_response(response).await.unwrap();
1507
1508        let room = client.get_room(room_id).expect("Room not found");
1509        assert_eq!(room.state(), RoomState::Invited);
1510        assert_eq!(
1511            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1512            RoomDisplayName::Calculated("Kyra".to_owned())
1513        );
1514    }
1515
1516    #[async_test]
1517    async fn test_deserialization_failure() {
1518        let user_id = user_id!("@alice:example.org");
1519        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1520
1521        let client = BaseClient::new(
1522            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1523            ThreadingSupport::Disabled,
1524        );
1525        client
1526            .activate(
1527                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1528                RoomLoadSettings::default(),
1529                #[cfg(feature = "e2e-encryption")]
1530                None,
1531            )
1532            .await
1533            .unwrap();
1534
1535        let response = ruma_response_from_json(&json!({
1536            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1537            "rooms": {
1538                "join": {
1539                    "!ithpyNKDtmhneaTQja:example.org": {
1540                        "state": {
1541                            "events": [
1542                                {
1543                                    "invalid": "invalid",
1544                                },
1545                                {
1546                                    "content": {
1547                                        "name": "The room name"
1548                                    },
1549                                    "event_id": "$143273582443PhrSn:example.org",
1550                                    "origin_server_ts": 1432735824653u64,
1551                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1552                                    "sender": "@example:example.org",
1553                                    "state_key": "",
1554                                    "type": "m.room.name",
1555                                    "unsigned": {
1556                                        "age": 1234
1557                                    }
1558                                },
1559                            ]
1560                        }
1561                    }
1562                }
1563            }
1564        }));
1565
1566        client.receive_sync_response(response).await.unwrap();
1567        client
1568            .state_store()
1569            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1570            .await
1571            .expect("Failed to fetch state event")
1572            .expect("State event not found")
1573            .deserialize()
1574            .expect("Failed to deserialize state event");
1575    }
1576
1577    #[async_test]
1578    async fn test_invited_members_arent_ignored() {
1579        let user_id = user_id!("@alice:example.org");
1580        let inviter_user_id = user_id!("@bob:example.org");
1581        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1582
1583        let client = BaseClient::new(
1584            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1585            ThreadingSupport::Disabled,
1586        );
1587        client
1588            .activate(
1589                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1590                RoomLoadSettings::default(),
1591                #[cfg(feature = "e2e-encryption")]
1592                None,
1593            )
1594            .await
1595            .unwrap();
1596
1597        // Preamble: let the SDK know about the room.
1598        let mut sync_builder = SyncResponseBuilder::new();
1599        let response = sync_builder
1600            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1601            .build_sync_response();
1602        client.receive_sync_response(response).await.unwrap();
1603
1604        // When I process the result of a /members request that only contains an invited
1605        // member,
1606        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1607
1608        let raw_member_event = json!({
1609            "content": {
1610                "avatar_url": "mxc://localhost/fewjilfewjil42",
1611                "displayname": "Invited Alice",
1612                "membership": "invite"
1613            },
1614            "event_id": "$151800140517rfvjc:localhost",
1615            "origin_server_ts": 151800140,
1616            "room_id": room_id,
1617            "sender": inviter_user_id,
1618            "state_key": user_id,
1619            "type": "m.room.member",
1620            "unsigned": {
1621                "age": 13374242,
1622            }
1623        });
1624        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1625            to_raw_value(&raw_member_event).unwrap(),
1626        )]);
1627
1628        // It's correctly processed,
1629        client.receive_all_members(room_id, &request, &response).await.unwrap();
1630
1631        let room = client.get_room(room_id).unwrap();
1632
1633        // And I can get the invited member display name and avatar.
1634        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1635
1636        assert_eq!(member.user_id(), user_id);
1637        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1638        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1639    }
1640
1641    #[async_test]
1642    async fn test_reinvited_members_get_a_display_name() {
1643        let user_id = user_id!("@alice:example.org");
1644        let inviter_user_id = user_id!("@bob:example.org");
1645        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1646
1647        let client = BaseClient::new(
1648            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1649            ThreadingSupport::Disabled,
1650        );
1651        client
1652            .activate(
1653                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1654                RoomLoadSettings::default(),
1655                #[cfg(feature = "e2e-encryption")]
1656                None,
1657            )
1658            .await
1659            .unwrap();
1660
1661        // Preamble: let the SDK know about the room, and that the invited user left it.
1662        let f = EventFactory::new().sender(user_id);
1663        let mut sync_builder = SyncResponseBuilder::new();
1664        let response = sync_builder
1665            .add_joined_room(
1666                matrix_sdk_test::JoinedRoomBuilder::new(room_id)
1667                    .add_state_event(f.member(user_id).leave()),
1668            )
1669            .build_sync_response();
1670        client.receive_sync_response(response).await.unwrap();
1671
1672        // Now, say that the user has been re-invited.
1673        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1674
1675        let raw_member_event = json!({
1676            "content": {
1677                "avatar_url": "mxc://localhost/fewjilfewjil42",
1678                "displayname": "Invited Alice",
1679                "membership": "invite"
1680            },
1681            "event_id": "$151800140517rfvjc:localhost",
1682            "origin_server_ts": 151800140,
1683            "room_id": room_id,
1684            "sender": inviter_user_id,
1685            "state_key": user_id,
1686            "type": "m.room.member",
1687            "unsigned": {
1688                "age": 13374242,
1689            }
1690        });
1691        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1692            to_raw_value(&raw_member_event).unwrap(),
1693        )]);
1694
1695        // It's correctly processed,
1696        client.receive_all_members(room_id, &request, &response).await.unwrap();
1697
1698        let room = client.get_room(room_id).unwrap();
1699
1700        // And I can get the invited member display name and avatar.
1701        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1702
1703        assert_eq!(member.user_id(), user_id);
1704        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1705        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1706    }
1707
1708    #[async_test]
1709    async fn test_ignored_user_list_changes() {
1710        let user_id = user_id!("@alice:example.org");
1711        let client = BaseClient::new(
1712            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1713            ThreadingSupport::Disabled,
1714        );
1715
1716        client
1717            .activate(
1718                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1719                RoomLoadSettings::default(),
1720                #[cfg(feature = "e2e-encryption")]
1721                None,
1722            )
1723            .await
1724            .unwrap();
1725
1726        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1727        assert!(subscriber.next().now_or_never().is_none());
1728
1729        let f = EventFactory::new();
1730        let mut sync_builder = SyncResponseBuilder::new();
1731        let response = sync_builder
1732            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1733            .build_sync_response();
1734        client.receive_sync_response(response).await.unwrap();
1735
1736        assert_let!(Some(ignored) = subscriber.next().await);
1737        assert_eq!(ignored, [BOB.to_string()]);
1738
1739        // Receive the same response.
1740        let response = sync_builder
1741            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1742            .build_sync_response();
1743        client.receive_sync_response(response).await.unwrap();
1744
1745        // No changes in the ignored list.
1746        assert!(subscriber.next().now_or_never().is_none());
1747
1748        // Now remove Bob from the ignored list.
1749        let response =
1750            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1751        client.receive_sync_response(response).await.unwrap();
1752
1753        assert_let!(Some(ignored) = subscriber.next().await);
1754        assert!(ignored.is_empty());
1755    }
1756
1757    #[async_test]
1758    async fn test_is_user_ignored() {
1759        let ignored_user_id = user_id!("@alice:example.org");
1760        let client = logged_in_base_client(None).await;
1761
1762        let mut sync_builder = SyncResponseBuilder::new();
1763        let f = EventFactory::new();
1764        let response = sync_builder
1765            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1766            .build_sync_response();
1767        client.receive_sync_response(response).await.unwrap();
1768
1769        assert!(client.is_user_ignored(ignored_user_id).await);
1770    }
1771
1772    #[cfg(feature = "e2e-encryption")]
1773    #[async_test]
1774    async fn test_invite_details_are_set() {
1775        let user_id = user_id!("@alice:localhost");
1776        let client = logged_in_base_client(Some(user_id)).await;
1777        let known_room_id = room_id!("!invited:localhost");
1778        let unknown_room_id = room_id!("!unknown:localhost");
1779
1780        let mut sync_builder = SyncResponseBuilder::new();
1781        let response = sync_builder
1782            .add_invited_room(InvitedRoomBuilder::new(known_room_id))
1783            .build_sync_response();
1784        client.receive_sync_response(response).await.unwrap();
1785
1786        // Let us first check the initial state, we should have a room in the invite
1787        // state.
1788        let invited_room = client
1789            .get_room(known_room_id)
1790            .expect("The sync should have created a room in the invited state");
1791
1792        assert_eq!(invited_room.state(), RoomState::Invited);
1793        assert!(
1794            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1795        );
1796
1797        // Now we join the room.
1798        let joined_room = client
1799            .room_joined(known_room_id, Some(user_id.to_owned()))
1800            .await
1801            .expect("We should be able to mark a room as joined");
1802
1803        // Yup, we now have some invite details.
1804        assert_eq!(joined_room.state(), RoomState::Joined);
1805        assert_matches!(
1806            client.get_pending_key_bundle_details_for_room(known_room_id).await,
1807            Ok(Some(details))
1808        );
1809        assert_eq!(details.inviter, user_id);
1810
1811        // If we didn't know about the room before the join, we assume that there wasn't
1812        // an invite and we don't record the timestamp.
1813        assert!(client.get_room(unknown_room_id).is_none());
1814        let unknown_room = client
1815            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1816            .await
1817            .expect("We should be able to mark a room as joined");
1818
1819        assert_eq!(unknown_room.state(), RoomState::Joined);
1820        assert!(
1821            client
1822                .get_pending_key_bundle_details_for_room(unknown_room_id)
1823                .await
1824                .unwrap()
1825                .is_none()
1826        );
1827
1828        sync_builder.clear();
1829        let response =
1830            sync_builder.add_left_room(LeftRoomBuilder::new(known_room_id)).build_sync_response();
1831        client.receive_sync_response(response).await.unwrap();
1832
1833        // Now that we left the room, we shouldn't have any details anymore.
1834        let left_room = client
1835            .get_room(known_room_id)
1836            .expect("The sync should have created a room in the invited state");
1837
1838        assert_eq!(left_room.state(), RoomState::Left);
1839        assert!(
1840            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1841        );
1842    }
1843}