Skip to main content

matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, timer};
28#[cfg(feature = "e2e-encryption")]
29use matrix_sdk_crypto::{
30    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
31    OlmMachineBuilder, TrustRequirement, store::DynCryptoStore,
32    store::types::RoomPendingKeyBundleDetails, types::requests::ToDeviceRequest,
33};
34#[cfg(doc)]
35use ruma::DeviceId;
36#[cfg(feature = "e2e-encryption")]
37use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
38use ruma::{
39    OwnedRoomId, OwnedUserId, RoomId, UserId,
40    api::client::{self as api, sync::sync_events::v5},
41    events::{
42        StateEvent, StateEventType,
43        ignored_user_list::IgnoredUserListEventContent,
44        push_rules::{PushRulesEvent, PushRulesEventContent},
45        room::member::SyncRoomMemberEvent,
46    },
47    push::Ruleset,
48    time::Instant,
49};
50use tokio::sync::{Mutex, broadcast};
51#[cfg(feature = "e2e-encryption")]
52use tokio::sync::{RwLock, RwLockReadGuard};
53use tracing::{Level, debug, enabled, info, instrument, warn};
54
55#[cfg(feature = "e2e-encryption")]
56use crate::RoomMemberships;
57use crate::{
58    RoomStateFilter, SessionMeta, StateStore,
59    deserialized_responses::DisplayName,
60    error::{Error, Result},
61    event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
62    media::store::MediaStoreLock,
63    response_processors::{self as processors, Context},
64    room::{
65        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
66    },
67    store::{
68        AvatarCache, BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult,
69        RoomLoadSettings, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt,
70        StoreConfig, ambiguity_map::AmbiguityCache,
71    },
72    sync::{RoomUpdates, SyncResponse},
73};
74
75/// A no (network) IO client implementation.
76///
77/// This client is a state machine that receives responses and events and
78/// accordingly updates its state. It is not designed to be used directly, but
79/// rather through `matrix_sdk::Client`.
80///
81/// ```rust
82/// use matrix_sdk_base::{
83///     BaseClient, DmRoomDefinition, ThreadingSupport, store::StoreConfig,
84/// };
85/// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
86///
87/// let client = BaseClient::new(
88///     StoreConfig::new(CrossProcessLockConfig::multi_process(
89///         "cross-process-holder-name".to_owned(),
90///     )),
91///     ThreadingSupport::Disabled,
92///     DmRoomDefinition::default(),
93/// );
94/// ```
95#[derive(Clone)]
96pub struct BaseClient {
97    /// The state store.
98    pub(crate) state_store: BaseStateStore,
99
100    /// The store used by the event cache.
101    event_cache_store: EventCacheStoreLock,
102
103    /// The store used by the media cache.
104    media_store: MediaStoreLock,
105
106    /// The store used for encryption.
107    ///
108    /// This field is only meant to be used for `OlmMachine` initialization.
109    /// All operations on it happen inside the `OlmMachine`.
110    #[cfg(feature = "e2e-encryption")]
111    crypto_store: Arc<DynCryptoStore>,
112
113    /// The olm-machine that is created once the
114    /// [`SessionMeta`][crate::session::SessionMeta] is set via
115    /// [`BaseClient::activate`]
116    #[cfg(feature = "e2e-encryption")]
117    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
118
119    /// Observable of when a user is ignored/unignored.
120    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
121
122    /// The strategy to use for picking recipient devices, when sending an
123    /// encrypted message.
124    #[cfg(feature = "e2e-encryption")]
125    pub room_key_recipient_strategy: CollectStrategy,
126
127    /// The settings to use for decrypting events.
128    #[cfg(feature = "e2e-encryption")]
129    pub decryption_settings: DecryptionSettings,
130
131    /// If the client should handle verification events received when syncing.
132    #[cfg(feature = "e2e-encryption")]
133    pub handle_verification_events: bool,
134
135    /// Whether the client supports threads or not.
136    pub threading_support: ThreadingSupport,
137
138    /// The definition of what is considered a DM room.
139    pub dm_room_definition: DmRoomDefinition,
140}
141
142#[cfg(not(tarpaulin_include))]
143impl fmt::Debug for BaseClient {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        f.debug_struct("BaseClient")
146            .field("session_meta", &self.state_store.session_meta())
147            .field("sync_token", &self.state_store.sync_token)
148            .finish_non_exhaustive()
149    }
150}
151
/// Whether this client instance supports threading or not. Currently used to
/// determine how the client handles read receipts and unread count computations
/// on the base SDK level.
///
/// Timelines on the other hand have a separate `TimelineFocus`
/// `hide_threaded_events` associated value that can be used to hide threaded
/// events but also to enable threaded read receipt sending. This is because
/// certain timeline instances should ignore threading no matter what's defined
/// at the client level. One such example is media-filtered timelines, which
/// should contain all the room's media no matter what thread it's in (unless
/// explicitly opted into).
///
/// The type is a plain `Copy` value and can be compared for equality, so
/// callers can test the configured mode directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ThreadingSupport {
    /// Threading enabled.
    Enabled {
        /// Enable client-wide thread subscriptions support (MSC4306 / MSC4308).
        ///
        /// This may cause filtering out of thread subscriptions, and loading
        /// the thread subscriptions via the sliding sync extension,
        /// when the room list service is being used.
        with_subscriptions: bool,
    },
    /// Threading disabled.
    Disabled,
}
177
178impl BaseClient {
179    /// Create a new client.
180    ///
181    /// # Arguments
182    ///
183    /// * `config` - the configuration for the stores (state store, event cache
184    ///   store and crypto store).
185    pub fn new(
186        config: StoreConfig,
187        threading_support: ThreadingSupport,
188        dm_room_definition: DmRoomDefinition,
189    ) -> Self {
190        let store = BaseStateStore::new(config.state_store);
191
192        BaseClient {
193            state_store: store,
194            event_cache_store: config.event_cache_store,
195            media_store: config.media_store,
196            #[cfg(feature = "e2e-encryption")]
197            crypto_store: config.crypto_store,
198            #[cfg(feature = "e2e-encryption")]
199            olm_machine: Default::default(),
200            ignore_user_list_changes: Default::default(),
201            #[cfg(feature = "e2e-encryption")]
202            room_key_recipient_strategy: Default::default(),
203            #[cfg(feature = "e2e-encryption")]
204            decryption_settings: DecryptionSettings {
205                sender_device_trust_requirement: TrustRequirement::Untrusted,
206            },
207            #[cfg(feature = "e2e-encryption")]
208            handle_verification_events: true,
209            threading_support,
210            dm_room_definition,
211        }
212    }
213
214    /// Clones the current base client to use the same crypto store but a
215    /// different, in-memory store config, and resets transient state.
216    #[cfg(feature = "e2e-encryption")]
217    pub async fn clone_with_in_memory_state_store(
218        &self,
219        cross_process_mode: CrossProcessLockConfig,
220        handle_verification_events: bool,
221    ) -> Result<Self> {
222        let config = StoreConfig::new(cross_process_mode).state_store(MemoryStore::new());
223        let config = config.crypto_store(self.crypto_store.clone());
224
225        let copy = Self {
226            state_store: BaseStateStore::new(config.state_store),
227            event_cache_store: config.event_cache_store,
228            media_store: config.media_store,
229            // We copy the crypto store as well as the `OlmMachine` for two reasons:
230            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
231            // 2. We need to ensure that the parent and child use the same data and caches inside
232            //    the `OlmMachine` so the various ratchets and places where new randomness gets
233            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
234            //    or Olm sessions when they encrypt or decrypt messages.
235            crypto_store: self.crypto_store.clone(),
236            olm_machine: self.olm_machine.clone(),
237            ignore_user_list_changes: Default::default(),
238            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
239            decryption_settings: self.decryption_settings.clone(),
240            handle_verification_events,
241            threading_support: self.threading_support,
242            dm_room_definition: self.dm_room_definition.clone(),
243        };
244
245        copy.state_store.derive_from_other(&self.state_store).await?;
246
247        Ok(copy)
248    }
249
250    /// Clones the current base client to use the same crypto store but a
251    /// different, in-memory store config, and resets transient state.
252    #[cfg(not(feature = "e2e-encryption"))]
253    #[allow(clippy::unused_async)]
254    pub async fn clone_with_in_memory_state_store(
255        &self,
256        cross_process_store_config: CrossProcessLockConfig,
257        _handle_verification_events: bool,
258    ) -> Result<Self> {
259        let config = StoreConfig::new(cross_process_store_config).state_store(MemoryStore::new());
260        Ok(Self::new(config, ThreadingSupport::Disabled, DmRoomDefinition::default()))
261    }
262
263    /// Get the session meta information.
264    ///
265    /// If the client is currently logged in, this will return a
266    /// [`SessionMeta`] object which contains the user ID and device ID.
267    /// Otherwise it returns `None`.
268    pub fn session_meta(&self) -> Option<&SessionMeta> {
269        self.state_store.session_meta()
270    }
271
272    /// Get all the rooms this client knows about.
273    pub fn rooms(&self) -> Vec<Room> {
274        self.state_store.rooms()
275    }
276
277    /// Get all the rooms this client knows about, filtered by room state.
278    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
279        self.state_store.rooms_filtered(filter)
280    }
281
282    /// Get a stream of all the rooms changes, in addition to the existing
283    /// rooms.
284    pub fn rooms_stream(
285        &self,
286    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
287        self.state_store.rooms_stream()
288    }
289
290    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
291    /// yet in the store
292    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
293        self.state_store.get_or_create_room(room_id, room_state)
294    }
295
296    /// Get a reference to the state store.
297    pub fn state_store(&self) -> &DynStateStore {
298        self.state_store.deref()
299    }
300
301    /// Get a reference to the event cache store.
302    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
303        &self.event_cache_store
304    }
305
306    /// Get a reference to the media store.
307    pub fn media_store(&self) -> &MediaStoreLock {
308        &self.media_store
309    }
310
311    /// Check whether the client has been activated.
312    ///
313    /// See [`BaseClient::activate`] to know what it means.
314    pub fn is_active(&self) -> bool {
315        self.state_store.session_meta().is_some()
316    }
317
318    /// Activate the client.
319    ///
320    /// A client is considered active when:
321    ///
322    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
323    /// 2. Has loaded cached data from storage,
324    /// 3. If encryption is enabled, it also initialized or restored its
325    ///    `OlmMachine`.
326    ///
327    /// # Arguments
328    ///
329    /// * `session_meta` - The meta of a session that the user already has from
330    ///   a previous login call.
331    ///
332    /// * `custom_account` - A custom
333    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
334    ///   identity and one-time keys of this [`BaseClient`]. If no account is
335    ///   provided, a new default one or one from the store will be used. If an
336    ///   account is provided and one already exists in the store for this
337    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
338    ///   useful if one wishes to create identity keys before knowing the
339    ///   user/device IDs, e.g., to use the identity key as the device ID.
340    ///
341    /// * `room_load_settings` — Specify how many rooms must be restored; use
342    ///   `::default()` if you don't know which value to pick.
343    ///
344    /// # Panics
345    ///
346    /// This method panics if it is called twice.
347    ///
348    /// [`UserId`]: ruma::UserId
349    pub async fn activate(
350        &self,
351        session_meta: SessionMeta,
352        room_load_settings: RoomLoadSettings,
353        #[cfg(feature = "e2e-encryption")] custom_account: Option<
354            crate::crypto::vodozemac::olm::Account,
355        >,
356    ) -> Result<()> {
357        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
358
359        self.state_store.load_rooms(&session_meta.user_id, room_load_settings).await?;
360        self.state_store.load_sync_token().await?;
361        self.state_store.set_session_meta(session_meta);
362
363        #[cfg(feature = "e2e-encryption")]
364        self.regenerate_olm(custom_account).await?;
365
366        Ok(())
367    }
368
369    /// Recreate an `OlmMachine` from scratch.
370    ///
371    /// In particular, this will clear all its caches.
372    #[cfg(feature = "e2e-encryption")]
373    pub async fn regenerate_olm(
374        &self,
375        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
376    ) -> Result<()> {
377        tracing::debug!("regenerating OlmMachine");
378        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
379
380        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
381        // because we suspect it has stale data.
382        let olm_machine = OlmMachineBuilder::new(&session_meta.user_id, &session_meta.device_id)
383            .with_crypto_store(self.crypto_store.clone())
384            .with_custom_account(custom_account)
385            .build()
386            .await
387            .map_err(OlmError::from)?;
388
389        *self.olm_machine.write().await = Some(olm_machine);
390        Ok(())
391    }
392
393    /// Get the current, if any, sync token of the client.
394    /// This will be None if the client didn't sync at least once.
395    pub async fn sync_token(&self) -> Option<String> {
396        self.state_store.sync_token.read().await.clone()
397    }
398
399    /// User has knocked on a room.
400    ///
401    /// Update the internal and cached state accordingly. Return the final Room.
402    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
403        let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked);
404
405        if room.state() != RoomState::Knocked {
406            let store_guard = self.state_store.lock().lock().await;
407
408            // We are no longer joined to the room, so the invite acceptance details are no
409            // longer relevant.
410            #[cfg(feature = "e2e-encryption")]
411            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
412                olm_machine.store().clear_room_pending_key_bundle(room_id).await?
413            }
414
415            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
416                info.mark_as_knocked();
417                info.mark_state_partially_synced();
418                info.mark_members_missing(); // the own member event changed
419                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
420            })
421            .await?;
422        }
423
424        Ok(room)
425    }
426
427    /// The user has joined a room using this specific client.
428    ///
429    /// This method should be called if the user accepts an invite or if they
430    /// join a public room.
431    ///
432    /// The method will create a [`Room`] object if one does not exist yet and
433    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
434    /// object will be persisted in the cache. Please note that the [`Room`]
435    /// will be a stub until a sync has been received with the full room
436    /// state using [`BaseClient::receive_sync_response`].
437    ///
438    /// Update the internal and cached state accordingly. Return the final Room.
439    ///
440    /// # Arguments
441    ///
442    /// * `room_id` - The unique ID identifying the joined room.
443    /// * `inviter` - When joining this room in response to an invitation, the
444    ///   inviter should be recorded before sending the join request to the
445    ///   server. Providing the inviter here ensures that the
446    ///   [`RoomPendingKeyBundleDetails`] are stored for this room.
447    ///
448    /// # Examples
449    ///
450    /// ```rust
451    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport, DmRoomDefinition};
452    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
453    /// use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
454    /// # async {
455    /// # let client = BaseClient::new(
456    ///     StoreConfig::new(CrossProcessLockConfig::multi_process("example")),
457    ///     ThreadingSupport::Disabled,
458    ///     DmRoomDefinition::default()
459    /// );
460    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
461    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
462    /// # let room_id: &RoomId = todo!();
463    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
464    /// let room_id = send_join_request().await?;
465    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
466    ///
467    /// assert_eq!(room.state(), RoomState::Joined);
468    /// # matrix_sdk_test::TestResult::Ok(()) };
469    /// ```
470    pub async fn room_joined(
471        &self,
472        room_id: &RoomId,
473        inviter: Option<OwnedUserId>,
474    ) -> Result<Room> {
475        let room = self.state_store.get_or_create_room(room_id, RoomState::Joined);
476
477        // If the state isn't `RoomState::Joined` then this means that we knew about
478        // this room before. Let's modify the existing state now.
479        if room.state() != RoomState::Joined {
480            let store_guard = self.state_store_lock().lock().await;
481
482            #[cfg(feature = "e2e-encryption")]
483            {
484                // If our previous state was an invite and we're now in the joined state, this
485                // means that the user has explicitly accepted an invite. Let's
486                // remember some details about the invite.
487                //
488                // This is somewhat of a workaround for our lack of cryptographic membership.
489                // Later on we will decide if historic room keys should be accepted
490                // based on this info. If a user has accepted an invite and we receive a room
491                // key bundle shortly after, we might accept it. If we don't do
492                // this, the homeserver could trick us into accepting any historic room key
493                // bundle.
494                let previous_state = room.state();
495                if previous_state == RoomState::Invited
496                    && let Some(inviter) = inviter
497                    && let Some(olm_machine) = self.olm_machine().await.as_ref()
498                {
499                    olm_machine.store().store_room_pending_key_bundle(room_id, &inviter).await?
500                }
501            }
502            #[cfg(not(feature = "e2e-encryption"))]
503            {
504                // suppress unused argument warning
505                let _ = inviter;
506            }
507
508            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
509                info.mark_as_joined();
510                info.mark_state_partially_synced();
511                info.mark_members_missing(); // the own member event changed
512                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
513            })
514            .await?;
515        }
516
517        Ok(room)
518    }
519
520    /// User has left a room.
521    ///
522    /// Update the internal and cached state accordingly.
523    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
524        let room = self.state_store.get_or_create_room(room_id, RoomState::Left);
525
526        if room.state() != RoomState::Left {
527            let store_guard = self.state_store.lock().lock().await;
528
529            // We are no longer joined to the room, so the invite acceptance details are no
530            // longer relevant.
531            #[cfg(feature = "e2e-encryption")]
532            if let Some(olm_machine) = self.olm_machine().await.as_ref() {
533                olm_machine.store().clear_room_pending_key_bundle(room_id).await?
534            }
535
536            room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
537                info.mark_as_left();
538                info.mark_state_partially_synced();
539                info.mark_members_missing(); // the own member event changed
540                (info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
541            })
542            .await?;
543        }
544
545        Ok(())
546    }
547
548    /// Get a lock to the state store, with an exclusive access.
549    ///
550    /// It doesn't give an access to the state store itself. It's rather a lock
551    /// to synchronise all accesses to the state store.
552    pub fn state_store_lock(&self) -> &Mutex<()> {
553        self.state_store.lock()
554    }
555
556    /// Receive a response from a sync call.
557    ///
558    /// # Arguments
559    ///
560    /// * `response` - The response that we received after a successful sync.
561    #[instrument(skip_all)]
562    pub async fn receive_sync_response(
563        &self,
564        response: api::sync::sync_events::v3::Response,
565    ) -> Result<SyncResponse> {
566        self.receive_sync_response_with_requested_required_states(
567            response,
568            &RequestedRequiredStates::default(),
569        )
570        .await
571    }
572
573    /// Receive a response from a sync call, with the requested required state
574    /// events.
575    ///
576    /// # Arguments
577    ///
578    /// * `response` - The response that we received after a successful sync.
579    /// * `requested_required_states` - The requested required state events.
580    pub async fn receive_sync_response_with_requested_required_states(
581        &self,
582        response: api::sync::sync_events::v3::Response,
583        requested_required_states: &RequestedRequiredStates,
584    ) -> Result<SyncResponse> {
585        // The server might respond multiple times with the same sync token, in
586        // that case we already received this response and there's nothing to
587        // do.
588        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
589            info!("Got the same sync response twice");
590            return Ok(SyncResponse::default());
591        }
592
593        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
594
595        // Acquire the state store lock and hold on to it while processing
596        // the sync response below.
597        let state_store_guard = self.state_store_lock().lock().await;
598
599        let user_id = self
600            .session_meta()
601            .expect("Sync shouldn't run without an authenticated user")
602            .user_id
603            .to_owned();
604
605        #[cfg(feature = "e2e-encryption")]
606        let olm_machine = self.olm_machine().await;
607
608        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
609
610        #[cfg(feature = "e2e-encryption")]
611        let processors::e2ee::to_device::Output { processed_to_device_events: to_device } =
612            processors::e2ee::to_device::from_sync_v2(
613                &response,
614                olm_machine.as_ref(),
615                &self.decryption_settings,
616            )
617            .await?;
618
619        #[cfg(not(feature = "e2e-encryption"))]
620        let to_device = response
621            .to_device
622            .events
623            .into_iter()
624            .map(|raw| {
625                use matrix_sdk_common::deserialized_responses::{
626                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
627                    ToDeviceUnableToDecryptReason,
628                };
629
630                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
631                    if event_type == "m.room.encrypted" {
632                        ProcessedToDeviceEvent::UnableToDecrypt {
633                            encrypted_event: raw,
634                            utd_info: ToDeviceUnableToDecryptInfo {
635                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
636                            },
637                        }
638                    } else {
639                        ProcessedToDeviceEvent::PlainText(raw)
640                    }
641                } else {
642                    // Exclude events with no type
643                    ProcessedToDeviceEvent::Invalid(raw)
644                }
645            })
646            .collect();
647
648        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
649        let mut avatar_cache = AvatarCache::new(self.state_store.inner.clone());
650
651        let global_account_data_processor =
652            processors::account_data::global(&response.account_data.events);
653
654        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
655
656        let mut room_updates = RoomUpdates::default();
657        let mut notifications = Default::default();
658
659        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
660            BTreeMap::new();
661
662        #[cfg(feature = "e2e-encryption")]
663        let e2ee_context = processors::e2ee::E2EE::new(
664            olm_machine.as_ref(),
665            &self.decryption_settings,
666            self.handle_verification_events,
667        );
668
669        for (room_id, joined_room) in response.rooms.join {
670            let joined_room_update = processors::room::sync_v2::update_joined_room(
671                &mut context,
672                processors::room::RoomCreationData::new(
673                    &room_id,
674                    requested_required_states,
675                    &mut ambiguity_cache,
676                    &mut avatar_cache,
677                ),
678                joined_room,
679                &mut updated_members_in_room,
680                processors::notification::Notification::new(
681                    &push_rules,
682                    &mut notifications,
683                    &self.state_store,
684                ),
685                #[cfg(feature = "e2e-encryption")]
686                &e2ee_context,
687            )
688            .await?;
689
690            room_updates.joined.insert(room_id, joined_room_update);
691        }
692
693        for (room_id, left_room) in response.rooms.leave {
694            let left_room_update = processors::room::sync_v2::update_left_room(
695                &mut context,
696                processors::room::RoomCreationData::new(
697                    &room_id,
698                    requested_required_states,
699                    &mut ambiguity_cache,
700                    &mut avatar_cache,
701                ),
702                left_room,
703                processors::notification::Notification::new(
704                    &push_rules,
705                    &mut notifications,
706                    &self.state_store,
707                ),
708                #[cfg(feature = "e2e-encryption")]
709                &e2ee_context,
710            )
711            .await?;
712
713            room_updates.left.insert(room_id, left_room_update);
714        }
715
716        for (room_id, invited_room) in response.rooms.invite {
717            let invited_room_update = processors::room::sync_v2::update_invited_room(
718                &mut context,
719                &room_id,
720                &user_id,
721                invited_room,
722                processors::notification::Notification::new(
723                    &push_rules,
724                    &mut notifications,
725                    &self.state_store,
726                ),
727                #[cfg(feature = "e2e-encryption")]
728                &e2ee_context,
729            )
730            .await?;
731
732            room_updates.invited.insert(room_id, invited_room_update);
733        }
734
735        for (room_id, knocked_room) in response.rooms.knock {
736            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
737                &mut context,
738                &room_id,
739                &user_id,
740                knocked_room,
741                processors::notification::Notification::new(
742                    &push_rules,
743                    &mut notifications,
744                    &self.state_store,
745                ),
746                #[cfg(feature = "e2e-encryption")]
747                &e2ee_context,
748            )
749            .await?;
750
751            room_updates.knocked.insert(room_id, knocked_room_update);
752        }
753
754        global_account_data_processor.apply(&mut context, &self.state_store).await;
755
756        context.state_changes.presence = response
757            .presence
758            .events
759            .iter()
760            .filter_map(|e| {
761                let event = e.deserialize().ok()?;
762                Some((event.sender, e.clone()))
763            })
764            .collect();
765
766        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
767
768        processors::changes::save_and_apply(
769            context,
770            &self.state_store,
771            &state_store_guard,
772            &self.ignore_user_list_changes,
773            Some(response.next_batch.clone()),
774        )
775        .await?;
776
777        let mut context = Context::default();
778
779        // Now that all the rooms information have been saved, update the display name
780        // of the updated rooms (which relies on information stored in the database).
781        processors::room::display_name::update_for_rooms(
782            &mut context,
783            &room_updates,
784            &self.state_store,
785        )
786        .await;
787
788        // Save the new display name updates if any.
789        processors::changes::save_only(context, &self.state_store, &state_store_guard).await?;
790
791        for (room_id, member_ids) in updated_members_in_room {
792            if let Some(room) = self.get_room(&room_id) {
793                let _ =
794                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
795            }
796        }
797
798        // Release the state store lock
799        drop(state_store_guard);
800
801        if enabled!(Level::INFO) {
802            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
803        }
804
805        let response = SyncResponse {
806            rooms: room_updates,
807            presence: response.presence.events,
808            account_data: response.account_data.events,
809            to_device,
810            notifications,
811        };
812
813        Ok(response)
814    }
815
816    /// Receive a get member events response and convert it to a deserialized
817    /// `MembersResponse`
818    ///
819    /// This client-server request must be made without filters to make sure all
820    /// members are received. Otherwise, an error is returned.
821    ///
822    /// # Arguments
823    ///
824    /// * `room_id` - The room id this response belongs to.
825    ///
826    /// * `response` - The raw response that was received from the server.
827    #[instrument(skip_all, fields(?room_id))]
828    pub async fn receive_all_members(
829        &self,
830        room_id: &RoomId,
831        request: &api::membership::get_member_events::v3::Request,
832        response: &api::membership::get_member_events::v3::Response,
833    ) -> Result<()> {
834        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
835        {
836            // This function assumes all members are loaded at once to optimise how display
837            // name disambiguation works. Using it with partial member list results
838            // would produce incorrect disambiguated display name entries
839            return Err(Error::InvalidReceiveMembersParameters);
840        }
841
842        let Some(room) = self.state_store.room(room_id) else {
843            // The room is unknown to us: leave early.
844            return Ok(());
845        };
846
847        let mut chunk = Vec::with_capacity(response.chunk.len());
848        let mut context = Context::default();
849
850        #[cfg(feature = "e2e-encryption")]
851        let mut user_ids = BTreeSet::new();
852
853        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
854
855        for raw_event in &response.chunk {
856            let member = match raw_event.deserialize() {
857                Ok(ev) => ev,
858                Err(e) => {
859                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
860                    debug!(event_id, "Failed to deserialize member event: {e}");
861                    continue;
862                }
863            };
864
865            // TODO: All the actions in this loop used to be done only when the membership
866            // event was not in the store before. This was changed with the new room API,
867            // because e.g. leaving a room makes members events outdated and they need to be
868            // fetched by `members`. Therefore, they need to be overwritten here, even
869            // if they exist.
870            // However, this makes a new problem occur where setting the member events here
871            // potentially races with the sync.
872            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
873
874            #[cfg(feature = "e2e-encryption")]
875            match member.membership() {
876                MembershipState::Join | MembershipState::Invite => {
877                    user_ids.insert(member.state_key().to_owned());
878                }
879                _ => (),
880            }
881
882            if let StateEvent::Original(e) = &member
883                && let Some(d) = &e.content.displayname
884            {
885                let display_name = DisplayName::new(d);
886                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
887            }
888
889            let sync_member: SyncRoomMemberEvent = member.clone().into();
890            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
891
892            context
893                .state_changes
894                .state
895                .entry(room_id.to_owned())
896                .or_default()
897                .entry(member.event_type())
898                .or_default()
899                .insert(member.state_key().to_string(), raw_event.clone().cast());
900            chunk.push(member);
901        }
902
903        #[cfg(feature = "e2e-encryption")]
904        processors::e2ee::tracked_users::update(
905            self.olm_machine().await.as_ref(),
906            room.encryption_state(),
907            &user_ids,
908        )
909        .await?;
910
911        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
912
913        {
914            let state_store_guard = self.state_store_lock().lock().await;
915
916            let mut room_info = room.clone_info();
917            room_info.mark_members_synced();
918            context.state_changes.add_room(room_info);
919
920            processors::changes::save_and_apply(
921                context,
922                &self.state_store,
923                &state_store_guard,
924                &self.ignore_user_list_changes,
925                None,
926            )
927            .await?;
928        }
929
930        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
931
932        #[cfg(feature = "e2e-encryption")]
933        if let Some(olm) = self.olm_machine().await.as_ref() {
934            // With the introduction of MSC4268, it is no longer sufficient to check for
935            // changes to session recipients when we send a message, since we may miss
936            // join/leave pairs in our view of the room state. Instead, we should rotate
937            // the room key whenever we fully reload the member list as a precaution.
938            tracing::debug!("Rotating room key due to full member list reload");
939            if let Err(e) = olm.discard_room_key(room_id).await {
940                tracing::warn!("Error discarding room key: {e:?}");
941            }
942        }
943
944        Ok(())
945    }
946
947    /// Receive a successful filter upload response, the filter id will be
948    /// stored under the given name in the store.
949    ///
950    /// The filter id can later be retrieved with the [`get_filter`] method.
951    ///
952    ///
953    /// # Arguments
954    ///
955    /// * `filter_name` - The name that should be used to persist the filter id
956    ///   in the store.
957    ///
958    /// * `response` - The successful filter upload response containing the
959    ///   filter id.
960    ///
961    /// [`get_filter`]: #method.get_filter
962    pub async fn receive_filter_upload(
963        &self,
964        filter_name: &str,
965        response: &api::filter::create_filter::v3::Response,
966    ) -> Result<()> {
967        Ok(self
968            .state_store
969            .set_kv_data(
970                StateStoreDataKey::Filter(filter_name),
971                StateStoreDataValue::Filter(response.filter_id.clone()),
972            )
973            .await?)
974    }
975
976    /// Get the filter id of a previously uploaded filter.
977    ///
978    /// *Note*: A filter will first need to be uploaded and persisted using
979    /// [`receive_filter_upload`].
980    ///
981    /// # Arguments
982    ///
983    /// * `filter_name` - The name of the filter that was previously used to
984    ///   persist the filter.
985    ///
986    /// [`receive_filter_upload`]: #method.receive_filter_upload
987    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
988        let filter = self
989            .state_store
990            .get_kv_data(StateStoreDataKey::Filter(filter_name))
991            .await?
992            .map(|d| d.into_filter().expect("State store data not a filter"));
993
994        Ok(filter)
995    }
996
997    /// Get a to-device request that will share a room key with users in a room.
998    #[cfg(feature = "e2e-encryption")]
999    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1000        match self.olm_machine().await.as_ref() {
1001            Some(o) => {
1002                let Some(room) = self.get_room(room_id) else {
1003                    return Err(Error::InsufficientData);
1004                };
1005
1006                let history_visibility = room.history_visibility_or_default();
1007                let Some(room_encryption_event) = room.encryption_settings() else {
1008                    return Err(Error::EncryptionNotEnabled);
1009                };
1010
1011                // Don't share the group session with members that are invited
1012                // if the history visibility is set to `Joined`
1013                let filter = if history_visibility == HistoryVisibility::Joined {
1014                    RoomMemberships::JOIN
1015                } else {
1016                    RoomMemberships::ACTIVE
1017                };
1018
1019                let members = self.state_store.get_user_ids(room_id, filter).await?;
1020
1021                let Some(settings) = EncryptionSettings::from_possibly_redacted(
1022                    room_encryption_event,
1023                    history_visibility,
1024                    self.room_key_recipient_strategy.clone(),
1025                ) else {
1026                    return Err(Error::EncryptionNotEnabled);
1027                };
1028
1029                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1030            }
1031            None => panic!("Olm machine wasn't started"),
1032        }
1033    }
1034
    /// Get the room with the given room id.
    ///
    /// The lookup is delegated to the state store; its result is returned
    /// as-is, so `None` means the state store has no room for `room_id`.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The id of the room that should be fetched.
    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
        self.state_store.room(room_id)
    }
1043
1044    /// Forget the room with the given room ID.
1045    ///
1046    /// The room will be dropped from the room list and the store.
1047    ///
1048    /// # Arguments
1049    ///
1050    /// * `room_id` - The id of the room that should be forgotten.
1051    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1052        // Forget the room in the state store.
1053        self.state_store.forget_room(room_id).await?;
1054
1055        // Remove the room in the event cache store too.
1056        match self.event_cache_store().lock().await? {
1057            // If the lock is clear, we can do the operation as expected.
1058            // If the lock is dirty, we can ignore to refresh the state, we just need to remove a
1059            // room. Also, we must not mark the lock as non-dirty because other operations may be
1060            // critical and may need to refresh the `EventCache`' state.
1061            EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
1062                guard.remove_room(room_id).await?
1063            }
1064        }
1065
1066        Ok(())
1067    }
1068
    /// Get the olm machine.
    ///
    /// Waits for a read lock on the `olm_machine` field and returns the
    /// resulting guard. The guarded value is an `Option`, so callers have to
    /// handle the `None` case.
    #[cfg(feature = "e2e-encryption")]
    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
        self.olm_machine.read().await
    }
1074
1075    /// Get the push rules.
1076    ///
1077    /// Gets the push rules previously processed, otherwise get them from the
1078    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1079    /// is logged in.
1080    pub(crate) async fn get_push_rules(
1081        &self,
1082        global_account_data_processor: &processors::account_data::Global,
1083    ) -> Result<Ruleset> {
1084        let _timer = timer!(Level::TRACE, "get_push_rules");
1085        if let Some(event) = global_account_data_processor
1086            .push_rules()
1087            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1088        {
1089            Ok(event.content.global)
1090        } else if let Some(event) = self
1091            .state_store
1092            .get_account_data_event_static::<PushRulesEventContent>()
1093            .await?
1094            .and_then(|ev| ev.deserialize().ok())
1095        {
1096            Ok(event.content.global)
1097        } else if let Some(session_meta) = self.state_store.session_meta() {
1098            Ok(Ruleset::server_default(&session_meta.user_id))
1099        } else {
1100            Ok(Ruleset::new())
1101        }
1102    }
1103
    /// Returns a subscriber that publishes an event every time the ignore user
    /// list changes.
    ///
    /// The subscriber is created from the `ignore_user_list_changes`
    /// observable and yields `Vec<String>` values (presumably the ignored user
    /// ids; confirm against the code that updates the observable).
    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
        self.ignore_user_list_changes.subscribe()
    }
1109
    /// Returns a new receiver that gets future room info notable updates.
    ///
    /// The receiver is obtained by subscribing to the state store's
    /// `room_info_notable_update_sender` broadcast channel.
    ///
    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
        self.state_store.room_info_notable_update_sender.subscribe()
    }
1116
1117    /// Checks whether the provided `user_id` belongs to an ignored user.
1118    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1119        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1120        {
1121            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1122                Ok(current_ignored_user_list) => {
1123                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1124                }
1125                Err(error) => {
1126                    warn!(?error, "Failed to deserialize the ignored user list event");
1127                    false
1128                }
1129            },
1130            Ok(None) => false,
1131            Err(error) => {
1132                warn!(?error, "Could not get the ignored user list from the state store");
1133                false
1134            }
1135        }
1136    }
1137
1138    /// Check the record of whether we are waiting for an [MSC4268] key bundle
1139    /// for the given room.
1140    ///
1141    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
1142    #[cfg(feature = "e2e-encryption")]
1143    pub async fn get_pending_key_bundle_details_for_room(
1144        &self,
1145        room_id: &RoomId,
1146    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
1147        let result = match self.olm_machine().await.as_ref() {
1148            Some(machine) => {
1149                machine.store().get_pending_key_bundle_details_for_room(room_id).await?
1150            }
1151            None => None,
1152        };
1153        Ok(result)
1154    }
1155
    /// Close all stores, releasing database connections and file locks.
    ///
    /// In-flight operations will complete before this returns.
    ///
    /// The stores are closed one after the other: state store, event cache
    /// store, media store and, with the `e2e-encryption` feature, crypto store.
    /// The first failure is returned immediately, in which case the stores that
    /// come later in this order are not closed by this call.
    pub async fn close_stores(&self) -> Result<()> {
        self.state_store.close().await?;
        self.event_cache_store.close().await.map_err(Error::EventCacheStore)?;
        self.media_store.close().await.map_err(Error::MediaStore)?;

        #[cfg(feature = "e2e-encryption")]
        self.crypto_store.close().await.map_err(Error::CryptoStore)?;

        Ok(())
    }
1169
    /// Reopen all stores after a close, re-opening database connections.
    ///
    /// The stores are reopened one after the other: crypto store (with the
    /// `e2e-encryption` feature), media store, event cache store and finally
    /// state store. The first failure is returned immediately, in which case
    /// the stores that come later in this order are not reopened by this call.
    pub async fn reopen_stores(&self) -> Result<()> {
        #[cfg(feature = "e2e-encryption")]
        self.crypto_store.reopen().await.map_err(Error::CryptoStore)?;

        self.media_store.reopen().await.map_err(Error::MediaStore)?;
        self.event_cache_store.reopen().await.map_err(Error::EventCacheStore)?;
        self.state_store.reopen().await?;

        Ok(())
    }
1181}
1182
/// Represent the `required_state` values sent by a sync request.
///
/// This is useful to track what state events have been requested when handling
/// a response.
///
/// For example, if a sync requests the `m.room.encryption` state event, and the
/// server replies with nothing, it means the room **is not** encrypted. Without
/// knowing which state events were required by the sync, it is impossible to
/// tell whether the absence of a state event from the server means _the room's
/// encryption state is **not encrypted**_ or _the room's encryption state is
/// **unknown**_.
#[derive(Debug, Default)]
pub struct RequestedRequiredStates {
    /// The `required_state` value used for rooms that have no entry in
    /// `for_rooms`.
    default: Vec<(StateEventType, String)>,
    /// The `required_state` values of specific rooms, keyed by room id.
    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
}
1199
1200impl RequestedRequiredStates {
1201    /// Create a new `RequestedRequiredStates`.
1202    ///
1203    /// `default` represents the `required_state` value for all rooms.
1204    /// `for_rooms` is the `required_state` per room.
1205    pub fn new(
1206        default: Vec<(StateEventType, String)>,
1207        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1208    ) -> Self {
1209        Self { default, for_rooms }
1210    }
1211
1212    /// Get the `required_state` value for a specific room.
1213    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1214        self.for_rooms.get(room_id).unwrap_or(&self.default)
1215    }
1216}
1217
1218impl From<&v5::Request> for RequestedRequiredStates {
1219    fn from(request: &v5::Request) -> Self {
1220        // The following information is missing in the MSC4186 at the time of writing
1221        // (2025-03-12) but: the `required_state`s from all lists and from all room
1222        // subscriptions are combined by doing an union.
1223        //
1224        // Thus, we can do the same here, put the union in `default` and keep
1225        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1226        let mut default = BTreeSet::new();
1227
1228        for list in request.lists.values() {
1229            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1230        }
1231
1232        for room_subscription in request.room_subscriptions.values() {
1233            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1234        }
1235
1236        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1237    }
1238}
1239
/// An enum that defines what the [`BaseClient`] should consider a DM room.
///
/// The default is [`DmRoomDefinition::MatrixSpec`].
#[derive(Debug, Clone, Default)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum DmRoomDefinition {
    /// Standard Matrix spec definition: a room linked to a user in an
    /// `m.direct` event.
    #[default]
    MatrixSpec,
    /// A room that is direct as per the spec, but that also contains at most
    /// 2 active members.
    TwoMembers,
}
1252
1253#[cfg(test)]
1254mod tests {
1255    use std::collections::HashMap;
1256
1257    use assert_matches2::assert_let;
1258    #[cfg(feature = "e2e-encryption")]
1259    use assert_matches2::assert_matches;
1260    use futures_util::FutureExt as _;
1261    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
1262    use matrix_sdk_test::{
1263        BOB, InvitedRoomBuilder, LeftRoomBuilder, SyncResponseBuilder, async_test,
1264        event_factory::EventFactory, ruma_response_from_json,
1265    };
1266    use ruma::{
1267        api::client::{self as api, sync::sync_events::v5},
1268        event_id,
1269        events::{StateEventType, room::member::MembershipState},
1270        room_id,
1271        serde::Raw,
1272        user_id,
1273    };
1274    use serde_json::{json, value::to_raw_value};
1275
1276    use super::{BaseClient, RequestedRequiredStates};
1277    use crate::{
1278        DmRoomDefinition, RoomDisplayName, RoomState, SessionMeta,
1279        client::ThreadingSupport,
1280        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1281        test_utils::logged_in_base_client,
1282    };
1283
1284    #[test]
1285    fn test_requested_required_states() {
1286        let room_id_0 = room_id!("!r0");
1287        let room_id_1 = room_id!("!r1");
1288
1289        let requested_required_states = RequestedRequiredStates::new(
1290            vec![(StateEventType::RoomAvatar, "".to_owned())],
1291            HashMap::from([(
1292                room_id_0.to_owned(),
1293                vec![
1294                    (StateEventType::RoomMember, "foo".to_owned()),
1295                    (StateEventType::RoomEncryption, "".to_owned()),
1296                ],
1297            )]),
1298        );
1299
1300        // A special set of state events exists for `room_id_0`.
1301        assert_eq!(
1302            requested_required_states.for_room(room_id_0),
1303            &[
1304                (StateEventType::RoomMember, "foo".to_owned()),
1305                (StateEventType::RoomEncryption, "".to_owned()),
1306            ]
1307        );
1308
1309        // No special list for `room_id_1`, it should return the defaults.
1310        assert_eq!(
1311            requested_required_states.for_room(room_id_1),
1312            &[(StateEventType::RoomAvatar, "".to_owned()),]
1313        );
1314    }
1315
1316    #[test]
1317    fn test_requested_required_states_from_sync_v5_request() {
1318        let room_id_0 = room_id!("!r0");
1319        let room_id_1 = room_id!("!r1");
1320
1321        // Empty request.
1322        let mut request = v5::Request::new();
1323
1324        {
1325            let requested_required_states = RequestedRequiredStates::from(&request);
1326
1327            assert!(requested_required_states.default.is_empty());
1328            assert!(requested_required_states.for_rooms.is_empty());
1329        }
1330
1331        // One list.
1332        request.lists.insert("foo".to_owned(), {
1333            let mut list = v5::request::List::default();
1334            list.room_details.required_state = vec![
1335                (StateEventType::RoomAvatar, "".to_owned()),
1336                (StateEventType::RoomEncryption, "".to_owned()),
1337            ];
1338
1339            list
1340        });
1341
1342        {
1343            let requested_required_states = RequestedRequiredStates::from(&request);
1344
1345            assert_eq!(
1346                requested_required_states.default,
1347                &[
1348                    (StateEventType::RoomAvatar, "".to_owned()),
1349                    (StateEventType::RoomEncryption, "".to_owned())
1350                ]
1351            );
1352            assert!(requested_required_states.for_rooms.is_empty());
1353        }
1354
1355        // Two lists.
1356        request.lists.insert("bar".to_owned(), {
1357            let mut list = v5::request::List::default();
1358            list.room_details.required_state = vec![
1359                (StateEventType::RoomEncryption, "".to_owned()),
1360                (StateEventType::RoomName, "".to_owned()),
1361            ];
1362
1363            list
1364        });
1365
1366        {
1367            let requested_required_states = RequestedRequiredStates::from(&request);
1368
1369            // Union of the state events.
1370            assert_eq!(
1371                requested_required_states.default,
1372                &[
1373                    (StateEventType::RoomAvatar, "".to_owned()),
1374                    (StateEventType::RoomEncryption, "".to_owned()),
1375                    (StateEventType::RoomName, "".to_owned()),
1376                ]
1377            );
1378            assert!(requested_required_states.for_rooms.is_empty());
1379        }
1380
1381        // One room subscription.
1382        request.room_subscriptions.insert(room_id_0.to_owned(), {
1383            let mut room_subscription = v5::request::RoomSubscription::default();
1384
1385            room_subscription.required_state = vec![
1386                (StateEventType::RoomJoinRules, "".to_owned()),
1387                (StateEventType::RoomEncryption, "".to_owned()),
1388            ];
1389
1390            room_subscription
1391        });
1392
1393        {
1394            let requested_required_states = RequestedRequiredStates::from(&request);
1395
1396            // Union of state events, all in `default`, still nothing in `for_rooms`.
1397            assert_eq!(
1398                requested_required_states.default,
1399                &[
1400                    (StateEventType::RoomAvatar, "".to_owned()),
1401                    (StateEventType::RoomEncryption, "".to_owned()),
1402                    (StateEventType::RoomJoinRules, "".to_owned()),
1403                    (StateEventType::RoomName, "".to_owned()),
1404                ]
1405            );
1406            assert!(requested_required_states.for_rooms.is_empty());
1407        }
1408
1409        // Two room subscriptions.
1410        request.room_subscriptions.insert(room_id_1.to_owned(), {
1411            let mut room_subscription = v5::request::RoomSubscription::default();
1412
1413            room_subscription.required_state = vec![
1414                (StateEventType::RoomName, "".to_owned()),
1415                (StateEventType::RoomTopic, "".to_owned()),
1416            ];
1417
1418            room_subscription
1419        });
1420
1421        {
1422            let requested_required_states = RequestedRequiredStates::from(&request);
1423
1424            // Union of state events, all in `default`, still nothing in `for_rooms`.
1425            assert_eq!(
1426                requested_required_states.default,
1427                &[
1428                    (StateEventType::RoomAvatar, "".to_owned()),
1429                    (StateEventType::RoomEncryption, "".to_owned()),
1430                    (StateEventType::RoomJoinRules, "".to_owned()),
1431                    (StateEventType::RoomName, "".to_owned()),
1432                    (StateEventType::RoomTopic, "".to_owned()),
1433                ]
1434            );
1435        }
1436    }
1437
1438    #[async_test]
1439    async fn test_invite_after_leaving() {
1440        let user_id = user_id!("@alice:example.org");
1441        let room_id = room_id!("!test:example.org");
1442
1443        let client = logged_in_base_client(Some(user_id)).await;
1444        let f = EventFactory::new();
1445
1446        let mut sync_builder = SyncResponseBuilder::new();
1447
1448        let response = sync_builder
1449            .add_left_room(
1450                LeftRoomBuilder::new(room_id).add_timeline_event(
1451                    EventFactory::new()
1452                        .member(user_id)
1453                        .membership(MembershipState::Leave)
1454                        .display_name("Alice")
1455                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1456                ),
1457            )
1458            .build_sync_response();
1459        client.receive_sync_response(response).await.unwrap();
1460        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1461
1462        let response = sync_builder
1463            .add_invited_room(
1464                InvitedRoomBuilder::new(room_id).add_state_event(
1465                    f.member(user_id)
1466                        .sender(user_id!("@example:example.org"))
1467                        .membership(MembershipState::Invite)
1468                        .display_name("Alice"),
1469                ),
1470            )
1471            .build_sync_response();
1472        client.receive_sync_response(response).await.unwrap();
1473        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1474    }
1475
1476    #[async_test]
1477    async fn test_invite_displayname() {
1478        let user_id = user_id!("@alice:example.org");
1479        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1480
1481        let client = logged_in_base_client(Some(user_id)).await;
1482
1483        let response = ruma_response_from_json(&json!({
1484            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1485            "device_one_time_keys_count": {
1486                "signed_curve25519": 50u64
1487            },
1488            "device_unused_fallback_key_types": [
1489                "signed_curve25519"
1490            ],
1491            "rooms": {
1492                "invite": {
1493                    "!ithpyNKDtmhneaTQja:example.org": {
1494                        "invite_state": {
1495                            "events": [
1496                                {
1497                                    "content": {
1498                                        "creator": "@test:example.org",
1499                                        "room_version": "9"
1500                                    },
1501                                    "sender": "@test:example.org",
1502                                    "state_key": "",
1503                                    "type": "m.room.create"
1504                                },
1505                                {
1506                                    "content": {
1507                                        "join_rule": "invite"
1508                                    },
1509                                    "sender": "@test:example.org",
1510                                    "state_key": "",
1511                                    "type": "m.room.join_rules"
1512                                },
1513                                {
1514                                    "content": {
1515                                        "algorithm": "m.megolm.v1.aes-sha2"
1516                                    },
1517                                    "sender": "@test:example.org",
1518                                    "state_key": "",
1519                                    "type": "m.room.encryption"
1520                                },
1521                                {
1522                                    "content": {
1523                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1524                                        "displayname": "Kyra",
1525                                        "membership": "join"
1526                                    },
1527                                    "sender": "@test:example.org",
1528                                    "state_key": "@test:example.org",
1529                                    "type": "m.room.member"
1530                                },
1531                                {
1532                                    "content": {
1533                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1534                                        "displayname": "alice",
1535                                        "is_direct": true,
1536                                        "membership": "invite"
1537                                    },
1538                                    "origin_server_ts": 1650878657984u64,
1539                                    "sender": "@test:example.org",
1540                                    "state_key": "@alice:example.org",
1541                                    "type": "m.room.member",
1542                                    "unsigned": {
1543                                        "age": 14u64
1544                                    },
1545                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1546                                }
1547                            ]
1548                        }
1549                    }
1550                }
1551            }
1552        }));
1553
1554        client.receive_sync_response(response).await.unwrap();
1555
1556        let room = client.get_room(room_id).expect("Room not found");
1557        assert_eq!(room.state(), RoomState::Invited);
1558        assert_eq!(
1559            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1560            RoomDisplayName::Calculated("Kyra".to_owned())
1561        );
1562    }
1563
1564    #[async_test]
1565    async fn test_deserialization_failure() {
1566        let user_id = user_id!("@alice:example.org");
1567        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1568
1569        let client = BaseClient::new(
1570            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1571            ThreadingSupport::Disabled,
1572            DmRoomDefinition::default(),
1573        );
1574        client
1575            .activate(
1576                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1577                RoomLoadSettings::default(),
1578                #[cfg(feature = "e2e-encryption")]
1579                None,
1580            )
1581            .await
1582            .unwrap();
1583
1584        let response = ruma_response_from_json(&json!({
1585            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1586            "rooms": {
1587                "join": {
1588                    "!ithpyNKDtmhneaTQja:example.org": {
1589                        "state": {
1590                            "events": [
1591                                {
1592                                    "invalid": "invalid",
1593                                },
1594                                {
1595                                    "content": {
1596                                        "name": "The room name"
1597                                    },
1598                                    "event_id": "$143273582443PhrSn:example.org",
1599                                    "origin_server_ts": 1432735824653u64,
1600                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1601                                    "sender": "@example:example.org",
1602                                    "state_key": "",
1603                                    "type": "m.room.name",
1604                                    "unsigned": {
1605                                        "age": 1234
1606                                    }
1607                                },
1608                            ]
1609                        }
1610                    }
1611                }
1612            }
1613        }));
1614
1615        client.receive_sync_response(response).await.unwrap();
1616        client
1617            .state_store()
1618            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1619            .await
1620            .expect("Failed to fetch state event")
1621            .expect("State event not found")
1622            .deserialize()
1623            .expect("Failed to deserialize state event");
1624    }
1625
1626    #[async_test]
1627    async fn test_invited_members_arent_ignored() {
1628        let user_id = user_id!("@alice:example.org");
1629        let inviter_user_id = user_id!("@bob:example.org");
1630        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1631
1632        let client = BaseClient::new(
1633            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1634            ThreadingSupport::Disabled,
1635            DmRoomDefinition::default(),
1636        );
1637        client
1638            .activate(
1639                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1640                RoomLoadSettings::default(),
1641                #[cfg(feature = "e2e-encryption")]
1642                None,
1643            )
1644            .await
1645            .unwrap();
1646
1647        // Preamble: let the SDK know about the room.
1648        let mut sync_builder = SyncResponseBuilder::new();
1649        let response = sync_builder
1650            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1651            .build_sync_response();
1652        client.receive_sync_response(response).await.unwrap();
1653
1654        // When I process the result of a /members request that only contains an invited
1655        // member,
1656        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1657
1658        let raw_member_event = json!({
1659            "content": {
1660                "avatar_url": "mxc://localhost/fewjilfewjil42",
1661                "displayname": "Invited Alice",
1662                "membership": "invite"
1663            },
1664            "event_id": "$151800140517rfvjc:localhost",
1665            "origin_server_ts": 151800140,
1666            "room_id": room_id,
1667            "sender": inviter_user_id,
1668            "state_key": user_id,
1669            "type": "m.room.member",
1670            "unsigned": {
1671                "age": 13374242,
1672            }
1673        });
1674        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1675            to_raw_value(&raw_member_event).unwrap(),
1676        )]);
1677
1678        // It's correctly processed,
1679        client.receive_all_members(room_id, &request, &response).await.unwrap();
1680
1681        let room = client.get_room(room_id).unwrap();
1682
1683        // And I can get the invited member display name and avatar.
1684        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1685
1686        assert_eq!(member.user_id(), user_id);
1687        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1688        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1689    }
1690
1691    #[async_test]
1692    async fn test_reinvited_members_get_a_display_name() {
1693        let user_id = user_id!("@alice:example.org");
1694        let inviter_user_id = user_id!("@bob:example.org");
1695        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1696
1697        let client = BaseClient::new(
1698            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1699            ThreadingSupport::Disabled,
1700            DmRoomDefinition::default(),
1701        );
1702        client
1703            .activate(
1704                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1705                RoomLoadSettings::default(),
1706                #[cfg(feature = "e2e-encryption")]
1707                None,
1708            )
1709            .await
1710            .unwrap();
1711
1712        // Preamble: let the SDK know about the room, and that the invited user left it.
1713        let f = EventFactory::new().sender(user_id);
1714        let mut sync_builder = SyncResponseBuilder::new();
1715        let response = sync_builder
1716            .add_joined_room(
1717                matrix_sdk_test::JoinedRoomBuilder::new(room_id)
1718                    .add_state_event(f.member(user_id).leave()),
1719            )
1720            .build_sync_response();
1721        client.receive_sync_response(response).await.unwrap();
1722
1723        // Now, say that the user has been re-invited.
1724        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1725
1726        let raw_member_event = json!({
1727            "content": {
1728                "avatar_url": "mxc://localhost/fewjilfewjil42",
1729                "displayname": "Invited Alice",
1730                "membership": "invite"
1731            },
1732            "event_id": "$151800140517rfvjc:localhost",
1733            "origin_server_ts": 151800140,
1734            "room_id": room_id,
1735            "sender": inviter_user_id,
1736            "state_key": user_id,
1737            "type": "m.room.member",
1738            "unsigned": {
1739                "age": 13374242,
1740            }
1741        });
1742        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1743            to_raw_value(&raw_member_event).unwrap(),
1744        )]);
1745
1746        // It's correctly processed,
1747        client.receive_all_members(room_id, &request, &response).await.unwrap();
1748
1749        let room = client.get_room(room_id).unwrap();
1750
1751        // And I can get the invited member display name and avatar.
1752        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1753
1754        assert_eq!(member.user_id(), user_id);
1755        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1756        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1757    }
1758
1759    #[async_test]
1760    async fn test_ignored_user_list_changes() {
1761        let user_id = user_id!("@alice:example.org");
1762        let client = BaseClient::new(
1763            StoreConfig::new(CrossProcessLockConfig::SingleProcess),
1764            ThreadingSupport::Disabled,
1765            DmRoomDefinition::default(),
1766        );
1767
1768        client
1769            .activate(
1770                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1771                RoomLoadSettings::default(),
1772                #[cfg(feature = "e2e-encryption")]
1773                None,
1774            )
1775            .await
1776            .unwrap();
1777
1778        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1779        assert!(subscriber.next().now_or_never().is_none());
1780
1781        let f = EventFactory::new();
1782        let mut sync_builder = SyncResponseBuilder::new();
1783        let response = sync_builder
1784            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1785            .build_sync_response();
1786        client.receive_sync_response(response).await.unwrap();
1787
1788        assert_let!(Some(ignored) = subscriber.next().await);
1789        assert_eq!(ignored, [BOB.to_string()]);
1790
1791        // Receive the same response.
1792        let response = sync_builder
1793            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1794            .build_sync_response();
1795        client.receive_sync_response(response).await.unwrap();
1796
1797        // No changes in the ignored list.
1798        assert!(subscriber.next().now_or_never().is_none());
1799
1800        // Now remove Bob from the ignored list.
1801        let response =
1802            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1803        client.receive_sync_response(response).await.unwrap();
1804
1805        assert_let!(Some(ignored) = subscriber.next().await);
1806        assert!(ignored.is_empty());
1807    }
1808
1809    #[async_test]
1810    async fn test_is_user_ignored() {
1811        let ignored_user_id = user_id!("@alice:example.org");
1812        let client = logged_in_base_client(None).await;
1813
1814        let mut sync_builder = SyncResponseBuilder::new();
1815        let f = EventFactory::new();
1816        let response = sync_builder
1817            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1818            .build_sync_response();
1819        client.receive_sync_response(response).await.unwrap();
1820
1821        assert!(client.is_user_ignored(ignored_user_id).await);
1822    }
1823
1824    #[cfg(feature = "e2e-encryption")]
1825    #[async_test]
1826    async fn test_invite_details_are_set() {
1827        let user_id = user_id!("@alice:localhost");
1828        let client = logged_in_base_client(Some(user_id)).await;
1829        let known_room_id = room_id!("!invited:localhost");
1830        let unknown_room_id = room_id!("!unknown:localhost");
1831
1832        let mut sync_builder = SyncResponseBuilder::new();
1833        let response = sync_builder
1834            .add_invited_room(InvitedRoomBuilder::new(known_room_id))
1835            .build_sync_response();
1836        client.receive_sync_response(response).await.unwrap();
1837
1838        // Let us first check the initial state, we should have a room in the invite
1839        // state.
1840        let invited_room = client
1841            .get_room(known_room_id)
1842            .expect("The sync should have created a room in the invited state");
1843
1844        assert_eq!(invited_room.state(), RoomState::Invited);
1845        assert!(
1846            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1847        );
1848
1849        // Now we join the room.
1850        let joined_room = client
1851            .room_joined(known_room_id, Some(user_id.to_owned()))
1852            .await
1853            .expect("We should be able to mark a room as joined");
1854
1855        // Yup, we now have some invite details.
1856        assert_eq!(joined_room.state(), RoomState::Joined);
1857        assert_matches!(
1858            client.get_pending_key_bundle_details_for_room(known_room_id).await,
1859            Ok(Some(details))
1860        );
1861        assert_eq!(details.inviter, user_id);
1862
1863        // If we didn't know about the room before the join, we assume that there wasn't
1864        // an invite and we don't record the timestamp.
1865        assert!(client.get_room(unknown_room_id).is_none());
1866        let unknown_room = client
1867            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1868            .await
1869            .expect("We should be able to mark a room as joined");
1870
1871        assert_eq!(unknown_room.state(), RoomState::Joined);
1872        assert!(
1873            client
1874                .get_pending_key_bundle_details_for_room(unknown_room_id)
1875                .await
1876                .unwrap()
1877                .is_none()
1878        );
1879
1880        sync_builder.clear();
1881        let response =
1882            sync_builder.add_left_room(LeftRoomBuilder::new(known_room_id)).build_sync_response();
1883        client.receive_sync_response(response).await.unwrap();
1884
1885        // Now that we left the room, we shouldn't have any details anymore.
1886        let left_room = client
1887            .get_room(known_room_id)
1888            .expect("The sync should have created a room in the invited state");
1889
1890        assert_eq!(left_room.state(), RoomState::Left);
1891        assert!(
1892            client.get_pending_key_bundle_details_for_room(known_room_id).await.unwrap().is_none()
1893        );
1894    }
1895}