matrix_sdk_base/
client.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19    collections::{BTreeMap, BTreeSet, HashMap},
20    fmt,
21    ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29    CollectStrategy, DecryptionSettings, EncryptionSettings, OlmError, OlmMachine,
30    TrustRequirement, store::DynCryptoStore, types::requests::ToDeviceRequest,
31};
32#[cfg(doc)]
33use ruma::DeviceId;
34#[cfg(feature = "e2e-encryption")]
35use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
36use ruma::{
37    MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UserId,
38    api::client::{self as api, sync::sync_events::v5},
39    events::{
40        StateEvent, StateEventType,
41        ignored_user_list::IgnoredUserListEventContent,
42        push_rules::{PushRulesEvent, PushRulesEventContent},
43        room::member::SyncRoomMemberEvent,
44    },
45    push::Ruleset,
46    time::Instant,
47};
48use tokio::sync::{Mutex, broadcast};
49#[cfg(feature = "e2e-encryption")]
50use tokio::sync::{RwLock, RwLockReadGuard};
51use tracing::{Level, debug, enabled, info, instrument, warn};
52
53#[cfg(feature = "e2e-encryption")]
54use crate::RoomMemberships;
55use crate::{
56    InviteAcceptanceDetails, RoomStateFilter, SessionMeta,
57    deserialized_responses::DisplayName,
58    error::{Error, Result},
59    event_cache::store::EventCacheStoreLock,
60    response_processors::{self as processors, Context},
61    room::{
62        Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
63    },
64    store::{
65        BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult, RoomLoadSettings,
66        StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig,
67        ambiguity_map::AmbiguityCache,
68    },
69    sync::{RoomUpdates, SyncResponse},
70};
71
72/// A no (network) IO client implementation.
73///
74/// This client is a state machine that receives responses and events and
75/// accordingly updates its state. It is not designed to be used directly, but
76/// rather through `matrix_sdk::Client`.
77///
78/// ```rust
79/// use matrix_sdk_base::{BaseClient, ThreadingSupport, store::StoreConfig};
80///
81/// let client = BaseClient::new(
82///     StoreConfig::new("cross-process-holder-name".to_owned()),
83///     ThreadingSupport::Disabled,
84/// );
85/// ```
86#[derive(Clone)]
87pub struct BaseClient {
88    /// The state store.
89    pub(crate) state_store: BaseStateStore,
90
91    /// The store used by the event cache.
92    event_cache_store: EventCacheStoreLock,
93
94    /// The store used for encryption.
95    ///
96    /// This field is only meant to be used for `OlmMachine` initialization.
97    /// All operations on it happen inside the `OlmMachine`.
98    #[cfg(feature = "e2e-encryption")]
99    crypto_store: Arc<DynCryptoStore>,
100
101    /// The olm-machine that is created once the
102    /// [`SessionMeta`][crate::session::SessionMeta] is set via
103    /// [`BaseClient::activate`]
104    #[cfg(feature = "e2e-encryption")]
105    olm_machine: Arc<RwLock<Option<OlmMachine>>>,
106
107    /// Observable of when a user is ignored/unignored.
108    pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
109
110    /// A sender that is used to communicate changes to room information. Each
111    /// tick contains the room ID and the reasons that have generated this tick.
112    pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
113
114    /// The strategy to use for picking recipient devices, when sending an
115    /// encrypted message.
116    #[cfg(feature = "e2e-encryption")]
117    pub room_key_recipient_strategy: CollectStrategy,
118
119    /// The settings to use for decrypting events.
120    #[cfg(feature = "e2e-encryption")]
121    pub decryption_settings: DecryptionSettings,
122
123    /// If the client should handle verification events received when syncing.
124    #[cfg(feature = "e2e-encryption")]
125    pub handle_verification_events: bool,
126
127    /// Whether the client supports threads or not.
128    pub threading_support: ThreadingSupport,
129}
130
131#[cfg(not(tarpaulin_include))]
132impl fmt::Debug for BaseClient {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        f.debug_struct("BaseClient")
135            .field("session_meta", &self.state_store.session_meta())
136            .field("sync_token", &self.state_store.sync_token)
137            .finish_non_exhaustive()
138    }
139}
140
141/// Whether this client instance supports threading or not. Currently used to
142/// determine how the client handles read receipts and unread count computations
143/// on the base SDK level.
144///
145/// Timelines on the other hand have a separate `TimelineFocus`
146/// `hide_threaded_events` associated value that can be used to hide threaded
147/// events but also to enable threaded read receipt sending. This is because
148/// certain timeline instances should ignore threading no matter what's defined
149/// at the client level. One such example are media filtered timelines which
150/// should contain all the room's media no matter what thread its in (unless
151/// explicitly opted into).
152#[derive(Clone, Copy, Debug)]
153pub enum ThreadingSupport {
154    /// Threading enabled.
155    Enabled {
156        /// Enable client-wide thread subscriptions support (MSC4306 / MSC4308).
157        ///
158        /// This may cause filtering out of thread subscriptions, and loading
159        /// the thread subscriptions via the sliding sync extension,
160        /// when the room list service is being used.
161        with_subscriptions: bool,
162    },
163    /// Threading disabled.
164    Disabled,
165}
166
167impl BaseClient {
168    /// Create a new client.
169    ///
170    /// # Arguments
171    ///
172    /// * `config` - the configuration for the stores (state store, event cache
173    ///   store and crypto store).
174    pub fn new(config: StoreConfig, threading_support: ThreadingSupport) -> Self {
175        let store = BaseStateStore::new(config.state_store);
176
177        // Create the channel to receive `RoomInfoNotableUpdate`.
178        //
179        // Let's consider the channel will receive 5 updates for 100 rooms maximum. This
180        // is unrealistic in practise, as the sync mechanism is pretty unlikely to
181        // trigger such amount of updates, it's a safe value.
182        //
183        // Also, note that it must not be
184        // zero, because (i) it will panic, (ii) a new user has no room, but can create
185        // rooms; remember that the channel's capacity is immutable.
186        let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
187            broadcast::channel(500);
188
189        BaseClient {
190            state_store: store,
191            event_cache_store: config.event_cache_store,
192            #[cfg(feature = "e2e-encryption")]
193            crypto_store: config.crypto_store,
194            #[cfg(feature = "e2e-encryption")]
195            olm_machine: Default::default(),
196            ignore_user_list_changes: Default::default(),
197            room_info_notable_update_sender,
198            #[cfg(feature = "e2e-encryption")]
199            room_key_recipient_strategy: Default::default(),
200            #[cfg(feature = "e2e-encryption")]
201            decryption_settings: DecryptionSettings {
202                sender_device_trust_requirement: TrustRequirement::Untrusted,
203            },
204            #[cfg(feature = "e2e-encryption")]
205            handle_verification_events: true,
206            threading_support,
207        }
208    }
209
210    /// Clones the current base client to use the same crypto store but a
211    /// different, in-memory store config, and resets transient state.
212    #[cfg(feature = "e2e-encryption")]
213    pub async fn clone_with_in_memory_state_store(
214        &self,
215        cross_process_store_locks_holder_name: &str,
216        handle_verification_events: bool,
217    ) -> Result<Self> {
218        let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
219            .state_store(MemoryStore::new());
220        let config = config.crypto_store(self.crypto_store.clone());
221
222        let copy = Self {
223            state_store: BaseStateStore::new(config.state_store),
224            event_cache_store: config.event_cache_store,
225            // We copy the crypto store as well as the `OlmMachine` for two reasons:
226            // 1. The `self.crypto_store` is the same as the one used inside the `OlmMachine`.
227            // 2. We need to ensure that the parent and child use the same data and caches inside
228            //    the `OlmMachine` so the various ratchets and places where new randomness gets
229            //    introduced don't diverge, i.e. one-time keys that get generated by the Olm Account
230            //    or Olm sessions when they encrypt or decrypt messages.
231            crypto_store: self.crypto_store.clone(),
232            olm_machine: self.olm_machine.clone(),
233            ignore_user_list_changes: Default::default(),
234            room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
235            room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
236            decryption_settings: self.decryption_settings.clone(),
237            handle_verification_events,
238            threading_support: self.threading_support,
239        };
240
241        copy.state_store
242            .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
243            .await?;
244
245        Ok(copy)
246    }
247
248    /// Clones the current base client to use the same crypto store but a
249    /// different, in-memory store config, and resets transient state.
250    #[cfg(not(feature = "e2e-encryption"))]
251    #[allow(clippy::unused_async)]
252    pub async fn clone_with_in_memory_state_store(
253        &self,
254        cross_process_store_locks_holder: &str,
255        _handle_verification_events: bool,
256    ) -> Result<Self> {
257        let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
258            .state_store(MemoryStore::new());
259        Ok(Self::new(config, ThreadingSupport::Disabled))
260    }
261
262    /// Get the session meta information.
263    ///
264    /// If the client is currently logged in, this will return a
265    /// [`SessionMeta`] object which contains the user ID and device ID.
266    /// Otherwise it returns `None`.
267    pub fn session_meta(&self) -> Option<&SessionMeta> {
268        self.state_store.session_meta()
269    }
270
271    /// Get all the rooms this client knows about.
272    pub fn rooms(&self) -> Vec<Room> {
273        self.state_store.rooms()
274    }
275
276    /// Get all the rooms this client knows about, filtered by room state.
277    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
278        self.state_store.rooms_filtered(filter)
279    }
280
281    /// Get a stream of all the rooms changes, in addition to the existing
282    /// rooms.
283    pub fn rooms_stream(
284        &self,
285    ) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + use<>) {
286        self.state_store.rooms_stream()
287    }
288
289    /// Lookup the Room for the given RoomId, or create one, if it didn't exist
290    /// yet in the store
291    pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
292        self.state_store.get_or_create_room(
293            room_id,
294            room_state,
295            self.room_info_notable_update_sender.clone(),
296        )
297    }
298
299    /// Get a reference to the state store.
300    pub fn state_store(&self) -> &DynStateStore {
301        self.state_store.deref()
302    }
303
304    /// Get a reference to the event cache store.
305    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
306        &self.event_cache_store
307    }
308
309    /// Check whether the client has been activated.
310    ///
311    /// See [`BaseClient::activate`] to know what it means.
312    pub fn is_active(&self) -> bool {
313        self.state_store.session_meta().is_some()
314    }
315
316    /// Activate the client.
317    ///
318    /// A client is considered active when:
319    ///
320    /// 1. It has a `SessionMeta` (user ID, device ID and access token),
321    /// 2. Has loaded cached data from storage,
322    /// 3. If encryption is enabled, it also initialized or restored its
323    ///    `OlmMachine`.
324    ///
325    /// # Arguments
326    ///
327    /// * `session_meta` - The meta of a session that the user already has from
328    ///   a previous login call.
329    ///
330    /// * `custom_account` - A custom
331    ///   [`matrix_sdk_crypto::vodozemac::olm::Account`] to be used for the
332    ///   identity and one-time keys of this [`BaseClient`]. If no account is
333    ///   provided, a new default one or one from the store will be used. If an
334    ///   account is provided and one already exists in the store for this
335    ///   [`UserId`]/[`DeviceId`] combination, an error will be raised. This is
336    ///   useful if one wishes to create identity keys before knowing the
337    ///   user/device IDs, e.g., to use the identity key as the device ID.
338    ///
339    /// * `room_load_settings` — Specify how many rooms must be restored; use
340    ///   `::default()` if you don't know which value to pick.
341    ///
342    /// # Panics
343    ///
344    /// This method panics if it is called twice.
345    ///
346    /// [`UserId`]: ruma::UserId
347    pub async fn activate(
348        &self,
349        session_meta: SessionMeta,
350        room_load_settings: RoomLoadSettings,
351        #[cfg(feature = "e2e-encryption")] custom_account: Option<
352            crate::crypto::vodozemac::olm::Account,
353        >,
354    ) -> Result<()> {
355        debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
356
357        self.state_store
358            .load_rooms(
359                &session_meta.user_id,
360                room_load_settings,
361                &self.room_info_notable_update_sender,
362            )
363            .await?;
364        self.state_store.load_sync_token().await?;
365        self.state_store.set_session_meta(session_meta);
366
367        #[cfg(feature = "e2e-encryption")]
368        self.regenerate_olm(custom_account).await?;
369
370        Ok(())
371    }
372
373    /// Recreate an `OlmMachine` from scratch.
374    ///
375    /// In particular, this will clear all its caches.
376    #[cfg(feature = "e2e-encryption")]
377    pub async fn regenerate_olm(
378        &self,
379        custom_account: Option<crate::crypto::vodozemac::olm::Account>,
380    ) -> Result<()> {
381        tracing::debug!("regenerating OlmMachine");
382        let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
383
384        // Recreate the `OlmMachine` and wipe the in-memory cache in the store
385        // because we suspect it has stale data.
386        let olm_machine = OlmMachine::with_store(
387            &session_meta.user_id,
388            &session_meta.device_id,
389            self.crypto_store.clone(),
390            custom_account,
391        )
392        .await
393        .map_err(OlmError::from)?;
394
395        *self.olm_machine.write().await = Some(olm_machine);
396        Ok(())
397    }
398
399    /// Get the current, if any, sync token of the client.
400    /// This will be None if the client didn't sync at least once.
401    pub async fn sync_token(&self) -> Option<String> {
402        self.state_store.sync_token.read().await.clone()
403    }
404
405    /// User has knocked on a room.
406    ///
407    /// Update the internal and cached state accordingly. Return the final Room.
408    pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
409        let room = self.state_store.get_or_create_room(
410            room_id,
411            RoomState::Knocked,
412            self.room_info_notable_update_sender.clone(),
413        );
414
415        if room.state() != RoomState::Knocked {
416            let _sync_lock = self.sync_lock().lock().await;
417
418            let mut room_info = room.clone_info();
419            room_info.mark_as_knocked();
420            room_info.mark_state_partially_synced();
421            room_info.mark_members_missing(); // the own member event changed
422            let mut changes = StateChanges::default();
423            changes.add_room(room_info.clone());
424            self.state_store.save_changes(&changes).await?; // Update the store
425            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
426        }
427
428        Ok(room)
429    }
430
431    /// The user has joined a room using this specific client.
432    ///
433    /// This method should be called if the user accepts an invite or if they
434    /// join a public room.
435    ///
436    /// The method will create a [`Room`] object if one does not exist yet and
437    /// set the state of the [`Room`] to [`RoomState::Joined`]. The [`Room`]
438    /// object will be persisted in the cache. Please note that the [`Room`]
439    /// will be a stub until a sync has been received with the full room
440    /// state using [`BaseClient::receive_sync_response`].
441    ///
442    /// Update the internal and cached state accordingly. Return the final Room.
443    ///
444    /// # Arguments
445    ///
446    /// * `room_id` - The unique ID identifying the joined room.
447    /// * `inviter` - When joining this room in response to an invitation, the
448    ///   inviter should be recorded before sending the join request to the
449    ///   server. Providing the inviter here ensures that the
450    ///   [`InviteAcceptanceDetails`] are stored for this room.
451    ///
452    /// # Examples
453    ///
454    /// ```rust
455    /// # use matrix_sdk_base::{BaseClient, store::StoreConfig, RoomState, ThreadingSupport};
456    /// # use ruma::{OwnedRoomId, OwnedUserId, RoomId};
457    /// # async {
458    /// # let client = BaseClient::new(StoreConfig::new("example".to_owned()), ThreadingSupport::Disabled);
459    /// # async fn send_join_request() -> anyhow::Result<OwnedRoomId> { todo!() }
460    /// # async fn maybe_get_inviter(room_id: &RoomId) -> anyhow::Result<Option<OwnedUserId>> { todo!() }
461    /// # let room_id: &RoomId = todo!();
462    /// let maybe_inviter = maybe_get_inviter(room_id).await?;
463    /// let room_id = send_join_request().await?;
464    /// let room = client.room_joined(&room_id, maybe_inviter).await?;
465    ///
466    /// assert_eq!(room.state(), RoomState::Joined);
467    /// # matrix_sdk_test::TestResult::Ok(()) };
468    /// ```
469    pub async fn room_joined(
470        &self,
471        room_id: &RoomId,
472        inviter: Option<OwnedUserId>,
473    ) -> Result<Room> {
474        let room = self.state_store.get_or_create_room(
475            room_id,
476            RoomState::Joined,
477            self.room_info_notable_update_sender.clone(),
478        );
479
480        // If the state isn't `RoomState::Joined` then this means that we knew about
481        // this room before. Let's modify the existing state now.
482        if room.state() != RoomState::Joined {
483            let _sync_lock = self.sync_lock().lock().await;
484
485            let mut room_info = room.clone_info();
486            let previous_state = room.state();
487
488            room_info.mark_as_joined();
489            room_info.mark_state_partially_synced();
490            room_info.mark_members_missing(); // the own member event changed
491
492            // If our previous state was an invite and we're now in the joined state, this
493            // means that the user has explicitly accepted an invite. Let's
494            // remember some details about the invite.
495            //
496            // This is somewhat of a workaround for our lack of cryptographic membership.
497            // Later on we will decide if historic room keys should be accepted
498            // based on this info. If a user has accepted an invite and we receive a room
499            // key bundle shortly after, we might accept it. If we don't do
500            // this, the homeserver could trick us into accepting any historic room key
501            // bundle.
502            if previous_state == RoomState::Invited
503                && let Some(inviter) = inviter
504            {
505                let details = InviteAcceptanceDetails {
506                    invite_accepted_at: MilliSecondsSinceUnixEpoch::now(),
507                    inviter,
508                };
509                room_info.set_invite_acceptance_details(details);
510            }
511
512            let mut changes = StateChanges::default();
513            changes.add_room(room_info.clone());
514
515            self.state_store.save_changes(&changes).await?; // Update the store
516
517            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
518        }
519
520        Ok(room)
521    }
522
523    /// User has left a room.
524    ///
525    /// Update the internal and cached state accordingly.
526    pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
527        let room = self.state_store.get_or_create_room(
528            room_id,
529            RoomState::Left,
530            self.room_info_notable_update_sender.clone(),
531        );
532
533        if room.state() != RoomState::Left {
534            let _sync_lock = self.sync_lock().lock().await;
535
536            let mut room_info = room.clone_info();
537            room_info.mark_as_left();
538            room_info.mark_state_partially_synced();
539            room_info.mark_members_missing(); // the own member event changed
540            let mut changes = StateChanges::default();
541            changes.add_room(room_info.clone());
542            self.state_store.save_changes(&changes).await?; // Update the store
543            room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
544        }
545
546        Ok(())
547    }
548
549    /// Get access to the store's sync lock.
550    pub fn sync_lock(&self) -> &Mutex<()> {
551        self.state_store.sync_lock()
552    }
553
554    /// Receive a response from a sync call.
555    ///
556    /// # Arguments
557    ///
558    /// * `response` - The response that we received after a successful sync.
559    #[instrument(skip_all)]
560    pub async fn receive_sync_response(
561        &self,
562        response: api::sync::sync_events::v3::Response,
563    ) -> Result<SyncResponse> {
564        self.receive_sync_response_with_requested_required_states(
565            response,
566            &RequestedRequiredStates::default(),
567        )
568        .await
569    }
570
571    /// Receive a response from a sync call, with the requested required state
572    /// events.
573    ///
574    /// # Arguments
575    ///
576    /// * `response` - The response that we received after a successful sync.
577    /// * `requested_required_states` - The requested required state events.
578    pub async fn receive_sync_response_with_requested_required_states(
579        &self,
580        response: api::sync::sync_events::v3::Response,
581        requested_required_states: &RequestedRequiredStates,
582    ) -> Result<SyncResponse> {
583        // The server might respond multiple times with the same sync token, in
584        // that case we already received this response and there's nothing to
585        // do.
586        if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
587            info!("Got the same sync response twice");
588            return Ok(SyncResponse::default());
589        }
590
591        let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
592
593        #[cfg(feature = "e2e-encryption")]
594        let olm_machine = self.olm_machine().await;
595
596        let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
597
598        #[cfg(feature = "e2e-encryption")]
599        let to_device = {
600            let processors::e2ee::to_device::Output {
601                processed_to_device_events: to_device,
602                room_key_updates,
603            } = processors::e2ee::to_device::from_sync_v2(
604                &response,
605                olm_machine.as_ref(),
606                &self.decryption_settings,
607            )
608            .await?;
609
610            processors::latest_event::decrypt_from_rooms(
611                &mut context,
612                room_key_updates
613                    .into_iter()
614                    .flatten()
615                    .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
616                    .collect(),
617                processors::e2ee::E2EE::new(
618                    olm_machine.as_ref(),
619                    &self.decryption_settings,
620                    self.handle_verification_events,
621                ),
622            )
623            .await?;
624
625            to_device
626        };
627
628        #[cfg(not(feature = "e2e-encryption"))]
629        let to_device = response
630            .to_device
631            .events
632            .into_iter()
633            .map(|raw| {
634                use matrix_sdk_common::deserialized_responses::{
635                    ProcessedToDeviceEvent, ToDeviceUnableToDecryptInfo,
636                    ToDeviceUnableToDecryptReason,
637                };
638
639                if let Ok(Some(event_type)) = raw.get_field::<String>("type") {
640                    if event_type == "m.room.encrypted" {
641                        ProcessedToDeviceEvent::UnableToDecrypt {
642                            encrypted_event: raw,
643                            utd_info: ToDeviceUnableToDecryptInfo {
644                                reason: ToDeviceUnableToDecryptReason::EncryptionIsDisabled,
645                            },
646                        }
647                    } else {
648                        ProcessedToDeviceEvent::PlainText(raw)
649                    }
650                } else {
651                    // Exclude events with no type
652                    ProcessedToDeviceEvent::Invalid(raw)
653                }
654            })
655            .collect();
656
657        let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
658
659        let global_account_data_processor =
660            processors::account_data::global(&response.account_data.events);
661
662        let push_rules = self.get_push_rules(&global_account_data_processor).await?;
663
664        let mut room_updates = RoomUpdates::default();
665        let mut notifications = Default::default();
666
667        let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
668            BTreeMap::new();
669
670        for (room_id, joined_room) in response.rooms.join {
671            let joined_room_update = processors::room::sync_v2::update_joined_room(
672                &mut context,
673                processors::room::RoomCreationData::new(
674                    &room_id,
675                    self.room_info_notable_update_sender.clone(),
676                    requested_required_states,
677                    &mut ambiguity_cache,
678                ),
679                joined_room,
680                &mut updated_members_in_room,
681                processors::notification::Notification::new(
682                    &push_rules,
683                    &mut notifications,
684                    &self.state_store,
685                ),
686                #[cfg(feature = "e2e-encryption")]
687                processors::e2ee::E2EE::new(
688                    olm_machine.as_ref(),
689                    &self.decryption_settings,
690                    self.handle_verification_events,
691                ),
692            )
693            .await?;
694
695            room_updates.joined.insert(room_id, joined_room_update);
696        }
697
698        for (room_id, left_room) in response.rooms.leave {
699            let left_room_update = processors::room::sync_v2::update_left_room(
700                &mut context,
701                processors::room::RoomCreationData::new(
702                    &room_id,
703                    self.room_info_notable_update_sender.clone(),
704                    requested_required_states,
705                    &mut ambiguity_cache,
706                ),
707                left_room,
708                processors::notification::Notification::new(
709                    &push_rules,
710                    &mut notifications,
711                    &self.state_store,
712                ),
713                #[cfg(feature = "e2e-encryption")]
714                processors::e2ee::E2EE::new(
715                    olm_machine.as_ref(),
716                    &self.decryption_settings,
717                    self.handle_verification_events,
718                ),
719            )
720            .await?;
721
722            room_updates.left.insert(room_id, left_room_update);
723        }
724
725        for (room_id, invited_room) in response.rooms.invite {
726            let invited_room_update = processors::room::sync_v2::update_invited_room(
727                &mut context,
728                &room_id,
729                invited_room,
730                self.room_info_notable_update_sender.clone(),
731                processors::notification::Notification::new(
732                    &push_rules,
733                    &mut notifications,
734                    &self.state_store,
735                ),
736            )
737            .await?;
738
739            room_updates.invited.insert(room_id, invited_room_update);
740        }
741
742        for (room_id, knocked_room) in response.rooms.knock {
743            let knocked_room_update = processors::room::sync_v2::update_knocked_room(
744                &mut context,
745                &room_id,
746                knocked_room,
747                self.room_info_notable_update_sender.clone(),
748                processors::notification::Notification::new(
749                    &push_rules,
750                    &mut notifications,
751                    &self.state_store,
752                ),
753            )
754            .await?;
755
756            room_updates.knocked.insert(room_id, knocked_room_update);
757        }
758
759        global_account_data_processor.apply(&mut context, &self.state_store).await;
760
761        context.state_changes.presence = response
762            .presence
763            .events
764            .iter()
765            .filter_map(|e| {
766                let event = e.deserialize().ok()?;
767                Some((event.sender, e.clone()))
768            })
769            .collect();
770
771        context.state_changes.ambiguity_maps = ambiguity_cache.cache;
772
773        {
774            let _sync_lock = self.sync_lock().lock().await;
775
776            processors::changes::save_and_apply(
777                context,
778                &self.state_store,
779                &self.ignore_user_list_changes,
780                Some(response.next_batch.clone()),
781            )
782            .await?;
783        }
784
785        let mut context = Context::default();
786
787        // Now that all the rooms information have been saved, update the display name
788        // of the updated rooms (which relies on information stored in the database).
789        processors::room::display_name::update_for_rooms(
790            &mut context,
791            &room_updates,
792            &self.state_store,
793        )
794        .await;
795
796        // Save the new display name updates if any.
797        processors::changes::save_only(context, &self.state_store).await?;
798
799        for (room_id, member_ids) in updated_members_in_room {
800            if let Some(room) = self.get_room(&room_id) {
801                let _ =
802                    room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
803            }
804        }
805
806        if enabled!(Level::INFO) {
807            info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
808        }
809
810        let response = SyncResponse {
811            rooms: room_updates,
812            presence: response.presence.events,
813            account_data: response.account_data.events,
814            to_device,
815            notifications,
816        };
817
818        Ok(response)
819    }
820
821    /// Receive a get member events response and convert it to a deserialized
822    /// `MembersResponse`
823    ///
824    /// This client-server request must be made without filters to make sure all
825    /// members are received. Otherwise, an error is returned.
826    ///
827    /// # Arguments
828    ///
829    /// * `room_id` - The room id this response belongs to.
830    ///
831    /// * `response` - The raw response that was received from the server.
832    #[instrument(skip_all, fields(?room_id))]
833    pub async fn receive_all_members(
834        &self,
835        room_id: &RoomId,
836        request: &api::membership::get_member_events::v3::Request,
837        response: &api::membership::get_member_events::v3::Response,
838    ) -> Result<()> {
839        if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
840        {
841            // This function assumes all members are loaded at once to optimise how display
842            // name disambiguation works. Using it with partial member list results
843            // would produce incorrect disambiguated display name entries
844            return Err(Error::InvalidReceiveMembersParameters);
845        }
846
847        let Some(room) = self.state_store.room(room_id) else {
848            // The room is unknown to us: leave early.
849            return Ok(());
850        };
851
852        let mut chunk = Vec::with_capacity(response.chunk.len());
853        let mut context = Context::default();
854
855        #[cfg(feature = "e2e-encryption")]
856        let mut user_ids = BTreeSet::new();
857
858        let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
859
860        for raw_event in &response.chunk {
861            let member = match raw_event.deserialize() {
862                Ok(ev) => ev,
863                Err(e) => {
864                    let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
865                    debug!(event_id, "Failed to deserialize member event: {e}");
866                    continue;
867                }
868            };
869
870            // TODO: All the actions in this loop used to be done only when the membership
871            // event was not in the store before. This was changed with the new room API,
872            // because e.g. leaving a room makes members events outdated and they need to be
873            // fetched by `members`. Therefore, they need to be overwritten here, even
874            // if they exist.
875            // However, this makes a new problem occur where setting the member events here
876            // potentially races with the sync.
877            // See <https://github.com/matrix-org/matrix-rust-sdk/issues/1205>.
878
879            #[cfg(feature = "e2e-encryption")]
880            match member.membership() {
881                MembershipState::Join | MembershipState::Invite => {
882                    user_ids.insert(member.state_key().to_owned());
883                }
884                _ => (),
885            }
886
887            if let StateEvent::Original(e) = &member
888                && let Some(d) = &e.content.displayname
889            {
890                let display_name = DisplayName::new(d);
891                ambiguity_map.entry(display_name).or_default().insert(member.state_key().clone());
892            }
893
894            let sync_member: SyncRoomMemberEvent = member.clone().into();
895            processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
896
897            context
898                .state_changes
899                .state
900                .entry(room_id.to_owned())
901                .or_default()
902                .entry(member.event_type())
903                .or_default()
904                .insert(member.state_key().to_string(), raw_event.clone().cast());
905            chunk.push(member);
906        }
907
908        #[cfg(feature = "e2e-encryption")]
909        processors::e2ee::tracked_users::update(
910            self.olm_machine().await.as_ref(),
911            room.encryption_state(),
912            &user_ids,
913        )
914        .await?;
915
916        context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
917
918        let _sync_lock = self.sync_lock().lock().await;
919        let mut room_info = room.clone_info();
920        room_info.mark_members_synced();
921        context.state_changes.add_room(room_info);
922
923        processors::changes::save_and_apply(
924            context,
925            &self.state_store,
926            &self.ignore_user_list_changes,
927            None,
928        )
929        .await?;
930
931        let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
932
933        Ok(())
934    }
935
936    /// Receive a successful filter upload response, the filter id will be
937    /// stored under the given name in the store.
938    ///
939    /// The filter id can later be retrieved with the [`get_filter`] method.
940    ///
941    ///
942    /// # Arguments
943    ///
944    /// * `filter_name` - The name that should be used to persist the filter id
945    ///   in the store.
946    ///
947    /// * `response` - The successful filter upload response containing the
948    ///   filter id.
949    ///
950    /// [`get_filter`]: #method.get_filter
951    pub async fn receive_filter_upload(
952        &self,
953        filter_name: &str,
954        response: &api::filter::create_filter::v3::Response,
955    ) -> Result<()> {
956        Ok(self
957            .state_store
958            .set_kv_data(
959                StateStoreDataKey::Filter(filter_name),
960                StateStoreDataValue::Filter(response.filter_id.clone()),
961            )
962            .await?)
963    }
964
965    /// Get the filter id of a previously uploaded filter.
966    ///
967    /// *Note*: A filter will first need to be uploaded and persisted using
968    /// [`receive_filter_upload`].
969    ///
970    /// # Arguments
971    ///
972    /// * `filter_name` - The name of the filter that was previously used to
973    ///   persist the filter.
974    ///
975    /// [`receive_filter_upload`]: #method.receive_filter_upload
976    pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
977        let filter = self
978            .state_store
979            .get_kv_data(StateStoreDataKey::Filter(filter_name))
980            .await?
981            .map(|d| d.into_filter().expect("State store data not a filter"));
982
983        Ok(filter)
984    }
985
986    /// Get a to-device request that will share a room key with users in a room.
987    #[cfg(feature = "e2e-encryption")]
988    pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
989        match self.olm_machine().await.as_ref() {
990            Some(o) => {
991                let Some(room) = self.get_room(room_id) else {
992                    return Err(Error::InsufficientData);
993                };
994
995                let history_visibility = room.history_visibility_or_default();
996                let Some(room_encryption_event) = room.encryption_settings() else {
997                    return Err(Error::EncryptionNotEnabled);
998                };
999
1000                // Don't share the group session with members that are invited
1001                // if the history visibility is set to `Joined`
1002                let filter = if history_visibility == HistoryVisibility::Joined {
1003                    RoomMemberships::JOIN
1004                } else {
1005                    RoomMemberships::ACTIVE
1006                };
1007
1008                let members = self.state_store.get_user_ids(room_id, filter).await?;
1009
1010                let settings = EncryptionSettings::new(
1011                    room_encryption_event,
1012                    history_visibility,
1013                    self.room_key_recipient_strategy.clone(),
1014                );
1015
1016                Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1017            }
1018            None => panic!("Olm machine wasn't started"),
1019        }
1020    }
1021
1022    /// Get the room with the given room id.
1023    ///
1024    /// # Arguments
1025    ///
1026    /// * `room_id` - The id of the room that should be fetched.
1027    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1028        self.state_store.room(room_id)
1029    }
1030
1031    /// Forget the room with the given room ID.
1032    ///
1033    /// The room will be dropped from the room list and the store.
1034    ///
1035    /// # Arguments
1036    ///
1037    /// * `room_id` - The id of the room that should be forgotten.
1038    pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1039        // Forget the room in the state store.
1040        self.state_store.forget_room(room_id).await?;
1041
1042        // Remove the room in the event cache store too.
1043        self.event_cache_store().lock().await?.remove_room(room_id).await?;
1044
1045        Ok(())
1046    }
1047
1048    /// Get the olm machine.
1049    #[cfg(feature = "e2e-encryption")]
1050    pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1051        self.olm_machine.read().await
1052    }
1053
1054    /// Get the push rules.
1055    ///
1056    /// Gets the push rules previously processed, otherwise get them from the
1057    /// store. As a fallback, uses [`Ruleset::server_default`] if the user
1058    /// is logged in.
1059    pub(crate) async fn get_push_rules(
1060        &self,
1061        global_account_data_processor: &processors::account_data::Global,
1062    ) -> Result<Ruleset> {
1063        if let Some(event) = global_account_data_processor
1064            .push_rules()
1065            .and_then(|ev| ev.deserialize_as_unchecked::<PushRulesEvent>().ok())
1066        {
1067            Ok(event.content.global)
1068        } else if let Some(event) = self
1069            .state_store
1070            .get_account_data_event_static::<PushRulesEventContent>()
1071            .await?
1072            .and_then(|ev| ev.deserialize().ok())
1073        {
1074            Ok(event.content.global)
1075        } else if let Some(session_meta) = self.state_store.session_meta() {
1076            Ok(Ruleset::server_default(&session_meta.user_id))
1077        } else {
1078            Ok(Ruleset::new())
1079        }
1080    }
1081
1082    /// Returns a subscriber that publishes an event every time the ignore user
1083    /// list changes
1084    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1085        self.ignore_user_list_changes.subscribe()
1086    }
1087
1088    /// Returns a new receiver that gets future room info notable updates.
1089    ///
1090    /// Learn more by reading the [`RoomInfoNotableUpdate`] type.
1091    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1092        self.room_info_notable_update_sender.subscribe()
1093    }
1094
1095    /// Checks whether the provided `user_id` belongs to an ignored user.
1096    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
1097        match self.state_store.get_account_data_event_static::<IgnoredUserListEventContent>().await
1098        {
1099            Ok(Some(raw_ignored_user_list)) => match raw_ignored_user_list.deserialize() {
1100                Ok(current_ignored_user_list) => {
1101                    current_ignored_user_list.content.ignored_users.contains_key(user_id)
1102                }
1103                Err(error) => {
1104                    warn!(?error, "Failed to deserialize the ignored user list event");
1105                    false
1106                }
1107            },
1108            Ok(None) => false,
1109            Err(error) => {
1110                warn!(?error, "Could not get the ignored user list from the state store");
1111                false
1112            }
1113        }
1114    }
1115}
1116
1117/// Represent the `required_state` values sent by a sync request.
1118///
1119/// This is useful to track what state events have been requested when handling
1120/// a response.
1121///
1122/// For example, if a sync requests the `m.room.encryption` state event, and the
1123/// server replies with nothing, if means the room **is not** encrypted. Without
1124/// knowing which state event was required by the sync, it is impossible to
1125/// interpret the absence of state event from the server as _the room's
1126/// encryption state is **not encrypted**_ or _the room's encryption state is
1127/// **unknown**_.
1128#[derive(Debug, Default)]
1129pub struct RequestedRequiredStates {
1130    default: Vec<(StateEventType, String)>,
1131    for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1132}
1133
1134impl RequestedRequiredStates {
1135    /// Create a new `RequestedRequiredStates`.
1136    ///
1137    /// `default` represents the `required_state` value for all rooms.
1138    /// `for_rooms` is the `required_state` per room.
1139    pub fn new(
1140        default: Vec<(StateEventType, String)>,
1141        for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1142    ) -> Self {
1143        Self { default, for_rooms }
1144    }
1145
1146    /// Get the `required_state` value for a specific room.
1147    pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1148        self.for_rooms.get(room_id).unwrap_or(&self.default)
1149    }
1150}
1151
1152impl From<&v5::Request> for RequestedRequiredStates {
1153    fn from(request: &v5::Request) -> Self {
1154        // The following information is missing in the MSC4186 at the time of writing
1155        // (2025-03-12) but: the `required_state`s from all lists and from all room
1156        // subscriptions are combined by doing an union.
1157        //
1158        // Thus, we can do the same here, put the union in `default` and keep
1159        // `for_rooms` empty. The `Self::for_room` will automatically do the fallback.
1160        let mut default = BTreeSet::new();
1161
1162        for list in request.lists.values() {
1163            default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1164        }
1165
1166        for room_subscription in request.room_subscriptions.values() {
1167            default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1168        }
1169
1170        Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1171    }
1172}
1173
1174#[cfg(test)]
1175mod tests {
1176    use std::collections::HashMap;
1177
1178    use assert_matches2::{assert_let, assert_matches};
1179    use futures_util::FutureExt as _;
1180    use matrix_sdk_test::{
1181        BOB, InvitedRoomBuilder, LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent,
1182        SyncResponseBuilder, async_test, event_factory::EventFactory, ruma_response_from_json,
1183    };
1184    use ruma::{
1185        api::client::{self as api, sync::sync_events::v5},
1186        event_id,
1187        events::{StateEventType, room::member::MembershipState},
1188        room_id,
1189        serde::Raw,
1190        user_id,
1191    };
1192    use serde_json::{json, value::to_raw_value};
1193
1194    use super::{BaseClient, RequestedRequiredStates};
1195    use crate::{
1196        RoomDisplayName, RoomState, SessionMeta,
1197        client::ThreadingSupport,
1198        store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1199        test_utils::logged_in_base_client,
1200    };
1201
1202    #[test]
1203    fn test_requested_required_states() {
1204        let room_id_0 = room_id!("!r0");
1205        let room_id_1 = room_id!("!r1");
1206
1207        let requested_required_states = RequestedRequiredStates::new(
1208            vec![(StateEventType::RoomAvatar, "".to_owned())],
1209            HashMap::from([(
1210                room_id_0.to_owned(),
1211                vec![
1212                    (StateEventType::RoomMember, "foo".to_owned()),
1213                    (StateEventType::RoomEncryption, "".to_owned()),
1214                ],
1215            )]),
1216        );
1217
1218        // A special set of state events exists for `room_id_0`.
1219        assert_eq!(
1220            requested_required_states.for_room(room_id_0),
1221            &[
1222                (StateEventType::RoomMember, "foo".to_owned()),
1223                (StateEventType::RoomEncryption, "".to_owned()),
1224            ]
1225        );
1226
1227        // No special list for `room_id_1`, it should return the defaults.
1228        assert_eq!(
1229            requested_required_states.for_room(room_id_1),
1230            &[(StateEventType::RoomAvatar, "".to_owned()),]
1231        );
1232    }
1233
1234    #[test]
1235    fn test_requested_required_states_from_sync_v5_request() {
1236        let room_id_0 = room_id!("!r0");
1237        let room_id_1 = room_id!("!r1");
1238
1239        // Empty request.
1240        let mut request = v5::Request::new();
1241
1242        {
1243            let requested_required_states = RequestedRequiredStates::from(&request);
1244
1245            assert!(requested_required_states.default.is_empty());
1246            assert!(requested_required_states.for_rooms.is_empty());
1247        }
1248
1249        // One list.
1250        request.lists.insert("foo".to_owned(), {
1251            let mut list = v5::request::List::default();
1252            list.room_details.required_state = vec![
1253                (StateEventType::RoomAvatar, "".to_owned()),
1254                (StateEventType::RoomEncryption, "".to_owned()),
1255            ];
1256
1257            list
1258        });
1259
1260        {
1261            let requested_required_states = RequestedRequiredStates::from(&request);
1262
1263            assert_eq!(
1264                requested_required_states.default,
1265                &[
1266                    (StateEventType::RoomAvatar, "".to_owned()),
1267                    (StateEventType::RoomEncryption, "".to_owned())
1268                ]
1269            );
1270            assert!(requested_required_states.for_rooms.is_empty());
1271        }
1272
1273        // Two lists.
1274        request.lists.insert("bar".to_owned(), {
1275            let mut list = v5::request::List::default();
1276            list.room_details.required_state = vec![
1277                (StateEventType::RoomEncryption, "".to_owned()),
1278                (StateEventType::RoomName, "".to_owned()),
1279            ];
1280
1281            list
1282        });
1283
1284        {
1285            let requested_required_states = RequestedRequiredStates::from(&request);
1286
1287            // Union of the state events.
1288            assert_eq!(
1289                requested_required_states.default,
1290                &[
1291                    (StateEventType::RoomAvatar, "".to_owned()),
1292                    (StateEventType::RoomEncryption, "".to_owned()),
1293                    (StateEventType::RoomName, "".to_owned()),
1294                ]
1295            );
1296            assert!(requested_required_states.for_rooms.is_empty());
1297        }
1298
1299        // One room subscription.
1300        request.room_subscriptions.insert(room_id_0.to_owned(), {
1301            let mut room_subscription = v5::request::RoomSubscription::default();
1302
1303            room_subscription.required_state = vec![
1304                (StateEventType::RoomJoinRules, "".to_owned()),
1305                (StateEventType::RoomEncryption, "".to_owned()),
1306            ];
1307
1308            room_subscription
1309        });
1310
1311        {
1312            let requested_required_states = RequestedRequiredStates::from(&request);
1313
1314            // Union of state events, all in `default`, still nothing in `for_rooms`.
1315            assert_eq!(
1316                requested_required_states.default,
1317                &[
1318                    (StateEventType::RoomAvatar, "".to_owned()),
1319                    (StateEventType::RoomEncryption, "".to_owned()),
1320                    (StateEventType::RoomJoinRules, "".to_owned()),
1321                    (StateEventType::RoomName, "".to_owned()),
1322                ]
1323            );
1324            assert!(requested_required_states.for_rooms.is_empty());
1325        }
1326
1327        // Two room subscriptions.
1328        request.room_subscriptions.insert(room_id_1.to_owned(), {
1329            let mut room_subscription = v5::request::RoomSubscription::default();
1330
1331            room_subscription.required_state = vec![
1332                (StateEventType::RoomName, "".to_owned()),
1333                (StateEventType::RoomTopic, "".to_owned()),
1334            ];
1335
1336            room_subscription
1337        });
1338
1339        {
1340            let requested_required_states = RequestedRequiredStates::from(&request);
1341
1342            // Union of state events, all in `default`, still nothing in `for_rooms`.
1343            assert_eq!(
1344                requested_required_states.default,
1345                &[
1346                    (StateEventType::RoomAvatar, "".to_owned()),
1347                    (StateEventType::RoomEncryption, "".to_owned()),
1348                    (StateEventType::RoomJoinRules, "".to_owned()),
1349                    (StateEventType::RoomName, "".to_owned()),
1350                    (StateEventType::RoomTopic, "".to_owned()),
1351                ]
1352            );
1353        }
1354    }
1355
1356    #[async_test]
1357    async fn test_invite_after_leaving() {
1358        let user_id = user_id!("@alice:example.org");
1359        let room_id = room_id!("!test:example.org");
1360
1361        let client = logged_in_base_client(Some(user_id)).await;
1362
1363        let mut sync_builder = SyncResponseBuilder::new();
1364
1365        let response = sync_builder
1366            .add_left_room(
1367                LeftRoomBuilder::new(room_id).add_timeline_event(
1368                    EventFactory::new()
1369                        .member(user_id)
1370                        .membership(MembershipState::Leave)
1371                        .display_name("Alice")
1372                        .event_id(event_id!("$994173582443PhrSn:example.org")),
1373                ),
1374            )
1375            .build_sync_response();
1376        client.receive_sync_response(response).await.unwrap();
1377        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1378
1379        let response = sync_builder
1380            .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1381                StrippedStateTestEvent::Custom(json!({
1382                    "content": {
1383                        "displayname": "Alice",
1384                        "membership": "invite",
1385                    },
1386                    "event_id": "$143273582443PhrSn:example.org",
1387                    "origin_server_ts": 1432735824653u64,
1388                    "sender": "@example:example.org",
1389                    "state_key": user_id,
1390                    "type": "m.room.member",
1391                })),
1392            ))
1393            .build_sync_response();
1394        client.receive_sync_response(response).await.unwrap();
1395        assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1396    }
1397
1398    #[async_test]
1399    async fn test_invite_displayname() {
1400        let user_id = user_id!("@alice:example.org");
1401        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1402
1403        let client = logged_in_base_client(Some(user_id)).await;
1404
1405        let response = ruma_response_from_json(&json!({
1406            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1407            "device_one_time_keys_count": {
1408                "signed_curve25519": 50u64
1409            },
1410            "device_unused_fallback_key_types": [
1411                "signed_curve25519"
1412            ],
1413            "rooms": {
1414                "invite": {
1415                    "!ithpyNKDtmhneaTQja:example.org": {
1416                        "invite_state": {
1417                            "events": [
1418                                {
1419                                    "content": {
1420                                        "creator": "@test:example.org",
1421                                        "room_version": "9"
1422                                    },
1423                                    "sender": "@test:example.org",
1424                                    "state_key": "",
1425                                    "type": "m.room.create"
1426                                },
1427                                {
1428                                    "content": {
1429                                        "join_rule": "invite"
1430                                    },
1431                                    "sender": "@test:example.org",
1432                                    "state_key": "",
1433                                    "type": "m.room.join_rules"
1434                                },
1435                                {
1436                                    "content": {
1437                                        "algorithm": "m.megolm.v1.aes-sha2"
1438                                    },
1439                                    "sender": "@test:example.org",
1440                                    "state_key": "",
1441                                    "type": "m.room.encryption"
1442                                },
1443                                {
1444                                    "content": {
1445                                        "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1446                                        "displayname": "Kyra",
1447                                        "membership": "join"
1448                                    },
1449                                    "sender": "@test:example.org",
1450                                    "state_key": "@test:example.org",
1451                                    "type": "m.room.member"
1452                                },
1453                                {
1454                                    "content": {
1455                                        "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1456                                        "displayname": "alice",
1457                                        "is_direct": true,
1458                                        "membership": "invite"
1459                                    },
1460                                    "origin_server_ts": 1650878657984u64,
1461                                    "sender": "@test:example.org",
1462                                    "state_key": "@alice:example.org",
1463                                    "type": "m.room.member",
1464                                    "unsigned": {
1465                                        "age": 14u64
1466                                    },
1467                                    "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1468                                }
1469                            ]
1470                        }
1471                    }
1472                }
1473            }
1474        }));
1475
1476        client.receive_sync_response(response).await.unwrap();
1477
1478        let room = client.get_room(room_id).expect("Room not found");
1479        assert_eq!(room.state(), RoomState::Invited);
1480        assert_eq!(
1481            room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1482            RoomDisplayName::Calculated("Kyra".to_owned())
1483        );
1484    }
1485
1486    #[async_test]
1487    async fn test_deserialization_failure() {
1488        let user_id = user_id!("@alice:example.org");
1489        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1490
1491        let client = BaseClient::new(
1492            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1493            ThreadingSupport::Disabled,
1494        );
1495        client
1496            .activate(
1497                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1498                RoomLoadSettings::default(),
1499                #[cfg(feature = "e2e-encryption")]
1500                None,
1501            )
1502            .await
1503            .unwrap();
1504
1505        let response = ruma_response_from_json(&json!({
1506            "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1507            "rooms": {
1508                "join": {
1509                    "!ithpyNKDtmhneaTQja:example.org": {
1510                        "state": {
1511                            "events": [
1512                                {
1513                                    "invalid": "invalid",
1514                                },
1515                                {
1516                                    "content": {
1517                                        "name": "The room name"
1518                                    },
1519                                    "event_id": "$143273582443PhrSn:example.org",
1520                                    "origin_server_ts": 1432735824653u64,
1521                                    "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1522                                    "sender": "@example:example.org",
1523                                    "state_key": "",
1524                                    "type": "m.room.name",
1525                                    "unsigned": {
1526                                        "age": 1234
1527                                    }
1528                                },
1529                            ]
1530                        }
1531                    }
1532                }
1533            }
1534        }));
1535
1536        client.receive_sync_response(response).await.unwrap();
1537        client
1538            .state_store()
1539            .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1540            .await
1541            .expect("Failed to fetch state event")
1542            .expect("State event not found")
1543            .deserialize()
1544            .expect("Failed to deserialize state event");
1545    }
1546
1547    #[async_test]
1548    async fn test_invited_members_arent_ignored() {
1549        let user_id = user_id!("@alice:example.org");
1550        let inviter_user_id = user_id!("@bob:example.org");
1551        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1552
1553        let client = BaseClient::new(
1554            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1555            ThreadingSupport::Disabled,
1556        );
1557        client
1558            .activate(
1559                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1560                RoomLoadSettings::default(),
1561                #[cfg(feature = "e2e-encryption")]
1562                None,
1563            )
1564            .await
1565            .unwrap();
1566
1567        // Preamble: let the SDK know about the room.
1568        let mut sync_builder = SyncResponseBuilder::new();
1569        let response = sync_builder
1570            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1571            .build_sync_response();
1572        client.receive_sync_response(response).await.unwrap();
1573
1574        // When I process the result of a /members request that only contains an invited
1575        // member,
1576        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1577
1578        let raw_member_event = json!({
1579            "content": {
1580                "avatar_url": "mxc://localhost/fewjilfewjil42",
1581                "displayname": "Invited Alice",
1582                "membership": "invite"
1583            },
1584            "event_id": "$151800140517rfvjc:localhost",
1585            "origin_server_ts": 151800140,
1586            "room_id": room_id,
1587            "sender": inviter_user_id,
1588            "state_key": user_id,
1589            "type": "m.room.member",
1590            "unsigned": {
1591                "age": 13374242,
1592            }
1593        });
1594        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1595            to_raw_value(&raw_member_event).unwrap(),
1596        )]);
1597
1598        // It's correctly processed,
1599        client.receive_all_members(room_id, &request, &response).await.unwrap();
1600
1601        let room = client.get_room(room_id).unwrap();
1602
1603        // And I can get the invited member display name and avatar.
1604        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1605
1606        assert_eq!(member.user_id(), user_id);
1607        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1608        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1609    }
1610
1611    #[async_test]
1612    async fn test_reinvited_members_get_a_display_name() {
1613        let user_id = user_id!("@alice:example.org");
1614        let inviter_user_id = user_id!("@bob:example.org");
1615        let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1616
1617        let client = BaseClient::new(
1618            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1619            ThreadingSupport::Disabled,
1620        );
1621        client
1622            .activate(
1623                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1624                RoomLoadSettings::default(),
1625                #[cfg(feature = "e2e-encryption")]
1626                None,
1627            )
1628            .await
1629            .unwrap();
1630
1631        // Preamble: let the SDK know about the room, and that the invited user left it.
1632        let mut sync_builder = SyncResponseBuilder::new();
1633        let response = sync_builder
1634            .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1635                StateTestEvent::Custom(json!({
1636                    "content": {
1637                        "avatar_url": null,
1638                        "displayname": null,
1639                        "membership": "leave"
1640                    },
1641                    "event_id": "$151803140217rkvjc:localhost",
1642                    "origin_server_ts": 151800139,
1643                    "room_id": room_id,
1644                    "sender": user_id,
1645                    "state_key": user_id,
1646                    "type": "m.room.member",
1647                })),
1648            ))
1649            .build_sync_response();
1650        client.receive_sync_response(response).await.unwrap();
1651
1652        // Now, say that the user has been re-invited.
1653        let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1654
1655        let raw_member_event = json!({
1656            "content": {
1657                "avatar_url": "mxc://localhost/fewjilfewjil42",
1658                "displayname": "Invited Alice",
1659                "membership": "invite"
1660            },
1661            "event_id": "$151800140517rfvjc:localhost",
1662            "origin_server_ts": 151800140,
1663            "room_id": room_id,
1664            "sender": inviter_user_id,
1665            "state_key": user_id,
1666            "type": "m.room.member",
1667            "unsigned": {
1668                "age": 13374242,
1669            }
1670        });
1671        let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1672            to_raw_value(&raw_member_event).unwrap(),
1673        )]);
1674
1675        // It's correctly processed,
1676        client.receive_all_members(room_id, &request, &response).await.unwrap();
1677
1678        let room = client.get_room(room_id).unwrap();
1679
1680        // And I can get the invited member display name and avatar.
1681        let member = room.get_member(user_id).await.expect("ok").expect("exists");
1682
1683        assert_eq!(member.user_id(), user_id);
1684        assert_eq!(member.display_name().unwrap(), "Invited Alice");
1685        assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1686    }
1687
1688    #[async_test]
1689    async fn test_ignored_user_list_changes() {
1690        let user_id = user_id!("@alice:example.org");
1691        let client = BaseClient::new(
1692            StoreConfig::new("cross-process-store-locks-holder-name".to_owned()),
1693            ThreadingSupport::Disabled,
1694        );
1695
1696        client
1697            .activate(
1698                SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1699                RoomLoadSettings::default(),
1700                #[cfg(feature = "e2e-encryption")]
1701                None,
1702            )
1703            .await
1704            .unwrap();
1705
1706        let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1707        assert!(subscriber.next().now_or_never().is_none());
1708
1709        let f = EventFactory::new();
1710        let mut sync_builder = SyncResponseBuilder::new();
1711        let response = sync_builder
1712            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1713            .build_sync_response();
1714        client.receive_sync_response(response).await.unwrap();
1715
1716        assert_let!(Some(ignored) = subscriber.next().await);
1717        assert_eq!(ignored, [BOB.to_string()]);
1718
1719        // Receive the same response.
1720        let response = sync_builder
1721            .add_global_account_data(f.ignored_user_list([(*BOB).into()]))
1722            .build_sync_response();
1723        client.receive_sync_response(response).await.unwrap();
1724
1725        // No changes in the ignored list.
1726        assert!(subscriber.next().now_or_never().is_none());
1727
1728        // Now remove Bob from the ignored list.
1729        let response =
1730            sync_builder.add_global_account_data(f.ignored_user_list([])).build_sync_response();
1731        client.receive_sync_response(response).await.unwrap();
1732
1733        assert_let!(Some(ignored) = subscriber.next().await);
1734        assert!(ignored.is_empty());
1735    }
1736
1737    #[async_test]
1738    async fn test_is_user_ignored() {
1739        let ignored_user_id = user_id!("@alice:example.org");
1740        let client = logged_in_base_client(None).await;
1741
1742        let mut sync_builder = SyncResponseBuilder::new();
1743        let f = EventFactory::new();
1744        let response = sync_builder
1745            .add_global_account_data(f.ignored_user_list([ignored_user_id.to_owned()]))
1746            .build_sync_response();
1747        client.receive_sync_response(response).await.unwrap();
1748
1749        assert!(client.is_user_ignored(ignored_user_id).await);
1750    }
1751
1752    #[async_test]
1753    async fn test_invite_details_are_set() {
1754        let user_id = user_id!("@alice:localhost");
1755        let client = logged_in_base_client(Some(user_id)).await;
1756        let invited_room_id = room_id!("!invited:localhost");
1757        let unknown_room_id = room_id!("!unknown:localhost");
1758
1759        let mut sync_builder = SyncResponseBuilder::new();
1760        let response = sync_builder
1761            .add_invited_room(InvitedRoomBuilder::new(invited_room_id))
1762            .build_sync_response();
1763        client.receive_sync_response(response).await.unwrap();
1764
1765        // Let us first check the initial state, we should have a room in the invite
1766        // state.
1767        let invited_room = client
1768            .get_room(invited_room_id)
1769            .expect("The sync should have created a room in the invited state");
1770
1771        assert_eq!(invited_room.state(), RoomState::Invited);
1772        assert!(invited_room.invite_acceptance_details().is_none());
1773
1774        // Now we join the room.
1775        let joined_room = client
1776            .room_joined(invited_room_id, Some(user_id.to_owned()))
1777            .await
1778            .expect("We should be able to mark a room as joined");
1779
1780        // Yup, we now have some invite details.
1781        assert_eq!(joined_room.state(), RoomState::Joined);
1782        assert_matches!(joined_room.invite_acceptance_details(), Some(details));
1783        assert_eq!(details.inviter, user_id);
1784
1785        // If we didn't know about the room before the join, we assume that there wasn't
1786        // an invite and we don't record the timestamp.
1787        assert!(client.get_room(unknown_room_id).is_none());
1788        let unknown_room = client
1789            .room_joined(unknown_room_id, Some(user_id.to_owned()))
1790            .await
1791            .expect("We should be able to mark a room as joined");
1792
1793        assert_eq!(unknown_room.state(), RoomState::Joined);
1794        assert!(unknown_room.invite_acceptance_details().is_none());
1795
1796        sync_builder.clear();
1797        let response =
1798            sync_builder.add_left_room(LeftRoomBuilder::new(invited_room_id)).build_sync_response();
1799        client.receive_sync_response(response).await.unwrap();
1800
1801        // Now that we left the room, we shouldn't have any details anymore.
1802        let left_room = client
1803            .get_room(invited_room_id)
1804            .expect("The sync should have created a room in the invited state");
1805
1806        assert_eq!(left_room.state(), RoomState::Left);
1807        assert!(left_room.invite_acceptance_details().is_none());
1808    }
1809}