Skip to main content

matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, timer};
28#[cfg(feature = "e2e-encryption")]
29use matrix_sdk_crypto::{
30    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
31    TrustRequirement, store::DynCryptoStore, store::types::RoomPendingKeyBundleDetails,
32    types::requests::ToDeviceRequest,
33};
34#[cfg(doc)]
35use ruma::DeviceId;
36#[cfg(feature = "e2e-encryption")]
37use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
38use ruma::{
39    OwnedRoomId, OwnedUserId, RoomId, UserId,
40    api::client::{self as api, sync::sync_events::v5},
41    events::{
42        StateEvent, StateEventType,
43        ignored_user_list::IgnoredUserListEventContent,
44        push_rules::{PushRulesEvent, PushRulesEventContent},
45        room::member::SyncRoomMemberEvent,
46    },
47    push::Ruleset,
48    time::Instant,
49};
50use tokio::sync::{Mutex, broadcast};
51#[cfg(feature = "e2e-encryption")]
52use tokio::sync::{RwLock, RwLockReadGuard};
53use tracing::{Level, debug, enabled, info, instrument, warn};
54
55#[cfg(feature = "e2e-encryption")]
56use crate::RoomMemberships;
57use crate::{
58    RoomStateFilter, SessionMeta, StateStore,
59    deserialized_responses::DisplayName,
60    error::{Error, Result},
61    event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
62    media::store::MediaStoreLock,
63    response_processors::{self as processors, Context},
64    room::{
65        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
66    },
67    store::{
68        AvatarCache, BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult,
69        RoomLoadSettings, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
70        StoreConfig, ambiguity_map::AmbiguityCache,
71    },
72    sync::{RoomUpdates, SyncResponse},
73};
74
75/// A no (network) IO client implementation.
76///
77/// This client is a state machine that receives responses and events and
78/// accordingly updates its state. It is not designed to be used directly, but
79/// rather through `matrix_sdk::Client`.
80///
81/// ```rust
82/// use matrix_sdk_base::{
83///     BaseClient, DmRoomDefinition, ThreadingSupport, store::StoreConfig,
84/// };
85/// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
86///
87/// let client = BaseClient::new(
88///     StoreConfig::new(CrossProcessLockConfig::multi_process(
89///         "cross-process-holder-name".to_owned(),
90///     )),
91///     ThreadingSupport::Disabled,
92///     DmRoomDefinition::default(),
93/// );
94/// ```
95#[derive(Clone)]
96pub struct BaseClient {
97    /// The state store.
98    pub(crate) state_store: BaseStateStore,
99
100    /// The store used by the event cache.
101    event_cache_store: EventCacheStoreLock,
102
103    /// The store used by the media cache.
104    media_store: MediaStoreLock,
105
106    /// The store used for encryption.
107    ///
108    /// This field is only meant to be used for `OlmMachine` initialization.
109    /// All operations on it happen inside the `OlmMachine`.
110    #[cfg(feature = "e2e-encryption")]
111    crypto_store: Arc<DynCryptoStore>,
112
113    /// The olm-machine that is created once the
114    /// [`SessionMeta`][crate::session::SessionMeta] is set via
115    /// [`BaseClient::activate`]
116    #[cfg(feature = "e2e-encryption")]
117    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
118
119    /// Observable of when a user is ignored/unignored.
120    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
121
122    /// The strategy to use for picking recipient devices, when sending an
123    /// encrypted message.
124    #[cfg(feature = "e2e-encryption")]
125    pub room_key_recipient_strategy: CollectStrategy,
126
127    /// The settings to use for decrypting events.
128    #[cfg(feature = "e2e-encryption")]
129    pub decryption_settings: DecryptionSettings,
130
131    /// If the client should handle verification events received when syncing.
132    #[cfg(feature = "e2e-encryption")]
133    pub handle_verification_events: bool,
134
135    /// Whether the client supports threads or not.
136    pub threading_support: ThreadingSupport,
137
138    /// The definition of what is considered a DM room.
139    pub dm_room_definition: DmRoomDefinition,
140}
141
142#[cfg(not(tarpaulin_include))]
143impl fmt::Debug for BaseClient {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.debug_struct("BaseClient")
146            .field("session_meta", &self.state_store.session_meta())
147            .field("sync_token", &self.state_store.sync_token)
148            .finish_non_exhaustive()
149    }
150}
151
/// Tells whether a client instance supports threading. At the base SDK level
/// this currently drives how read receipts are handled and how unread counts
/// are computed.
///
/// Timelines carry their own, separate setting: the `TimelineFocus`
/// `hide_threaded_events` associated value, which can hide threaded events
/// and also enables sending threaded read receipts. The two are decoupled
/// because some timelines must ignore threading whatever the client-level
/// value is. Media filtered timelines are one example: they should contain
/// all the media of a room, whichever thread it is in (unless explicitly
/// opted into).
#[derive(Clone, Copy, Debug)]
pub enum ThreadingSupport {
    /// Threading is enabled.
    Enabled {
        /// Turn on client-wide thread subscriptions support (MSC4306 /
        /// MSC4308).
        ///
        /// When the room list service is in use, this may filter out thread
        /// subscriptions and load them through the sliding sync extension.
        with_subscriptions: bool,
    },
    /// Threading is disabled.
    Disabled,
}
177
178impl BaseClient {
179    /// Create a new client.
180    ///
181    /// # Arguments
182    ///
183    /// * `config` - the configuration for the stores (state store, event cache
184    ///   store and crypto store).
185    pub fn new(
186        config: StoreConfig,
187        threading_support: ThreadingSupport,
188        dm_room_definition: DmRoomDefinition,
189    ) -> Self {
190        let store = BaseStateStore::new(config.state_store);
191
192        BaseClient {
193            state_store: store,
194            event_cache_store: config.event_cache_store,
195            media_store: config.media_store,
196            #[cfg(feature = "e2e-encryption")]
197            crypto_store: config.crypto_store,
198            #[cfg(feature = "e2e-encryption")]
199            olm_machine: Default::default(),
200            ignore_user_list_changes: Default::default(),
201            #[cfg(feature = "e2e-encryption")]
202            room_key_recipient_strategy: Default::default(),
203            #[cfg(feature = "e2e-encryption")]
204            decryption_settings: DecryptionSettings {
205                sender_device_trust_requirement: TrustRequirement::Untrusted,
206            },
207            #[cfg(feature = "e2e-encryption")]
208            handle_verification_events: true,
209            threading_support,
210            dm_room_definition,
211        }
212    }
213
214    /// Clones the current base client to use the same crypto store but a
215    /// different, in-memory store config, and resets transient state.
216    #[cfg(feature = "e2e-encryption")]
217    pub async fn clone_with_in_memory_state_store(
218        &self,
219        cross_process_mode: CrossProcessLockConfig,
220        handle_verification_events: bool,
221    ) -> Result<Self> {
222        let config = StoreConfig::new(cross_process_mode).state_store(MemoryStore::new());
223        let config = config.crypto_store(self.crypto_store.clone());
224
225        let copy = Self {
226            state_store: BaseStateStore::new(config.state_store),
227            event_cache_store: config.event_cache_store,
228            media_store: config.media_store,
229            // We copy the crypto store as well as the `OlmMachine` for two reasons:
230            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
231            // 2. We need to ensure that the parent and child use the same data and caches inside
232            //    the `OlmMachine` so the various ratchets and places where new randomness gets
233            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
234            //    or Olm sessions when they encrypt or decrypt messages.
235            crypto_store: self.crypto_store.clone(),
236            olm_machine: self.olm_machine.clone(),
237            ignore_user_list_changes: Default::default(),
238            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
239            decryption_settings: self.decryption_settings.clone(),
240            handle_verification_events,
241            threading_support: self.threading_support,
242            dm_room_definition: self.dm_room_definition.clone(),
243        };
244
245        copy.state_store.derive_from_other(&self.state_store).await?;
246
247        Ok(copy)
248    }
249
250    /// Clones the current base client to use the same crypto store but a
251    /// different, in-memory store config, and resets transient state.
252    #[cfg(not(feature = "e2e-encryption"))]
253    #[allow(clippy::unused_async)]
254    pub async fn clone_with_in_memory_state_store(
255        &self,
256        cross_process_store_config: CrossProcessLockConfig,
257        _handle_verification_events: bool,
258    ) -> Result<Self> {
259        let config = StoreConfig::new(cross_process_store_config).state_store(MemoryStore::new());
260        Ok(Self::new(config, ThreadingSupport::Disabled, DmRoomDefinition::default()))
261    }
262
263    /// Get the session meta information.
264    ///
265    /// If the client is currently logged in, this will return a
266    /// [`SessionMeta`] object which contains the user ID and device ID.
267    /// Otherwise it returns `None`.
268    pub fn session_meta(&self) -> Option<&SessionMeta> {
269        self.state_store.session_meta()
270    }
271
272    /// Get all the rooms this client knows about.
273    pub fn rooms(&self) -> Vec<Room> {
274        self.state_store.rooms()
275    }
276
277    /// Get all the rooms this client knows about, filtered by room state.
278    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
279        self.state_store.rooms_filtered(filter)
280    }
281
282    /// Get a stream of all the rooms changes, in addition to the existing
283    /// rooms.
284    pub fn rooms_stream(
285        &self,
286    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
287        self.state_store.rooms_stream()
288    }
289
290    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
291    /// yet in the store
292    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
293        self.state_store.get_or_create_room(room_id, room_state)
294    }
295
296    /// Get a reference to the state store.
297    pub fn state_store(&self) -> &DynStateStore {
298        self.state_store.deref()
299    }
300
301    /// Get a reference to the event cache store.
302    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
303        &self.event_cache_store
304    }
305
306    /// Get a reference to the media store.
307    pub fn media_store(&self) -> &MediaStoreLock {
308        &self.media_store
309    }
310
311    /// Check whether the client has been activated.
312    ///
313    /// See [`BaseClient::activate`] to know what it means.
314    pub fn is_active(&self) -> bool {
315        self.state_store.session_meta().is_some()
316    }
317
318    /// Activate the client.
319    ///
320    /// A client is considered active when:
321    ///
322    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
323    /// 2. Has loaded cached data from storage,
324    /// 3. If encryption is enabled, it also initialized or restored its
325    ///    `OlmMachine`.
326    ///
327    /// # Arguments
328    ///
329    /// * `session_meta` - The meta of a session that the user already has from
330    ///   a previous login call.
331    ///
332    /// * `custom_account` - A custom
333    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
334    ///   identity and one-time keys of this [`BaseClient`]. If no account is
335    ///   provided, a new default one or one from the store will be used. If an
336    ///   account is provided and one already exists in the store for this
337    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
338    ///   useful if one wishes to create identity keys before knowing the
339    ///   user/device IDs, e.g., to use the identity key as the device ID.
340    ///
341    /// * `room_load_settings` — Specify how many rooms must be restored; use
342    ///   `::default()` if you don't know which value to pick.
343    ///
344    /// # Panics
345    ///
346    /// This method panics if it is called twice.
347    ///
348    /// [`UserId`]: ruma::UserId
349    pub async fn activate(
350        &self,
351        session_meta: SessionMeta,
352        room_load_settings: RoomLoadSettings,
353        #[cfg(feature = "e2e-encryption")] custom_account: Option<
354            crate::crypto::vodozemac::olm::Account,
355        >,
356    ) -> Result<()> {
357        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
358
359        self.state_store.load_rooms(&session_meta.user_id, room_load_settings).await?;
360        self.state_store.load_sync_token().await?;
361        self.state_store.set_session_meta(session_meta);
362
363        #[cfg(feature = "e2e-encryption")]
364        self.regenerate_olm(custom_account).await?;
365
366        Ok(())
367    }
368
369    /// Recreate an `OlmMachine` from scratch.
370    ///
371    /// In particular, this will clear all its caches.
372    #[cfg(feature = "e2e-encryption")]
373    pub async fn regenerate_olm(
374        &self,
375        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
376    ) -> Result<()> {
377        tracing::debug!("regenerating OlmMachine");
378        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
379
380        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
381        // because we suspect it has stale data.
382        let olm_machine = OlmMachine::with_store(
383            &session_meta.user_id,
384            &session_meta.device_id,
385            self.crypto_store.clone(),
386            custom_account,
387        )
388        .await
389        .map_err(OlmError::from)?;
390
391        *self.olm_machine.write().await = Some(olm_machine);
392        Ok(())
393    }
394
395    /// Get the current, if any, sync token of the client.
396    /// This will be None if the client didn't sync at least once.
397    pub async fn sync_token(&self) -> Option<String> {
398        self.state_store.sync_token.read().await.clone()
399    }
400
401    /// User has knocked on a room.
402    ///
403    /// Update the internal and cached state accordingly. Return the final Room.
404    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
405        let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked);
406
407        if room.state() != RoomState::Knocked {
408            let store_guard = self.state_store.lock().lock().await;
409
410            // We are no longer joined to the room, so the invite acceptance details are no
411            // longer relevant.
412            #[cfg(feature = "e2e-encryption")]
413            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
414                olm_machine.store().clear_room_pending_key_bundle(room_id).await?
415            }
416
417            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
418                info.mark_as_knocked();
419                info.mark_state_partially_synced();
420                info.mark_members_missing(); // the own member event changed
421                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
422            })
423            .await?;
424        }
425
426        Ok(room)
427    }
428
429    /// The user has joined a room using this specific client.
430    ///
431    /// This method should be called if the user accepts an invite or if they
432    /// join a public room.
433    ///
434    /// The method will create a [`Room`] object if one does not exist yet and
435    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
436    /// object will be persisted in the cache. Please note that the [`Room`]
437    /// will be a stub until a sync has been received with the full room
438    /// state using [`BaseClient::receive_sync_response`].
439    ///
440    /// Update the internal and cached state accordingly. Return the final Room.
441    ///
442    /// # Arguments
443    ///
444    /// * `room_id` - The unique ID identifying the joined room.
445    /// * `inviter` - When joining this room in response to an invitation, the
446    ///   inviter should be recorded before sending the join request to the
447    ///   server. Providing the inviter here ensures that the
448    ///   [`RoomPendingKeyBundleDetails`] are stored for this room.
449    ///
450    /// # Examples
451    ///
452    /// ```rust
453    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport, DmRoomDefinition};
454    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
455    /// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
456    /// # async {
457    /// # let client = BaseClient::new(
458    ///     StoreConfig::new(CrossProcessLockConfig::multi_process("example")),
459    ///     ThreadingSupport::Disabled,
460    ///     DmRoomDefinition::default()
461    /// );
462    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
463    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
464    /// # let room_id: &RoomId = todo!();
465    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
466    /// let room_id = send_join_request().await?;
467    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
468    ///
469    /// assert_eq!(room.state(), RoomState::Joined);
470    /// # matrix_sdk_test::TestResult::Ok(()) };
471    /// ```
472    pub async fn room_joined(
473        &self,
474        room_id: &RoomId,
475        inviter: Option<OwnedUserId>,
476    ) -> Result<Room> {
477        let room = self.state_store.get_or_create_room(room_id, RoomState::Joined);
478
479        // If the state isn't `RoomState::Joined` then this means that we knew about
480        // this room before. Let's modify the existing state now.
481        if room.state() != RoomState::Joined {
482            let store_guard = self.state_store_lock().lock().await;
483
484            #[cfg(feature = "e2e-encryption")]
485            {
486                // If our previous state was an invite and we're now in the joined state, this
487                // means that the user has explicitly accepted an invite. Let's
488                // remember some details about the invite.
489                //
490                // This is somewhat of a workaround for our lack of cryptographic membership.
491                // Later on we will decide if historic room keys should be accepted
492                // based on this info. If a user has accepted an invite and we receive a room
493                // key bundle shortly after, we might accept it. If we don't do
494                // this, the homeserver could trick us into accepting any historic room key
495                // bundle.
496                let previous_state = room.state();
497                if previous_state == RoomState::Invited
498                    && let Some(inviter) = inviter
499                    && let Some(olm_machine) = self.olm_machine().await.as_ref()
500                {
501                    olm_machine.store().store_room_pending_key_bundle(room_id, &inviter).await?
502                }
503            }
504            #[cfg(not(feature = "e2e-encryption"))]
505            {
506                // suppress unused argument warning
507                let _ = inviter;
508            }
509
510            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
511                info.mark_as_joined();
512                info.mark_state_partially_synced();
513                info.mark_members_missing(); // the own member event changed
514                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
515            })
516            .await?;
517        }
518
519        Ok(room)
520    }
521
522    /// User has left a room.
523    ///
524    /// Update the internal and cached state accordingly.
525    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
526        let room = self.state_store.get_or_create_room(room_id, RoomState::Left);
527
528        if room.state() != RoomState::Left {
529            let store_guard = self.state_store.lock().lock().await;
530
531            // We are no longer joined to the room, so the invite acceptance details are no
532            // longer relevant.
533            #[cfg(feature = "e2e-encryption")]
534            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
535                olm_machine.store().clear_room_pending_key_bundle(room_id).await?
536            }
537
538            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
539                info.mark_as_left();
540                info.mark_state_partially_synced();
541                info.mark_members_missing(); // the own member event changed
542                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
543            })
544            .await?;
545        }
546
547        Ok(())
548    }
549
550    /// Get a lock to the state store, with an exclusive access.
551    ///
552    /// It doesn't give an access to the state store itself. It's rather a lock
553    /// to synchronise all accesses to the state store.
554    pub fn state_store_lock(&self) -> &Mutex<()> {
555        self.state_store.lock()
556    }
557
558    /// Receive a response from a sync call.
559    ///
560    /// # Arguments
561    ///
562    /// * `response` - The response that we received after a successful sync.
563    #[instrument(skip_all)]
564    pub async fn receive_sync_response(
565        &self,
566        response: api::sync::sync_events::v3::Response,
567    ) -> Result<SyncResponse> {
568        self.receive_sync_response_with_requested_required_states(
569            response,
570            &RequestedRequiredStates::default(),
571        )
572        .await
573    }
574
575    /// Receive a response from a sync call, with the requested required state
576    /// events.
577    ///
578    /// # Arguments
579    ///
580    /// * `response` - The response that we received after a successful sync.
581    /// * `requested_required_states` - The requested required state events.
582    pub async fn receive_sync_response_with_requested_required_states(
583        &self,
584        response: api::sync::sync_events::v3::Response,
585        requested_required_states: &RequestedRequiredStates,
586    ) -> Result<SyncResponse> {
587        // The server might respond multiple times with the same sync token, in
588        // that case we already received this response and there's nothing to
589        // do.
590        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
591            info!("Got the same sync response twice");
592            return Ok(SyncResponse::default());
593        }
594
595        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
596
597        // Acquire the state store lock and hold on to it while processing
598        // the sync response below.
599        let state_store_guard = self.state_store_lock().lock().await;
600
601        let user_id = self
602            .session_meta()
603            .expect("Sync shouldn't run without an authenticated user")
604            .user_id
605            .to_owned();
606
607        #[cfg(feature = "e2e-encryption")]
608        let olm_machine = self.olm_machine().await;
609
610        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
611
612        #[cfg(feature = "e2e-encryption")]
613        let processors::e2ee::to_device::Output { processed_to_device_events: to_device } =
614            processors::e2ee::to_device::from_sync_v2(
615                &response,
616                olm_machine.as_ref(),
617                &self.decryption_settings,
618            )
619            .await?;
620
621        #[cfg(not(feature = "e2e-encryption"))]
622        let to_device = response
623            .to_device
624            .events
625            .into_iter()
626            .map(|raw| {
627                use matrix_sdk_common::deserialized_responses::{
628                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
629                    ToDeviceUnableToDecryptReason,
630                };
631
632                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
633                    if event_type == "m.room.encrypted" {
634                        ProcessedToDeviceEvent::UnableToDecrypt {
635                            encrypted_event: raw,
636                            utd_info: ToDeviceUnableToDecryptInfo {
637                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
638                            },
639                        }
640                    } else {
641                        ProcessedToDeviceEvent::PlainText(raw)
642                    }
643                } else {
644                    // Exclude events with no type
645                    ProcessedToDeviceEvent::Invalid(raw)
646                }
647            })
648            .collect();
649
650        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
651        let mut avatar_cache = AvatarCache::new(self.state_store.inner.clone());
652
653        let global_account_data_processor =
654            processors::account_data::global(&response.account_data.events);
655
656        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
657
658        let mut room_updates = RoomUpdates::default();
659        let mut notifications = Default::default();
660
661        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
662            BTreeMap::new();
663
664        #[cfg(feature = "e2e-encryption")]
665        let e2ee_context = processors::e2ee::E2EE::new(
666            olm_machine.as_ref(),
667            &self.decryption_settings,
668            self.handle_verification_events,
669        );
670
671        for (room_id, joined_room) in response.rooms.join {
672            let joined_room_update = processors::room::sync_v2::update_joined_room(
673                &mut context,
674                processors::room::RoomCreationData::new(
675                    &room_id,
676                    requested_required_states,
677                    &mut ambiguity_cache,
678                    &mut avatar_cache,
679                ),
680                joined_room,
681                &mut updated_members_in_room,
682                processors::notification::Notification::new(
683                    &push_rules,
684                    &mut notifications,
685                    &self.state_store,
686                ),
687                #[cfg(feature = "e2e-encryption")]
688                &e2ee_context,
689            )
690            .await?;
691
692            room_updates.joined.insert(room_id, joined_room_update);
693        }
694
695        for (room_id, left_room) in response.rooms.leave {
696            let left_room_update = processors::room::sync_v2::update_left_room(
697                &mut context,
698                processors::room::RoomCreationData::new(
699                    &room_id,
700                    requested_required_states,
701                    &mut ambiguity_cache,
702                    &mut avatar_cache,
703                ),
704                left_room,
705                processors::notification::Notification::new(
706                    &push_rules,
707                    &mut notifications,
708                    &self.state_store,
709                ),
710                #[cfg(feature = "e2e-encryption")]
711                &e2ee_context,
712            )
713            .await?;
714
715            room_updates.left.insert(room_id, left_room_update);
716        }
717
718        for (room_id, invited_room) in response.rooms.invite {
719            let invited_room_update = processors::room::sync_v2::update_invited_room(
720                &mut context,
721                &room_id,
722                &user_id,
723                invited_room,
724                processors::notification::Notification::new(
725                    &push_rules,
726                    &mut notifications,
727                    &self.state_store,
728                ),
729                #[cfg(feature = "e2e-encryption")]
730                &e2ee_context,
731            )
732            .await?;
733
734            room_updates.invited.insert(room_id, invited_room_update);
735        }
736
737        for (room_id, knocked_room) in response.rooms.knock {
738            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
739                &mut context,
740                &room_id,
741                &user_id,
742                knocked_room,
743                processors::notification::Notification::new(
744                    &push_rules,
745                    &mut notifications,
746                    &self.state_store,
747                ),
748                #[cfg(feature = "e2e-encryption")]
749                &e2ee_context,
750            )
751            .await?;
752
753            room_updates.knocked.insert(room_id, knocked_room_update);
754        }
755
756        global_account_data_processor.apply(&mut context, &self.state_store).await;
757
758        context.state_changes.presence = response
759            .presence
760            .events
761            .iter()
762            .filter_map(|e| {
763                let event = e.deserialize().ok()?;
764                Some((event.sender, e.clone()))
765            })
766            .collect();
767
768        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
769
770        processors::changes::save_and_apply(
771            context,
772            &self.state_store,
773            &state_store_guard,
774            &self.ignore_user_list_changes,
775            Some(response.next_batch.clone()),
776        )
777        .await?;
778
779        let mut context = Context::default();
780
781        // Now that all the rooms information have been saved, update the display name
782        // of the updated rooms (which relies on information stored in the database).
783        processors::room::display_name::update_for_rooms(
784            &mut context,
785            &room_updates,
786            &self.state_store,
787        )
788        .await;
789
790        // Save the new display name updates if any.
791        processors::changes::save_only(context, &self.state_store, &state_store_guard).await?;
792
793        for (room_id, member_ids) in updated_members_in_room {
794            if let Some(room) = self.get_room(&room_id) {
795                let _ =
796                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
797            }
798        }
799
800        // Release the state store lock
801        drop(state_store_guard);
802
803        if enabled!(Level::INFO) {
804            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
805        }
806
807        let response = SyncResponse {
808            rooms: room_updates,
809            presence: response.presence.events,
810            account_data: response.account_data.events,
811            to_device,
812            notifications,
813        };
814
815        Ok(response)
816    }
817
818    /// Receive a get member events response and convert it to a deserialized
819    /// `MembersResponse`
820    ///
821    /// This client-server request must be made without filters to make sure all
822    /// members are received. Otherwise, an error is returned.
823    ///
824    /// # Arguments
825    ///
826    /// * `room_id` - The room id this response belongs to.
827    ///
828    /// * `response` - The raw response that was received from the server.
829    #[instrument(skip_all, fields(?room_id))]
830    pub async fn receive_all_members(
831        &self,
832        room_id: &RoomId,
833        request: &api::membership::get_member_events::v3::Request,
834        response: &api::membership::get_member_events::v3::Response,
835    ) -> Result<()> {
836        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
837        {
838            // This function assumes all members are loaded at once to optimise how display
839            // name disambiguation works. Using it with partial member list results
840            // would produce incorrect disambiguated display name entries
841            return Err(Error::InvalidReceiveMembersParameters);
842        }
843
844        let Some(room) = self.state_store.room(room_id) else {
845            // The room is unknown to us: leave early.
846            return Ok(());
847        };
848
849        let mut chunk = Vec::with_capacity(response.chunk.len());
850        let mut context = Context::default();
851
852        #[cfg(feature = "e2e-encryption")]
853        let mut user_ids = BTreeSet::new();
854
855        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
856
857        for raw_event in &response.chunk {
858            let member = match raw_event.deserialize() {
859                Ok(ev) => ev,
860                Err(e) => {
861                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
862                    debug!(event_id, "Failed to deserialize member event: {e}");
863                    continue;
864                }
865            };
866
867            // TODO: All the actions in this loop used to be done only when the membership
868            // event was not in the store before. This was changed with the new room API,
869            // because e.g. leaving a room makes members events outdated and they need to be
870            // fetched by `members`. Therefore, they need to be overwritten here, even
871            // if they exist.
872            // However, this makes a new problem occur where setting the member events here
873            // potentially races with the sync.
874            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
875
876            #[cfg(feature = "e2e-encryption")]
877            match member.membership() {
878                MembershipState::Join | MembershipState::Invite => {
879                    user_ids.insert(member.state_key().to_owned());
880                }
881                _ => (),
882            }
883
884            if let StateEvent::Original(e) = &member
885                && let Some(d) = &e.content.displayname
886            {
887                let display_name = DisplayName::new(d);
888                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
889            }
890
891            let sync_member: SyncRoomMemberEvent = member.clone().into();
892            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
893
894            context
895                .state_changes
896                .state
897                .entry(room_id.to_owned())
898                .or_default()
899                .entry(member.event_type())
900                .or_default()
901                .insert(member.state_key().to_string(), raw_event.clone().cast());
902            chunk.push(member);
903        }
904
905        #[cfg(feature = "e2e-encryption")]
906        processors::e2ee::tracked_users::update(
907            self.olm_machine().await.as_ref(),
908            room.encryption_state(),
909            &user_ids,
910        )
911        .await?;
912
913        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
914
915        {
916            let state_store_guard = self.state_store_lock().lock().await;
917
918            let mut room_info = room.clone_info();
919            room_info.mark_members_synced();
920            context.state_changes.add_room(room_info);
921
922            processors::changes::save_and_apply(
923                context,
924                &self.state_store,
925                &state_store_guard,
926                &self.ignore_user_list_changes,
927                None,
928            )
929            .await?;
930        }
931
932        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
933
934        #[cfg(feature = "e2e-encryption")]
935        if let Some(olm) = self.olm_machine().await.as_ref() {
936            // With the introduction of MSC4268, it is no longer sufficient to check for
937            // changes to session recipients when we send a message, since we may miss
938            // join/leave pairs in our view of the room state. Instead, we should rotate
939            // the room key whenever we fully reload the member list as a precaution.
940            tracing::debug!("Rotating room key due to full member list reload");
941            if let Err(e) = olm.discard_room_key(room_id).await {
942                tracing::warn!("Error discarding room key: {e:?}");
943            }
944        }
945
946        Ok(())
947    }
948
949    /// Receive a successful filter upload response, the filter id will be
950    /// stored under the given name in the store.
951    ///
952    /// The filter id can later be retrieved with the [`get_filter`] method.
953    ///
954    ///
955    /// # Arguments
956    ///
957    /// * `filter_name` - The name that should be used to persist the filter id
958    ///   in the store.
959    ///
960    /// * `response` - The successful filter upload response containing the
961    ///   filter id.
962    ///
963    /// [`get_filter`]: #method.get_filter
964    pub async fn receive_filter_upload(
965        &self,
966        filter_name: &str,
967        response: &api::filter::create_filter::v3::Response,
968    ) -> Result<()> {
969        Ok(self
970            .state_store
971            .set_kv_data(
972                StateStoreDataKey::Filter(filter_name),
973                StateStoreDataValue::Filter(response.filter_id.clone()),
974            )
975            .await?)
976    }
977
978    /// Get the filter id of a previously uploaded filter.
979    ///
980    /// *Note*: A filter will first need to be uploaded and persisted using
981    /// [`receive_filter_upload`].
982    ///
983    /// # Arguments
984    ///
985    /// * `filter_name` - The name of the filter that was previously used to
986    ///   persist the filter.
987    ///
988    /// [`receive_filter_upload`]: #method.receive_filter_upload
989    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
990        let filter = self
991            .state_store
992            .get_kv_data(StateStoreDataKey::Filter(filter_name))
993            .await?
994            .map(|d| d.into_filter().expect("State store data not a filter"));
995
996        Ok(filter)
997    }
998
999    /// Get a to-device request that will share a room key with users in a room.
1000    #[cfg(feature = "e2e-encryption")]
1001    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1002        match self.olm_machine().await.as_ref() {
1003            Some(o) => {
1004                let Some(room) = self.get_room(room_id) else {
1005                    return Err(Error::InsufficientData);
1006                };
1007
1008                let history_visibility = room.history_visibility_or_default();
1009                let Some(room_encryption_event) = room.encryption_settings() else {
1010                    return Err(Error::EncryptionNotEnabled);
1011                };
1012
1013                // Don't share the group session with members that are invited
1014                // if the history visibility is set to `Joined`
1015                let filter = if history_visibility == HistoryVisibility::Joined {
1016                    RoomMemberships::JOIN
1017                } else {
1018                    RoomMemberships::ACTIVE
1019                };
1020
1021                let members = self.state_store.get_user_ids(room_id, filter).await?;
1022
1023                let Some(settings) = EncryptionSettings::from_possibly_redacted(
1024                    room_encryption_event,
1025                    history_visibility,
1026                    self.room_key_recipient_strategy.clone(),
1027                ) else {
1028                    return Err(Error::EncryptionNotEnabled);
1029                };
1030
1031                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1032            }
1033            None => panic!("Olm machine wasn't started"),
1034        }
1035    }
1036
    /// Get the room with the given room id.
    ///
    /// The lookup is delegated to the state store; `None` is returned when it
    /// yields no room for `room_id`.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The id of the room that should be fetched.
    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
        self.state_store.room(room_id)
    }
1045
1046    /// Forget the room with the given room ID.
1047    ///
1048    /// The room will be dropped from the room list and the store.
1049    ///
1050    /// # Arguments
1051    ///
1052    /// * `room_id` - The id of the room that should be forgotten.
1053    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1054        // Forget the room in the state store.
1055        self.state_store.forget_room(room_id).await?;
1056
1057        // Remove the room in the event cache store too.
1058        match self.event_cache_store().lock().await? {
1059            // If the lock is clear, we can do the operation as expected.
1060            // If the lock is dirty, we can ignore to refresh the state, we just need to remove a
1061            // room. Also, we must not mark the lock as non-dirty because other operations may be
1062            // critical and may need to refresh the `EventCache`' state.
1063            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
1064                guard.remove_room(room_id).await?
1065            }
1066        }
1067
1068        Ok(())
1069    }
1070
    /// Get the olm machine.
    ///
    /// Waits for a read lock on the machine and returns the guard. The guarded
    /// value is an `Option`, so callers have to handle the `None` case.
    #[cfg(feature = "e2e-encryption")]
    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine.read().await
    }
1076
1077    /// Get the push rules.
1078    ///
1079    /// Gets the push rules previously processed, otherwise get them from the
1080    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1081    /// is logged in.
1082    pub(crate) async fn get_push_rules(
1083        &self,
1084        global_account_data_processor: &processors::account_data::Global,
1085    ) -> Result<Ruleset> {
1086        let _timer = timer!(Level::TRACE, "get_push_rules");
1087        if let Some(event) = global_account_data_processor
1088            .push_rules()
1089            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1090        {
1091            Ok(event.content.global)
1092        } else if let Some(event) = self
1093            .state_store
1094            .get_account_data_event_static::<PushRulesEventContent>()
1095            .await?
1096            .and_then(|ev| ev.deserialize().ok())
1097        {
1098            Ok(event.content.global)
1099        } else if let Some(session_meta) = self.state_store.session_meta() {
1100            Ok(Ruleset::server_default(&session_meta.user_id))
1101        } else {
1102            Ok(Ruleset::new())
1103        }
1104    }
1105
    /// Returns a subscriber that publishes an event every time the ignore user
    /// list changes.
    ///
    /// The subscriber is created from the client's `ignore_user_list_changes`
    /// observable.
    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
        self.ignore_user_list_changes.subscribe()
    }
1111
    /// Returns a new receiver that gets future room info notable updates.
    ///
    /// Each call subscribes a fresh receiver to the state store's
    /// `room_info_notable_update_sender`.
    ///
    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.state_store.room_info_notable_update_sender.subscribe()
    }
1118
1119    /// Checks whether the provided `user_id` belongs to an ignored user.
1120    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1121        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1122        {
1123            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1124                Ok(current_ignored_user_list) => {
1125                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1126                }
1127                Err(error) => {
1128                    warn!(?error, "Failed to deserialize the ignored user list event");
1129                    false
1130                }
1131            },
1132            Ok(None) => false,
1133            Err(error) => {
1134                warn!(?error, "Could not get the ignored user list from the state store");
1135                false
1136            }
1137        }
1138    }
1139
1140    /// Check the record of whether we are waiting for an [MSC4268] key bundle
1141    /// for the given room.
1142    ///
1143    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1144    #[cfg(feature = "e2e-encryption")]
1145    pub async fn get_pending_key_bundle_details_for_room(
1146        &self,
1147        room_id: &RoomId,
1148    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
1149        let result = match self.olm_machine().await.as_ref() {
1150            Some(machine) => {
1151                machine.store().get_pending_key_bundle_details_for_room(room_id).await?
1152            }
1153            None => None,
1154        };
1155        Ok(result)
1156    }
1157
    /// Close all stores, releasing database connections and file locks.
    ///
    /// In-flight operations will complete before this returns.
    ///
    /// The stores are closed one after the other: state store, event cache
    /// store, media store and, with the `e2e-encryption` feature, crypto
    /// store.
    ///
    /// # Errors
    ///
    /// The first failure is returned immediately; the stores that come after
    /// the failing one are not closed.
    pub async fn close_stores(&self) -> Result<()> {
        self.state_store.close().await?;
        self.event_cache_store.close().await.map_err(Error::EventCacheStore)?;
        self.media_store.close().await.map_err(Error::MediaStore)?;

        #[cfg(feature = "e2e-encryption")]
        self.crypto_store.close().await.map_err(Error::CryptoStore)?;

        Ok(())
    }
1171
    /// Reopen all stores after a close, re-opening database connections.
    ///
    /// The stores are reopened one after the other: crypto store (with the
    /// `e2e-encryption` feature), media store, event cache store and state
    /// store.
    ///
    /// # Errors
    ///
    /// The first failure is returned immediately; the stores that come after
    /// the failing one are not reopened.
    pub async fn reopen_stores(&self) -> Result<()> {
        #[cfg(feature = "e2e-encryption")]
        self.crypto_store.reopen().await.map_err(Error::CryptoStore)?;

        self.media_store.reopen().await.map_err(Error::MediaStore)?;
        self.event_cache_store.reopen().await.map_err(Error::EventCacheStore)?;
        self.state_store.reopen().await?;

        Ok(())
    }
1183}
1184
1185/// Represent the `required_state` values sent by a sync request.
1186///
1187/// This is useful to track what state events have been requested when handling
1188/// a response.
1189///
1190/// For example, if a sync requests the `m.room.encryption` state event, and the
1191/// server replies with nothing, if means the room **is not** encrypted. Without
1192/// knowing which state event was required by the sync, it is impossible to
1193/// interpret the absence of state event from the server as _the room's
1194/// encryption state is **not encrypted**_ or _the room's encryption state is
1195/// **unknown**_.
1196#[derive(Debug, Default)]
1197pub struct RequestedRequiredStates {
1198    default: Vec<(StateEventType, String)>,
1199    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1200}
1201
1202impl RequestedRequiredStates {
1203    /// Create a new `RequestedRequiredStates`.
1204    ///
1205    /// `default` represents the `required_state` value for all rooms.
1206    /// `for_rooms` is the `required_state` per room.
1207    pub fn new(
1208        default: Vec<(StateEventType, String)>,
1209        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1210    ) -> Self {
1211        Self { default, for_rooms }
1212    }
1213
1214    /// Get the `required_state` value for a specific room.
1215    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1216        self.for_rooms.get(room_id).unwrap_or(&self.default)
1217    }
1218}
1219
1220impl From<&v5::Request> for RequestedRequiredStates {
1221    fn from(request: &v5::Request) -> Self {
1222        // The following information is missing in the MSC4186 at the time of writing
1223        // (2025-03-12) but: the `required_state`s from all lists and from all room
1224        // subscriptions are combined by doing an union.
1225        //
1226        // Thus, we can do the same here, put the union in `default` and keep
1227        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1228        let mut default = BTreeSet::new();
1229
1230        for list in request.lists.values() {
1231            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1232        }
1233
1234        for room_subscription in request.room_subscriptions.values() {
1235            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1236        }
1237
1238        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1239    }
1240}
1241
/// An enum that defines what the [`BaseClient`] should consider a DM room.
///
/// The default is [`DmRoomDefinition::MatrixSpec`].
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum DmRoomDefinition {
    /// Standard Matrix spec definition: a room linked to a user in an
    /// `m.direct` event.
    #[default]
    MatrixSpec,
    /// A room that is direct as per the spec, and that also contains at most 2
    /// active members.
    TwoMembers,
}
1254
1255#[cfg(test)]
1256mod tests {
1257    use std::collections::HashMap;
1258
1259    use assert_matches2::assert_let;
1260    #[cfg(feature = "e2e-encryption")]
1261    use assert_matches2::assert_matches;
1262    use futures_util::FutureExt as _;
1263    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1264    use matrix_sdk_test::{
1265        BOB, InvitedRoomBuilder, LeftRoomBuilder, SyncResponseBuilder, async_test,
1266        event_factory::EventFactory, ruma_response_from_json,
1267    };
1268    use ruma::{
1269        api::client::{self as api, sync::sync_events::v5},
1270        event_id,
1271        events::{StateEventType, room::member::MembershipState},
1272        room_id,
1273        serde::Raw,
1274        user_id,
1275    };
1276    use serde_json::{json, value::to_raw_value};
1277
1278    use super::{BaseClient, RequestedRequiredStates};
1279    use crate::{
1280        DmRoomDefinition, RoomDisplayName, RoomState, SessionMeta,
1281        client::ThreadingSupport,
1282        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1283        test_utils::logged_in_base_client,
1284    };
1285
1286    #[test]
1287    fn test_requested_required_states() {
1288        let room_id_0 = room_id!("!r0");
1289        let room_id_1 = room_id!("!r1");
1290
1291        let requested_required_states = RequestedRequiredStates::new(
1292            vec![(StateEventType::RoomAvatar, "".to_owned())],
1293            HashMap::from([(
1294                room_id_0.to_owned(),
1295                vec![
1296                    (StateEventType::RoomMember, "foo".to_owned()),
1297                    (StateEventType::RoomEncryption, "".to_owned()),
1298                ],
1299            )]),
1300        );
1301
1302        // A special set of state events exists for `room_id_0`.
1303        assert_eq!(
1304            requested_required_states.for_room(room_id_0),
1305            &[
1306                (StateEventType::RoomMember, "foo".to_owned()),
1307                (StateEventType::RoomEncryption, "".to_owned()),
1308            ]
1309        );
1310
1311        // No special list for `room_id_1`, it should return the defaults.
1312        assert_eq!(
1313            requested_required_states.for_room(room_id_1),
1314            &[(StateEventType::RoomAvatar, "".to_owned()),]
1315        );
1316    }
1317
1318    #[test]
1319    fn test_requested_required_states_from_sync_v5_request() {
1320        let room_id_0 = room_id!("!r0");
1321        let room_id_1 = room_id!("!r1");
1322
1323        // Empty request.
1324        let mut request = v5::Request::new();
1325
1326        {
1327            let requested_required_states = RequestedRequiredStates::from(&request);
1328
1329            assert!(requested_required_states.default.is_empty());
1330            assert!(requested_required_states.for_rooms.is_empty());
1331        }
1332
1333        // One list.
1334        request.lists.insert("foo".to_owned(), {
1335            let mut list = v5::request::List::default();
1336            list.room_details.required_state = vec![
1337                (StateEventType::RoomAvatar, "".to_owned()),
1338                (StateEventType::RoomEncryption, "".to_owned()),
1339            ];
1340
1341            list
1342        });
1343
1344        {
1345            let requested_required_states = RequestedRequiredStates::from(&request);
1346
1347            assert_eq!(
1348                requested_required_states.default,
1349                &[
1350                    (StateEventType::RoomAvatar, "".to_owned()),
1351                    (StateEventType::RoomEncryption, "".to_owned())
1352                ]
1353            );
1354            assert!(requested_required_states.for_rooms.is_empty());
1355        }
1356
1357        // Two lists.
1358        request.lists.insert("bar".to_owned(), {
1359            let mut list = v5::request::List::default();
1360            list.room_details.required_state = vec![
1361                (StateEventType::RoomEncryption, "".to_owned()),
1362                (StateEventType::RoomName, "".to_owned()),
1363            ];
1364
1365            list
1366        });
1367
1368        {
1369            let requested_required_states = RequestedRequiredStates::from(&request);
1370
1371            // Union of the state events.
1372            assert_eq!(
1373                requested_required_states.default,
1374                &[
1375                    (StateEventType::RoomAvatar, "".to_owned()),
1376                    (StateEventType::RoomEncryption, "".to_owned()),
1377                    (StateEventType::RoomName, "".to_owned()),
1378                ]
1379            );
1380            assert!(requested_required_states.for_rooms.is_empty());
1381        }
1382
1383        // One room subscription.
1384        request.room_subscriptions.insert(room_id_0.to_owned(), {
1385            let mut room_subscription = v5::request::RoomSubscription::default();
1386
1387            room_subscription.required_state = vec![
1388                (StateEventType::RoomJoinRules, "".to_owned()),
1389                (StateEventType::RoomEncryption, "".to_owned()),
1390            ];
1391
1392            room_subscription
1393        });
1394
1395        {
1396            let requested_required_states = RequestedRequiredStates::from(&request);
1397
1398            // Union of state events, all in `default`, still nothing in `for_rooms`.
1399            assert_eq!(
1400                requested_required_states.default,
1401                &[
1402                    (StateEventType::RoomAvatar, "".to_owned()),
1403                    (StateEventType::RoomEncryption, "".to_owned()),
1404                    (StateEventType::RoomJoinRules, "".to_owned()),
1405                    (StateEventType::RoomName, "".to_owned()),
1406                ]
1407            );
1408            assert!(requested_required_states.for_rooms.is_empty());
1409        }
1410
1411        // Two room subscriptions.
1412        request.room_subscriptions.insert(room_id_1.to_owned(), {
1413            let mut room_subscription = v5::request::RoomSubscription::default();
1414
1415            room_subscription.required_state = vec![
1416                (StateEventType::RoomName, "".to_owned()),
1417                (StateEventType::RoomTopic, "".to_owned()),
1418            ];
1419
1420            room_subscription
1421        });
1422
1423        {
1424            let requested_required_states = RequestedRequiredStates::from(&request);
1425
1426            // Union of state events, all in `default`, still nothing in `for_rooms`.
1427            assert_eq!(
1428                requested_required_states.default,
1429                &[
1430                    (StateEventType::RoomAvatar, "".to_owned()),
1431                    (StateEventType::RoomEncryption, "".to_owned()),
1432                    (StateEventType::RoomJoinRules, "".to_owned()),
1433                    (StateEventType::RoomName, "".to_owned()),
1434                    (StateEventType::RoomTopic, "".to_owned()),
1435                ]
1436            );
1437        }
1438    }
1439
1440    #[async_test]
1441    async fn test_invite_after_leaving() {
1442        let user_id = user_id!("@alice:example.org");
1443        let room_id = room_id!("!test:example.org");
1444
1445        let client = logged_in_base_client(Some(user_id)).await;
1446        let f = EventFactory::new();
1447
1448        let mut sync_builder = SyncResponseBuilder::new();
1449
1450        let response = sync_builder
1451            .add_left_room(
1452                LeftRoomBuilder::new(room_id).add_timeline_event(
1453                    EventFactory::new()
1454                        .member(user_id)
1455                        .membership(MembershipState::Leave)
1456                        .display_name("Alice")
1457                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1458                ),
1459            )
1460            .build_sync_response();
1461        client.receive_sync_response(response).await.unwrap();
1462        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1463
1464        let response = sync_builder
1465            .add_invited_room(
1466                InvitedRoomBuilder::new(room_id).add_state_event(
1467                    f.member(user_id)
1468                        .sender(user_id!("@example:example.org"))
1469                        .membership(MembershipState::Invite)
1470                        .display_name("Alice"),
1471                ),
1472            )
1473            .build_sync_response();
1474        client.receive_sync_response(response).await.unwrap();
1475        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1476    }
1477
1478    #[async_test]
1479    async fn test_invite_displayname() {
1480        let user_id = user_id!("@alice:example.org");
1481        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1482
1483        let client = logged_in_base_client(Some(user_id)).await;
1484
1485        let response = ruma_response_from_json(&json!({
1486            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1487            "device_one_time_keys_count": {
1488                "signed_curve25519": 50u64
1489            },
1490            "device_unused_fallback_key_types": [
1491                "signed_curve25519"
1492            ],
1493            "rooms": {
1494                "invite": {
1495                    "!ithpyNKDtmhneaTQja:example.org": {
1496                        "invite_state": {
1497                            "events": [
1498                                {
1499                                    "content": {
1500                                        "creator": "@test:example.org",
1501                                        "room_version": "9"
1502                                    },
1503                                    "sender": "@test:example.org",
1504                                    "state_key": "",
1505                                    "type": "m.room.create"
1506                                },
1507                                {
1508                                    "content": {
1509                                        "join_rule": "invite"
1510                                    },
1511                                    "sender": "@test:example.org",
1512                                    "state_key": "",
1513                                    "type": "m.room.join_rules"
1514                                },
1515                                {
1516                                    "content": {
1517                                        "algorithm": "m.megolm.v1.aes-sha2"
1518                                    },
1519                                    "sender": "@test:example.org",
1520                                    "state_key": "",
1521                                    "type": "m.room.encryption"
1522                                },
1523                                {
1524                                    "content": {
1525                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1526                                        "displayname": "Kyra",
1527                                        "membership": "join"
1528                                    },
1529                                    "sender": "@test:example.org",
1530                                    "state_key": "@test:example.org",
1531                                    "type": "m.room.member"
1532                                },
1533                                {
1534                                    "content": {
1535                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1536                                        "displayname": "alice",
1537                                        "is_direct": true,
1538                                        "membership": "invite"
1539                                    },
1540                                    "origin_server_ts": 1650878657984u64,
1541                                    "sender": "@test:example.org",
1542                                    "state_key": "@alice:example.org",
1543                                    "type": "m.room.member",
1544                                    "unsigned": {
1545                                        "age": 14u64
1546                                    },
1547                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1548                                }
1549                            ]
1550                        }
1551                    }
1552                }
1553            }
1554        }));
1555
1556        client.receive_sync_response(response).await.unwrap();
1557
1558        let room = client.get_room(room_id).expect("Room not found");
1559        assert_eq!(room.state(), RoomState::Invited);
1560        assert_eq!(
1561            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1562            RoomDisplayName::Calculated("Kyra".to_owned())
1563        );
1564    }
1565
1566    #[async_test]
1567    async fn test_deserialization_failure() {
1568        let user_id = user_id!("@alice:example.org");
1569        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1570
1571        let client = BaseClient::new(
1572            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1573            ThreadingSupport::Disabled,
1574            DmRoomDefinition::default(),
1575        );
1576        client
1577            .activate(
1578                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1579                RoomLoadSettings::default(),
1580                #[cfg(feature = "e2e-encryption")]
1581                None,
1582            )
1583            .await
1584            .unwrap();
1585
1586        let response = ruma_response_from_json(&json!({
1587            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1588            "rooms": {
1589                "join": {
1590                    "!ithpyNKDtmhneaTQja:example.org": {
1591                        "state": {
1592                            "events": [
1593                                {
1594                                    "invalid": "invalid",
1595                                },
1596                                {
1597                                    "content": {
1598                                        "name": "The room name"
1599                                    },
1600                                    "event_id": "$143273582443PhrSn:example.org",
1601                                    "origin_server_ts": 1432735824653u64,
1602                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1603                                    "sender": "@example:example.org",
1604                                    "state_key": "",
1605                                    "type": "m.room.name",
1606                                    "unsigned": {
1607                                        "age": 1234
1608                                    }
1609                                },
1610                            ]
1611                        }
1612                    }
1613                }
1614            }
1615        }));
1616
1617        client.receive_sync_response(response).await.unwrap();
1618        client
1619            .state_store()
1620            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1621            .await
1622            .expect("Failed to fetch state event")
1623            .expect("State event not found")
1624            .deserialize()
1625            .expect("Failed to deserialize state event");
1626    }
1627
1628    #[async_test]
1629    async fn test_invited_members_arent_ignored() {
1630        let user_id = user_id!("@alice:example.org");
1631        let inviter_user_id = user_id!("@bob:example.org");
1632        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1633
1634        let client = BaseClient::new(
1635            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1636            ThreadingSupport::Disabled,
1637            DmRoomDefinition::default(),
1638        );
1639        client
1640            .activate(
1641                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1642                RoomLoadSettings::default(),
1643                #[cfg(feature = "e2e-encryption")]
1644                None,
1645            )
1646            .await
1647            .unwrap();
1648
1649        // Preamble: let the SDK know about the room.
1650        let mut sync_builder = SyncResponseBuilder::new();
1651        let response = sync_builder
1652            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1653            .build_sync_response();
1654        client.receive_sync_response(response).await.unwrap();
1655
1656        // When I process the result of a /members request that only contains an invited
1657        // member,
1658        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1659
1660        let raw_member_event = json!({
1661            "content": {
1662                "avatar_url": "mxc://localhost/fewjilfewjil42",
1663                "displayname": "Invited Alice",
1664                "membership": "invite"
1665            },
1666            "event_id": "$151800140517rfvjc:localhost",
1667            "origin_server_ts": 151800140,
1668            "room_id": room_id,
1669            "sender": inviter_user_id,
1670            "state_key": user_id,
1671            "type": "m.room.member",
1672            "unsigned": {
1673                "age": 13374242,
1674            }
1675        });
1676        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1677            to_raw_value(&raw_member_event).unwrap(),
1678        )]);
1679
1680        // It's correctly processed,
1681        client.receive_all_members(room_id, &request, &response).await.unwrap();
1682
1683        let room = client.get_room(room_id).unwrap();
1684
1685        // And I can get the invited member display name and avatar.
1686        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1687
1688        assert_eq!(member.user_id(), user_id);
1689        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1690        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1691    }
1692
1693    #[async_test]
1694    async fn test_reinvited_members_get_a_display_name() {
1695        let user_id = user_id!("@alice:example.org");
1696        let inviter_user_id = user_id!("@bob:example.org");
1697        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1698
1699        let client = BaseClient::new(
1700            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1701            ThreadingSupport::Disabled,
1702            DmRoomDefinition::default(),
1703        );
1704        client
1705            .activate(
1706                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1707                RoomLoadSettings::default(),
1708                #[cfg(feature = "e2e-encryption")]
1709                None,
1710            )
1711            .await
1712            .unwrap();
1713
1714        // Preamble: let the SDK know about the room, and that the invited user left it.
1715        let f = EventFactory::new().sender(user_id);
1716        let mut sync_builder = SyncResponseBuilder::new();
1717        let response = sync_builder
1718            .add_joined_room(
1719                matrix_sdk_test::JoinedRoomBuilder::new(room_id)
1720                    .add_state_event(f.member(user_id).leave()),
1721            )
1722            .build_sync_response();
1723        client.receive_sync_response(response).await.unwrap();
1724
1725        // Now, say that the user has been re-invited.
1726        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1727
1728        let raw_member_event = json!({
1729            "content": {
1730                "avatar_url": "mxc://localhost/fewjilfewjil42",
1731                "displayname": "Invited Alice",
1732                "membership": "invite"
1733            },
1734            "event_id": "$151800140517rfvjc:localhost",
1735            "origin_server_ts": 151800140,
1736            "room_id": room_id,
1737            "sender": inviter_user_id,
1738            "state_key": user_id,
1739            "type": "m.room.member",
1740            "unsigned": {
1741                "age": 13374242,
1742            }
1743        });
1744        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1745            to_raw_value(&raw_member_event).unwrap(),
1746        )]);
1747
1748        // It's correctly processed,
1749        client.receive_all_members(room_id, &request, &response).await.unwrap();
1750
1751        let room = client.get_room(room_id).unwrap();
1752
1753        // And I can get the invited member display name and avatar.
1754        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1755
1756        assert_eq!(member.user_id(), user_id);
1757        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1758        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1759    }
1760
1761    #[async_test]
1762    async fn test_ignored_user_list_changes() {
1763        let user_id = user_id!("@alice:example.org");
1764        let client = BaseClient::new(
1765            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1766            ThreadingSupport::Disabled,
1767            DmRoomDefinition::default(),
1768        );
1769
1770        client
1771            .activate(
1772                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1773                RoomLoadSettings::default(),
1774                #[cfg(feature = "e2e-encryption")]
1775                None,
1776            )
1777            .await
1778            .unwrap();
1779
1780        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1781        assert!(subscriber.next().now_or_never().is_none());
1782
1783        let f = EventFactory::new();
1784        let mut sync_builder = SyncResponseBuilder::new();
1785        let response = sync_builder
1786            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1787            .build_sync_response();
1788        client.receive_sync_response(response).await.unwrap();
1789
1790        assert_let!(Some(ignored) = subscriber.next().await);
1791        assert_eq!(ignored, [BOB.to_string()]);
1792
1793        // Receive the same response.
1794        let response = sync_builder
1795            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1796            .build_sync_response();
1797        client.receive_sync_response(response).await.unwrap();
1798
1799        // No changes in the ignored list.
1800        assert!(subscriber.next().now_or_never().is_none());
1801
1802        // Now remove Bob from the ignored list.
1803        let response =
1804            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1805        client.receive_sync_response(response).await.unwrap();
1806
1807        assert_let!(Some(ignored) = subscriber.next().await);
1808        assert!(ignored.is_empty());
1809    }
1810
1811    #[async_test]
1812    async fn test_is_user_ignored() {
1813        let ignored_user_id = user_id!("@alice:example.org");
1814        let client = logged_in_base_client(None).await;
1815
1816        let mut sync_builder = SyncResponseBuilder::new();
1817        let f = EventFactory::new();
1818        let response = sync_builder
1819            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1820            .build_sync_response();
1821        client.receive_sync_response(response).await.unwrap();
1822
1823        assert!(client.is_user_ignored(ignored_user_id).await);
1824    }
1825
1826    #[cfg(feature = "e2e-encryption")]
1827    #[async_test]
1828    async fn test_invite_details_are_set() {
1829        let user_id = user_id!("@alice:localhost");
1830        let client = logged_in_base_client(Some(user_id)).await;
1831        let known_room_id = room_id!("!invited:localhost");
1832        let unknown_room_id = room_id!("!unknown:localhost");
1833
1834        let mut sync_builder = SyncResponseBuilder::new();
1835        let response = sync_builder
1836            .add_invited_room(InvitedRoomBuilder::new(known_room_id))
1837            .build_sync_response();
1838        client.receive_sync_response(response).await.unwrap();
1839
1840        // Let us first check the initial state, we should have a room in the invite
1841        // state.
1842        let invited_room = client
1843            .get_room(known_room_id)
1844            .expect("The sync should have created a room in the invited state");
1845
1846        assert_eq!(invited_room.state(), RoomState::Invited);
1847        assert!(
1848            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1849        );
1850
1851        // Now we join the room.
1852        let joined_room = client
1853            .room_joined(known_room_id, Some(user_id.to_owned()))
1854            .await
1855            .expect("We should be able to mark a room as joined");
1856
1857        // Yup, we now have some invite details.
1858        assert_eq!(joined_room.state(), RoomState::Joined);
1859        assert_matches!(
1860            client.get_pending_key_bundle_details_for_room(known_room_id).await,
1861            Ok(Some(details))
1862        );
1863        assert_eq!(details.inviter, user_id);
1864
1865        // If we didn't know about the room before the join, we assume that there wasn't
1866        // an invite and we don't record the timestamp.
1867        assert!(client.get_room(unknown_room_id).is_none());
1868        let unknown_room = client
1869            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1870            .await
1871            .expect("We should be able to mark a room as joined");
1872
1873        assert_eq!(unknown_room.state(), RoomState::Joined);
1874        assert!(
1875            client
1876                .get_pending_key_bundle_details_for_room(unknown_room_id)
1877                .await
1878                .unwrap()
1879                .is_none()
1880        );
1881
1882        sync_builder.clear();
1883        let response =
1884            sync_builder.add_left_room(LeftRoomBuilder::new(known_room_id)).build_sync_response();
1885        client.receive_sync_response(response).await.unwrap();
1886
1887        // Now that we left the room, we shouldn't have any details anymore.
1888        let left_room = client
1889            .get_room(known_room_id)
1890            .expect("The sync should have created a room in the invited state");
1891
1892        assert_eq!(left_room.state(), RoomState::Left);
1893        assert!(
1894            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1895        );
1896    }
1897}