matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
32use matrix_sdk_base::{
33    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
34    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
35    event_cache::store::EventCacheStoreLock,
36    media::store::MediaStoreLock,
37    store::{
38        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
39        WellKnownResponse,
40    },
41    sync::{Notification, RoomUpdates},
42    task_monitor::TaskMonitor,
43};
44use matrix_sdk_common::ttl_cache::TtlCache;
45#[cfg(feature = "e2e-encryption")]
46use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
47use ruma::{
48    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
49    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
50    api::{
51        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
52        auth_scheme::{AuthScheme, SendAccessToken},
53        client::{
54            account::whoami,
55            alias::{create_alias, delete_alias, get_alias},
56            authenticated_media,
57            device::{self, delete_devices, get_devices, update_device},
58            directory::{get_public_rooms, get_public_rooms_filtered},
59            discovery::{
60                discover_homeserver::{self, RtcFocusInfo},
61                get_capabilities::{self, v3::Capabilities},
62                get_supported_versions,
63            },
64            error::ErrorKind,
65            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
66            knock::knock_room,
67            media,
68            membership::{join_room_by_id, join_room_by_id_or_alias},
69            room::create_room,
70            session::login::v3::DiscoveryInfo,
71            sync::sync_events,
72            threads::get_thread_subscriptions_changes,
73            uiaa,
74            user_directory::search_users,
75        },
76        error::FromHttpResponseError,
77        path_builder::PathBuilder,
78    },
79    assign,
80    events::direct::DirectUserIdentifier,
81    push::Ruleset,
82    time::Instant,
83};
84use serde::de::DeserializeOwned;
85use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
86use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
87use url::Url;
88
89use self::{
90    caches::{CachedValue, ClientCaches},
91    futures::SendRequest,
92};
93use crate::{
94    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
95    Room, SessionTokens, TransmissionProgress,
96    authentication::{
97        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
98        oauth::OAuth,
99    },
100    client::thread_subscriptions::ThreadSubscriptionCatchup,
101    config::{RequestConfig, SyncToken},
102    deduplicating_handler::DeduplicatingHandler,
103    error::HttpResult,
104    event_cache::EventCache,
105    event_handler::{
106        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
107        EventHandlerStore, ObservableEventHandler, SyncEvent,
108    },
109    http_client::{HttpClient, SupportedPathBuilder},
110    latest_events::LatestEvents,
111    media::MediaError,
112    notification_settings::NotificationSettings,
113    room::RoomMember,
114    room_preview::RoomPreview,
115    send_queue::{SendQueue, SendQueueData},
116    sliding_sync::Version as SlidingSyncVersion,
117    sync::{RoomUpdate, SyncResponse},
118};
119#[cfg(feature = "e2e-encryption")]
120use crate::{
121    cross_process_lock::CrossProcessLock,
122    encryption::{
123        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
124        VerificationState,
125    },
126};
127
128mod builder;
129pub(crate) mod caches;
130pub(crate) mod futures;
131pub(crate) mod thread_subscriptions;
132
133pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
134#[cfg(feature = "experimental-search")]
135use crate::search_index::SearchIndex;
136
137#[cfg(not(target_family = "wasm"))]
138type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
139#[cfg(target_family = "wasm")]
140type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
141
142#[cfg(not(target_family = "wasm"))]
143type NotificationHandlerFn =
144    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
145#[cfg(target_family = "wasm")]
146type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
147
148/// Enum controlling if a loop running callbacks should continue or abort.
149///
150/// This is mainly used in the [`sync_with_callback`] method, the return value
151/// of the provided callback controls if the sync loop should be exited.
152///
153/// [`sync_with_callback`]: #method.sync_with_callback
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
155pub enum LoopCtrl {
156    /// Continue running the loop.
157    Continue,
158    /// Break out of the loop.
159    Break,
160}
161
162/// Represents changes that can occur to a `Client`s `Session`.
163#[derive(Debug, Clone, PartialEq)]
164pub enum SessionChange {
165    /// The session's token is no longer valid.
166    UnknownToken {
167        /// Whether or not the session was soft logged out
168        soft_logout: bool,
169    },
170    /// The session's tokens have been refreshed.
171    TokensRefreshed,
172}
173
174/// Information about the server vendor obtained from the federation API.
175#[derive(Debug, Clone, PartialEq, Eq)]
176#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
177pub struct ServerVendorInfo {
178    /// The server name.
179    pub server_name: String,
180    /// The server version.
181    pub version: String,
182}
183
184/// An async/await enabled Matrix client.
185///
186/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
187#[derive(Clone)]
188pub struct Client {
189    pub(crate) inner: Arc<ClientInner>,
190}
191
192#[derive(Default)]
193pub(crate) struct ClientLocks {
194    /// Lock ensuring that only a single room may be marked as a DM at once.
195    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
196    /// explanation.
197    pub(crate) mark_as_dm_lock: Mutex<()>,
198
199    /// Lock ensuring that only a single secret store is getting opened at the
200    /// same time.
201    ///
202    /// This is important so we don't accidentally create multiple different new
203    /// default secret storage keys.
204    #[cfg(feature = "e2e-encryption")]
205    pub(crate) open_secret_store_lock: Mutex<()>,
206
207    /// Lock ensuring that we're only storing a single secret at a time.
208    ///
209    /// Take a look at the [`SecretStore::put_secret`] method for a more
210    /// detailed explanation.
211    ///
212    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
213    #[cfg(feature = "e2e-encryption")]
214    pub(crate) store_secret_lock: Mutex<()>,
215
216    /// Lock ensuring that only one method at a time might modify our backup.
217    #[cfg(feature = "e2e-encryption")]
218    pub(crate) backup_modify_lock: Mutex<()>,
219
220    /// Lock ensuring that we're going to attempt to upload backups for a single
221    /// requester.
222    #[cfg(feature = "e2e-encryption")]
223    pub(crate) backup_upload_lock: Mutex<()>,
224
225    /// Handler making sure we only have one group session sharing request in
226    /// flight per room.
227    #[cfg(feature = "e2e-encryption")]
228    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
229
230    /// Lock making sure we're only doing one key claim request at a time.
231    #[cfg(feature = "e2e-encryption")]
232    pub(crate) key_claim_lock: Mutex<()>,
233
234    /// Handler to ensure that only one members request is running at a time,
235    /// given a room.
236    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
237
238    /// Handler to ensure that only one encryption state request is running at a
239    /// time, given a room.
240    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
241
242    /// Deduplicating handler for sending read receipts. The string is an
243    /// internal implementation detail, see [`Self::send_single_receipt`].
244    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
245
246    #[cfg(feature = "e2e-encryption")]
247    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
248
249    /// Latest "generation" of data known by the crypto store.
250    ///
251    /// This is a counter that only increments, set in the database (and can
252    /// wrap). It's incremented whenever some process acquires a lock for the
253    /// first time. *This assumes the crypto store lock is being held, to
254    /// avoid data races on writing to this value in the store*.
255    ///
256    /// The current process will maintain this value in local memory and in the
257    /// DB over time. Observing a different value than the one read in
258    /// memory, when reading from the store indicates that somebody else has
259    /// written into the database under our feet.
260    ///
261    /// TODO: this should live in the `OlmMachine`, since it's information
262    /// related to the lock. As of today (2023-07-28), we blow up the entire
263    /// olm machine when there's a generation mismatch. So storing the
264    /// generation in the olm machine would make the client think there's
265    /// *always* a mismatch, and that's why we need to store the generation
266    /// outside the `OlmMachine`.
267    #[cfg(feature = "e2e-encryption")]
268    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
269}
270
271pub(crate) struct ClientInner {
272    /// All the data related to authentication and authorization.
273    pub(crate) auth_ctx: Arc<AuthCtx>,
274
275    /// The URL of the server.
276    ///
277    /// Not to be confused with the `Self::homeserver`. `server` is usually
278    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
279    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
280    /// homeserver (at the time of writing — 2024-08-28).
281    ///
282    /// This value is optional depending on how the `Client` has been built.
283    /// If it's been built from a homeserver URL directly, we don't know the
284    /// server. However, if the `Client` has been built from a server URL or
285    /// name, then the homeserver has been discovered, and we know both.
286    server: Option<Url>,
287
288    /// The URL of the homeserver to connect to.
289    ///
290    /// This is the URL for the client-server Matrix API.
291    homeserver: StdRwLock<Url>,
292
293    /// The sliding sync version.
294    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
295
296    /// The underlying HTTP client.
297    pub(crate) http_client: HttpClient,
298
299    /// User session data.
300    pub(super) base_client: BaseClient,
301
302    /// Collection of in-memory caches for the [`Client`].
303    pub(crate) caches: ClientCaches,
304
305    /// Collection of locks individual client methods might want to use, either
306    /// to ensure that only a single call to a method happens at once or to
307    /// deduplicate multiple calls to a method.
308    pub(crate) locks: ClientLocks,
309
310    /// The cross-process store locks holder name.
311    ///
312    /// The SDK provides cross-process store locks (see
313    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
314    /// `holder_name` is the value used for all cross-process store locks
315    /// used by this `Client`.
316    ///
317    /// If multiple `Client`s are running in different processes, this
318    /// value MUST be different for each `Client`.
319    cross_process_store_locks_holder_name: String,
320
321    /// A mapping of the times at which the current user sent typing notices,
322    /// keyed by room.
323    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
324
325    /// Event handlers. See `add_event_handler`.
326    pub(crate) event_handlers: EventHandlerStore,
327
328    /// Notification handlers. See `register_notification_handler`.
329    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
330
331    /// The sender-side of channels used to receive room updates.
332    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
333
334    /// The sender-side of a channel used to observe all the room updates of a
335    /// sync response.
336    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
337
338    /// Whether the client should update its homeserver URL with the discovery
339    /// information present in the login response.
340    respect_login_well_known: bool,
341
342    /// An event that can be listened on to wait for a successful sync. The
343    /// event will only be fired if a sync loop is running. Can be used for
344    /// synchronization, e.g. if we send out a request to create a room, we can
345    /// wait for the sync to get the data to fetch a room object from the state
346    /// store.
347    pub(crate) sync_beat: event_listener::Event,
348
349    /// A central cache for events, inactive first.
350    ///
351    /// It becomes active when [`EventCache::subscribe`] is called.
352    pub(crate) event_cache: OnceCell<EventCache>,
353
354    /// End-to-end encryption related state.
355    #[cfg(feature = "e2e-encryption")]
356    pub(crate) e2ee: EncryptionData,
357
358    /// The verification state of our own device.
359    #[cfg(feature = "e2e-encryption")]
360    pub(crate) verification_state: SharedObservable<VerificationState>,
361
362    /// Whether to enable the experimental support for sending and receiving
363    /// encrypted room history on invite, per [MSC4268].
364    ///
365    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
366    #[cfg(feature = "e2e-encryption")]
367    pub(crate) enable_share_history_on_invite: bool,
368
369    /// Data related to the [`SendQueue`].
370    ///
371    /// [`SendQueue`]: crate::send_queue::SendQueue
372    pub(crate) send_queue_data: Arc<SendQueueData>,
373
374    /// The `max_upload_size` value of the homeserver, it contains the max
375    /// request size you can send.
376    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
377
378    /// The entry point to get the [`LatestEvent`] of rooms and threads.
379    ///
380    /// [`LatestEvent`]: crate::latest_event::LatestEvent
381    latest_events: OnceCell<LatestEvents>,
382
383    /// Service handling the catching up of thread subscriptions in the
384    /// background.
385    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
386
387    #[cfg(feature = "experimental-search")]
388    /// Handler for [`RoomIndex`]'s of each room
389    search_index: SearchIndex,
390
391    /// A monitor for background tasks spawned by the client.
392    pub(crate) task_monitor: TaskMonitor,
393
394    /// A sender to notify subscribers about duplicate key upload errors
395    /// triggered by requests to /keys/upload.
396    #[cfg(feature = "e2e-encryption")]
397    pub(crate) duplicate_key_upload_error_sender:
398        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
399}
400
401impl ClientInner {
402    /// Create a new `ClientInner`.
403    ///
404    /// All the fields passed as parameters here are those that must be cloned
405    /// upon instantiation of a sub-client, e.g. a client specialized for
406    /// notifications.
407    #[allow(clippy::too_many_arguments)]
408    async fn new(
409        auth_ctx: Arc<AuthCtx>,
410        server: Option<Url>,
411        homeserver: Url,
412        sliding_sync_version: SlidingSyncVersion,
413        http_client: HttpClient,
414        base_client: BaseClient,
415        supported_versions: CachedValue<SupportedVersions>,
416        well_known: CachedValue<Option<WellKnownResponse>>,
417        respect_login_well_known: bool,
418        event_cache: OnceCell<EventCache>,
419        send_queue: Arc<SendQueueData>,
420        latest_events: OnceCell<LatestEvents>,
421        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
422        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
423        cross_process_store_locks_holder_name: String,
424        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
425        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
426    ) -> Arc<Self> {
427        let caches = ClientCaches {
428            supported_versions: supported_versions.into(),
429            well_known: well_known.into(),
430            server_metadata: Mutex::new(TtlCache::new()),
431        };
432
433        let client = Self {
434            server,
435            homeserver: StdRwLock::new(homeserver),
436            auth_ctx,
437            sliding_sync_version: StdRwLock::new(sliding_sync_version),
438            http_client,
439            base_client,
440            caches,
441            locks: Default::default(),
442            cross_process_store_locks_holder_name,
443            typing_notice_times: Default::default(),
444            event_handlers: Default::default(),
445            notification_handlers: Default::default(),
446            room_update_channels: Default::default(),
447            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
448            // ballast for all observers to catch up.
449            room_updates_sender: broadcast::Sender::new(32),
450            respect_login_well_known,
451            sync_beat: event_listener::Event::new(),
452            event_cache,
453            send_queue_data: send_queue,
454            latest_events,
455            #[cfg(feature = "e2e-encryption")]
456            e2ee: EncryptionData::new(encryption_settings),
457            #[cfg(feature = "e2e-encryption")]
458            verification_state: SharedObservable::new(VerificationState::Unknown),
459            #[cfg(feature = "e2e-encryption")]
460            enable_share_history_on_invite,
461            server_max_upload_size: Mutex::new(OnceCell::new()),
462            #[cfg(feature = "experimental-search")]
463            search_index: search_index_handler,
464            thread_subscription_catchup,
465            task_monitor: TaskMonitor::new(),
466            #[cfg(feature = "e2e-encryption")]
467            duplicate_key_upload_error_sender: broadcast::channel(1).0,
468        };
469
470        #[allow(clippy::let_and_return)]
471        let client = Arc::new(client);
472
473        #[cfg(feature = "e2e-encryption")]
474        client.e2ee.initialize_tasks(&client);
475
476        let _ = client
477            .event_cache
478            .get_or_init(|| async {
479                EventCache::new(&client, client.base_client.event_cache_store().clone())
480            })
481            .await;
482
483        let _ = client
484            .thread_subscription_catchup
485            .get_or_init(|| async {
486                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
487            })
488            .await;
489
490        client
491    }
492}
493
494#[cfg(not(tarpaulin_include))]
495impl Debug for Client {
496    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
497        write!(fmt, "Client")
498    }
499}
500
501impl Client {
502    /// Create a new [`Client`] that will use the given homeserver.
503    ///
504    /// # Arguments
505    ///
506    /// * `homeserver_url` - The homeserver that the client should connect to.
507    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
508        Self::builder().homeserver_url(homeserver_url).build().await
509    }
510
511    /// Returns a subscriber that publishes an event every time the ignore user
512    /// list changes.
513    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
514        self.inner.base_client.subscribe_to_ignore_user_list_changes()
515    }
516
517    /// Create a new [`ClientBuilder`].
518    pub fn builder() -> ClientBuilder {
519        ClientBuilder::new()
520    }
521
522    pub(crate) fn base_client(&self) -> &BaseClient {
523        &self.inner.base_client
524    }
525
526    /// The underlying HTTP client.
527    pub fn http_client(&self) -> &reqwest::Client {
528        &self.inner.http_client.inner
529    }
530
531    pub(crate) fn locks(&self) -> &ClientLocks {
532        &self.inner.locks
533    }
534
535    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
536        &self.inner.auth_ctx
537    }
538
539    /// The cross-process store locks holder name.
540    ///
541    /// The SDK provides cross-process store locks (see
542    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
543    /// `holder_name` is the value used for all cross-process store locks
544    /// used by this `Client`.
545    pub fn cross_process_store_locks_holder_name(&self) -> &str {
546        &self.inner.cross_process_store_locks_holder_name
547    }
548
549    /// Change the homeserver URL used by this client.
550    ///
551    /// # Arguments
552    ///
553    /// * `homeserver_url` - The new URL to use.
554    fn set_homeserver(&self, homeserver_url: Url) {
555        *self.inner.homeserver.write().unwrap() = homeserver_url;
556    }
557
558    /// Change to a different homeserver and re-resolve well-known.
559    #[cfg(feature = "e2e-encryption")]
560    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
561        &self,
562        homeserver_url: Url,
563    ) -> Result<()> {
564        self.set_homeserver(homeserver_url);
565        self.reset_well_known().await?;
566        if let Some(well_known) = self.load_or_fetch_well_known().await? {
567            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
568        }
569        Ok(())
570    }
571
572    /// Get the capabilities of the homeserver.
573    ///
574    /// This method should be used to check what features are supported by the
575    /// homeserver.
576    ///
577    /// # Examples
578    ///
579    /// ```no_run
580    /// # use matrix_sdk::Client;
581    /// # use url::Url;
582    /// # async {
583    /// # let homeserver = Url::parse("http://example.com")?;
584    /// let client = Client::new(homeserver).await?;
585    ///
586    /// let capabilities = client.get_capabilities().await?;
587    ///
588    /// if capabilities.change_password.enabled {
589    ///     // Change password
590    /// }
591    /// # anyhow::Ok(()) };
592    /// ```
593    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
594        let res = self.send(get_capabilities::v3::Request::new()).await?;
595        Ok(res.capabilities)
596    }
597
598    /// Get the server vendor information from the federation API.
599    ///
600    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
601    /// both the server's software name and version.
602    ///
603    /// # Examples
604    ///
605    /// ```no_run
606    /// # use matrix_sdk::Client;
607    /// # use url::Url;
608    /// # async {
609    /// # let homeserver = Url::parse("http://example.com")?;
610    /// let client = Client::new(homeserver).await?;
611    ///
612    /// let server_info = client.server_vendor_info(None).await?;
613    /// println!(
614    ///     "Server: {}, Version: {}",
615    ///     server_info.server_name, server_info.version
616    /// );
617    /// # anyhow::Ok(()) };
618    /// ```
619    #[cfg(feature = "federation-api")]
620    pub async fn server_vendor_info(
621        &self,
622        request_config: Option<RequestConfig>,
623    ) -> HttpResult<ServerVendorInfo> {
624        use ruma::api::federation::discovery::get_server_version;
625
626        let res = self
627            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
628            .await?;
629
630        // Extract server info, using defaults if fields are missing.
631        let server = res.server.unwrap_or_default();
632        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
633        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
634
635        Ok(ServerVendorInfo { server_name: server_name_str, version })
636    }
637
638    /// Get a copy of the default request config.
639    ///
640    /// The default request config is what's used when sending requests if no
641    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
642    /// function with such a parameter.
643    ///
644    /// If the default request config was not customized through
645    /// [`ClientBuilder`] when creating this `Client`, the returned value will
646    /// be equivalent to [`RequestConfig::default()`].
647    pub fn request_config(&self) -> RequestConfig {
648        self.inner.http_client.request_config
649    }
650
651    /// Check whether the client has been activated.
652    ///
653    /// A client is considered active when:
654    ///
655    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
656    ///    is logged in,
657    /// 2. Has loaded cached data from storage,
658    /// 3. If encryption is enabled, it also initialized or restored its
659    ///    `OlmMachine`.
660    pub fn is_active(&self) -> bool {
661        self.inner.base_client.is_active()
662    }
663
664    /// The server used by the client.
665    ///
666    /// See `Self::server` to learn more.
667    pub fn server(&self) -> Option<&Url> {
668        self.inner.server.as_ref()
669    }
670
671    /// The homeserver of the client.
672    pub fn homeserver(&self) -> Url {
673        self.inner.homeserver.read().unwrap().clone()
674    }
675
676    /// Get the sliding sync version.
677    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
678        self.inner.sliding_sync_version.read().unwrap().clone()
679    }
680
681    /// Override the sliding sync version.
682    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
683        let mut lock = self.inner.sliding_sync_version.write().unwrap();
684        *lock = version;
685    }
686
687    /// Get the Matrix user session meta information.
688    ///
689    /// If the client is currently logged in, this will return a
690    /// [`SessionMeta`] object which contains the user ID and device ID.
691    /// Otherwise it returns `None`.
692    pub fn session_meta(&self) -> Option<&SessionMeta> {
693        self.base_client().session_meta()
694    }
695
696    /// Returns a receiver that gets events for each room info update. To watch
697    /// for new events, use `receiver.resubscribe()`.
698    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
699        self.base_client().room_info_notable_update_receiver()
700    }
701
702    /// Performs a search for users.
703    /// The search is performed case-insensitively on user IDs and display names
704    ///
705    /// # Arguments
706    ///
707    /// * `search_term` - The search term for the search
708    /// * `limit` - The maximum number of results to return. Defaults to 10.
709    ///
710    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
711    pub async fn search_users(
712        &self,
713        search_term: &str,
714        limit: u64,
715    ) -> HttpResult<search_users::v3::Response> {
716        let mut request = search_users::v3::Request::new(search_term.to_owned());
717
718        if let Some(limit) = UInt::new(limit) {
719            request.limit = limit;
720        }
721
722        self.send(request).await
723    }
724
725    /// Get the user id of the current owner of the client.
726    pub fn user_id(&self) -> Option<&UserId> {
727        self.session_meta().map(|s| s.user_id.as_ref())
728    }
729
730    /// Get the device ID that identifies the current session.
731    pub fn device_id(&self) -> Option<&DeviceId> {
732        self.session_meta().map(|s| s.device_id.as_ref())
733    }
734
735    /// Get the current access token for this session.
736    ///
737    /// Will be `None` if the client has not been logged in.
738    pub fn access_token(&self) -> Option<String> {
739        self.auth_ctx().access_token()
740    }
741
742    /// Get the current tokens for this session.
743    ///
744    /// To be notified of changes in the session tokens, use
745    /// [`Client::subscribe_to_session_changes()`] or
746    /// [`Client::set_session_callbacks()`].
747    ///
748    /// Returns `None` if the client has not been logged in.
749    pub fn session_tokens(&self) -> Option<SessionTokens> {
750        self.auth_ctx().session_tokens()
751    }
752
753    /// Access the authentication API used to log in this client.
754    ///
755    /// Will be `None` if the client has not been logged in.
756    pub fn auth_api(&self) -> Option<AuthApi> {
757        match self.auth_ctx().auth_data.get()? {
758            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
759            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
760        }
761    }
762
763    /// Get the whole session info of this client.
764    ///
765    /// Will be `None` if the client has not been logged in.
766    ///
767    /// Can be used with [`Client::restore_session`] to restore a previously
768    /// logged-in session.
769    pub fn session(&self) -> Option<AuthSession> {
770        match self.auth_api()? {
771            AuthApi::Matrix(api) => api.session().map(Into::into),
772            AuthApi::OAuth(api) => api.full_session().map(Into::into),
773        }
774    }
775
776    /// Get a reference to the state store.
777    pub fn state_store(&self) -> &DynStateStore {
778        self.base_client().state_store()
779    }
780
781    /// Get a reference to the event cache store.
782    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
783        self.base_client().event_cache_store()
784    }
785
786    /// Get a reference to the media store.
787    pub fn media_store(&self) -> &MediaStoreLock {
788        self.base_client().media_store()
789    }
790
791    /// Access the native Matrix authentication API with this client.
792    pub fn matrix_auth(&self) -> MatrixAuth {
793        MatrixAuth::new(self.clone())
794    }
795
796    /// Get the account of the current owner of the client.
797    pub fn account(&self) -> Account {
798        Account::new(self.clone())
799    }
800
801    /// Get the encryption manager of the client.
802    #[cfg(feature = "e2e-encryption")]
803    pub fn encryption(&self) -> Encryption {
804        Encryption::new(self.clone())
805    }
806
807    /// Get the media manager of the client.
808    pub fn media(&self) -> Media {
809        Media::new(self.clone())
810    }
811
812    /// Get the pusher manager of the client.
813    pub fn pusher(&self) -> Pusher {
814        Pusher::new(self.clone())
815    }
816
817    /// Access the OAuth 2.0 API of the client.
818    pub fn oauth(&self) -> OAuth {
819        OAuth::new(self.clone())
820    }
821
822    /// Register a handler for a specific event type.
823    ///
824    /// The handler is a function or closure with one or more arguments. The
825    /// first argument is the event itself. All additional arguments are
826    /// "context" arguments: They have to implement [`EventHandlerContext`].
827    /// This trait is named that way because most of the types implementing it
828    /// give additional context about an event: The room it was in, its raw form
829    /// and other similar things. As two exceptions to this,
830    /// [`Client`] and [`EventHandlerHandle`] also implement the
831    /// `EventHandlerContext` trait so you don't have to clone your client
832    /// into the event handler manually and a handler can decide to remove
833    /// itself.
834    ///
835    /// Some context arguments are not universally applicable. A context
836    /// argument that isn't available for the given event type will result in
837    /// the event handler being skipped and an error being logged. The following
838    /// context argument types are only available for a subset of event types:
839    ///
840    /// * [`Room`] is only available for room-specific events, i.e. not for
841    ///   events like global account data events or presence events.
842    ///
843    /// You can provide custom context via
844    /// [`add_event_handler_context`](Client::add_event_handler_context) and
845    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
846    /// into the event handler.
847    ///
848    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
849    ///
850    /// # Examples
851    ///
852    /// ```no_run
853    /// use matrix_sdk::{
854    ///     deserialized_responses::EncryptionInfo,
855    ///     event_handler::Ctx,
856    ///     ruma::{
857    ///         events::{
858    ///             macros::EventContent,
859    ///             push_rules::PushRulesEvent,
860    ///             room::{
861    ///                 message::SyncRoomMessageEvent,
862    ///                 topic::SyncRoomTopicEvent,
863    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
864    ///             },
865    ///         },
866    ///         push::Action,
867    ///         Int, MilliSecondsSinceUnixEpoch,
868    ///     },
869    ///     Client, Room,
870    /// };
871    /// use serde::{Deserialize, Serialize};
872    ///
873    /// # async fn example(client: Client) {
874    /// client.add_event_handler(
875    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
876    ///         // Common usage: Room event plus room and client.
877    ///     },
878    /// );
879    /// client.add_event_handler(
880    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
881    ///         async move {
882    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
883    ///             // unencrypted events and events that were decrypted by the SDK.
884    ///         }
885    ///     },
886    /// );
887    /// client.add_event_handler(
888    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
889    ///         async move {
890    ///             // A `Vec<Action>` parameter allows you to know which push actions
891    ///             // are applicable for an event. For example, an event with
892    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
893    ///             // in the timeline.
894    ///         }
895    ///     },
896    /// );
897    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
898    ///     // You can omit any or all arguments after the first.
899    /// });
900    ///
901    /// // Registering a temporary event handler:
902    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
903    ///     /* Event handler */
904    /// });
905    /// client.remove_event_handler(handle);
906    ///
907    /// // Registering custom event handler context:
908    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
909    /// struct MyContext {
910    ///     number: usize,
911    /// }
912    /// client.add_event_handler_context(MyContext { number: 5 });
913    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
914    ///     // Use the context
915    /// });
916    ///
917    /// // This will handle membership events in joined rooms. Invites are special, see below.
918    /// client.add_event_handler(
919    ///     |ev: SyncRoomMemberEvent| async move {},
920    /// );
921    ///
922    /// // To handle state events in invited rooms (including invite membership events),
923    /// // `StrippedRoomMemberEvent` should be used.
924    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
925    /// client.add_event_handler(
926    ///     |ev: StrippedRoomMemberEvent| async move {},
927    /// );
928    ///
929    /// // Custom events work exactly the same way, you just need to declare
930    /// // the content struct and use the EventContent derive macro on it.
931    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
932    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
933    /// struct TokenEventContent {
934    ///     token: String,
935    ///     #[serde(rename = "exp")]
936    ///     expires_at: MilliSecondsSinceUnixEpoch,
937    /// }
938    ///
939    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
940    ///     todo!("Display the token");
941    /// });
942    ///
943    /// // Event handler closures can also capture local variables.
944    /// // Make sure they are cheap to clone though, because they will be cloned
945    /// // every time the closure is called.
946    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
947    ///
948    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
949    ///     println!("Calling the handler with identifier {data}");
950    /// });
951    /// # }
952    /// ```
953    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
954    where
955        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
956        H: EventHandler<Ev, Ctx>,
957    {
958        self.add_event_handler_impl(handler, None)
959    }
960
961    /// Register a handler for a specific room, and event type.
962    ///
963    /// This method works the same way as
964    /// [`add_event_handler`][Self::add_event_handler], except that the handler
965    /// will only be called for events in the room with the specified ID. See
966    /// that method for more details on event handler functions.
967    ///
968    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
969    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
970    /// your use case.
971    pub fn add_room_event_handler<Ev, Ctx, H>(
972        &self,
973        room_id: &RoomId,
974        handler: H,
975    ) -> EventHandlerHandle
976    where
977        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
978        H: EventHandler<Ev, Ctx>,
979    {
980        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
981    }
982
983    /// Observe a specific event type.
984    ///
985    /// `Ev` represents the kind of event that will be observed. `Ctx`
986    /// represents the context that will come with the event. It relies on the
987    /// same mechanism as [`Client::add_event_handler`]. The main difference is
988    /// that it returns an [`ObservableEventHandler`] and doesn't require a
989    /// user-defined closure. It is possible to subscribe to the
990    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
991    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
992    /// Ctx)`.
993    ///
994    /// Be careful that only the most recent value can be observed. Subscribers
995    /// are notified when a new value is sent, but there is no guarantee
996    /// that they will see all values.
997    ///
998    /// # Example
999    ///
1000    /// Let's see a classical usage:
1001    ///
1002    /// ```
1003    /// use futures_util::StreamExt as _;
1004    /// use matrix_sdk::{
1005    ///     Client, Room,
1006    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
1007    /// };
1008    ///
1009    /// # async fn example(client: Client) -> Option<()> {
1010    /// let observer =
1011    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1012    ///
1013    /// let mut subscriber = observer.subscribe();
1014    ///
1015    /// let (event, (room, push_actions)) = subscriber.next().await?;
1016    /// # Some(())
1017    /// # }
1018    /// ```
1019    ///
1020    /// Now let's see how to get several contexts that can be useful for you:
1021    ///
1022    /// ```
1023    /// use matrix_sdk::{
1024    ///     Client, Room,
1025    ///     deserialized_responses::EncryptionInfo,
1026    ///     ruma::{
1027    ///         events::room::{
1028    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1029    ///         },
1030    ///         push::Action,
1031    ///     },
1032    /// };
1033    ///
1034    /// # async fn example(client: Client) {
1035    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1036    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1037    ///
1038    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1039    /// // to distinguish between unencrypted events and events that were decrypted
1040    /// // by the SDK.
1041    /// let _ = client
1042    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1043    ///     );
1044    ///
1045    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1046    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1047    /// // should be highlighted in the timeline.
1048    /// let _ =
1049    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1050    ///
1051    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1052    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1053    /// # }
1054    /// ```
1055    ///
1056    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1057    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1058    where
1059        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1060        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1061    {
1062        self.observe_room_events_impl(None)
1063    }
1064
1065    /// Observe a specific room, and event type.
1066    ///
1067    /// This method works the same way as [`Client::observe_events`], except
1068    /// that the observability will only be applied for events in the room with
1069    /// the specified ID. See that method for more details.
1070    ///
1071    /// Be careful that only the most recent value can be observed. Subscribers
1072    /// are notified when a new value is sent, but there is no guarantee
1073    /// that they will see all values.
1074    pub fn observe_room_events<Ev, Ctx>(
1075        &self,
1076        room_id: &RoomId,
1077    ) -> ObservableEventHandler<(Ev, Ctx)>
1078    where
1079        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1080        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1081    {
1082        self.observe_room_events_impl(Some(room_id.to_owned()))
1083    }
1084
1085    /// Shared implementation for `Client::observe_events` and
1086    /// `Client::observe_room_events`.
1087    fn observe_room_events_impl<Ev, Ctx>(
1088        &self,
1089        room_id: Option<OwnedRoomId>,
1090    ) -> ObservableEventHandler<(Ev, Ctx)>
1091    where
1092        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1093        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1094    {
1095        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1096        // new value.
1097        let shared_observable = SharedObservable::new(None);
1098
1099        ObservableEventHandler::new(
1100            shared_observable.clone(),
1101            self.event_handler_drop_guard(self.add_event_handler_impl(
1102                move |event: Ev, context: Ctx| {
1103                    shared_observable.set(Some((event, context)));
1104
1105                    ready(())
1106                },
1107                room_id,
1108            )),
1109        )
1110    }
1111
1112    /// Remove the event handler associated with the handle.
1113    ///
1114    /// Note that you **must not** call `remove_event_handler` from the
1115    /// non-async part of an event handler, that is:
1116    ///
1117    /// ```ignore
1118    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1119    ///     // ⚠ this will cause a deadlock ⚠
1120    ///     client.remove_event_handler(handle);
1121    ///
1122    ///     async move {
1123    ///         // removing the event handler here is fine
1124    ///         client.remove_event_handler(handle);
1125    ///     }
1126    /// })
1127    /// ```
1128    ///
1129    /// Note also that handlers that remove themselves will still execute with
1130    /// events received in the same sync cycle.
1131    ///
1132    /// # Arguments
1133    ///
1134    /// `handle` - The [`EventHandlerHandle`] that is returned when
1135    /// registering the event handler with [`Client::add_event_handler`].
1136    ///
1137    /// # Examples
1138    ///
1139    /// ```no_run
1140    /// # use url::Url;
1141    /// # use tokio::sync::mpsc;
1142    /// #
1143    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1144    /// #
1145    /// use matrix_sdk::{
1146    ///     Client, event_handler::EventHandlerHandle,
1147    ///     ruma::events::room::member::SyncRoomMemberEvent,
1148    /// };
1149    /// #
1150    /// # futures_executor::block_on(async {
1151    /// # let client = matrix_sdk::Client::builder()
1152    /// #     .homeserver_url(homeserver)
1153    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1154    /// #     .build()
1155    /// #     .await
1156    /// #     .unwrap();
1157    ///
1158    /// client.add_event_handler(
1159    ///     |ev: SyncRoomMemberEvent,
1160    ///      client: Client,
1161    ///      handle: EventHandlerHandle| async move {
1162    ///         // Common usage: Check arriving Event is the expected one
1163    ///         println!("Expected RoomMemberEvent received!");
1164    ///         client.remove_event_handler(handle);
1165    ///     },
1166    /// );
1167    /// # });
1168    /// ```
1169    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1170        self.inner.event_handlers.remove(handle);
1171    }
1172
1173    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1174    /// the given handle.
1175    ///
1176    /// When the returned value is dropped, the event handler will be removed.
1177    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1178        EventHandlerDropGuard::new(handle, self.clone())
1179    }
1180
1181    /// Add an arbitrary value for use as event handler context.
1182    ///
1183    /// The value can be obtained in an event handler by adding an argument of
1184    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1185    ///
1186    /// If a value of the same type has been added before, it will be
1187    /// overwritten.
1188    ///
1189    /// # Examples
1190    ///
1191    /// ```no_run
1192    /// use matrix_sdk::{
1193    ///     Room, event_handler::Ctx,
1194    ///     ruma::events::room::message::SyncRoomMessageEvent,
1195    /// };
1196    /// # #[derive(Clone)]
1197    /// # struct SomeType;
1198    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1199    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1200    /// # futures_executor::block_on(async {
1201    /// # let client = matrix_sdk::Client::builder()
1202    /// #     .homeserver_url(homeserver)
1203    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1204    /// #     .build()
1205    /// #     .await
1206    /// #     .unwrap();
1207    ///
1208    /// // Handle used to send messages to the UI part of the app
1209    /// let my_gui_handle: SomeType = obtain_gui_handle();
1210    ///
1211    /// client.add_event_handler_context(my_gui_handle.clone());
1212    /// client.add_event_handler(
1213    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1214    ///         async move {
1215    ///             // gui_handle.send(DisplayMessage { message: ev });
1216    ///         }
1217    ///     },
1218    /// );
1219    /// # });
1220    /// ```
1221    pub fn add_event_handler_context<T>(&self, ctx: T)
1222    where
1223        T: Clone + Send + Sync + 'static,
1224    {
1225        self.inner.event_handlers.add_context(ctx);
1226    }
1227
1228    /// Register a handler for a notification.
1229    ///
1230    /// Similar to [`Client::add_event_handler`], but only allows functions
1231    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1232    /// [`Client`] for now.
1233    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1234    where
1235        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1236        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1237    {
1238        self.inner.notification_handlers.write().await.push(Box::new(
1239            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1240        ));
1241
1242        self
1243    }
1244
1245    /// Subscribe to all updates for the room with the given ID.
1246    ///
1247    /// The returned receiver will receive a new message for each sync response
1248    /// that contains updates for that room.
1249    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1250        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1251            btree_map::Entry::Vacant(entry) => {
1252                let (tx, rx) = broadcast::channel(8);
1253                entry.insert(tx);
1254                rx
1255            }
1256            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1257        }
1258    }
1259
1260    /// Subscribe to all updates to all rooms, whenever any has been received in
1261    /// a sync response.
1262    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1263        self.inner.room_updates_sender.subscribe()
1264    }
1265
1266    pub(crate) async fn notification_handlers(
1267        &self,
1268    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1269        self.inner.notification_handlers.read().await
1270    }
1271
1272    /// Get all the rooms the client knows about.
1273    ///
1274    /// This will return the list of joined, invited, and left rooms.
1275    pub fn rooms(&self) -> Vec<Room> {
1276        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1277    }
1278
1279    /// Get all the rooms the client knows about, filtered by room state.
1280    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1281        self.base_client()
1282            .rooms_filtered(filter)
1283            .into_iter()
1284            .map(|room| Room::new(self.clone(), room))
1285            .collect()
1286    }
1287
1288    /// Get a stream of all the rooms, in addition to the existing rooms.
1289    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1290        let (rooms, stream) = self.base_client().rooms_stream();
1291
1292        let map_room = |room| Room::new(self.clone(), room);
1293
1294        (
1295            rooms.into_iter().map(map_room).collect(),
1296            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1297        )
1298    }
1299
1300    /// Returns the joined rooms this client knows about.
1301    pub fn joined_rooms(&self) -> Vec<Room> {
1302        self.rooms_filtered(RoomStateFilter::JOINED)
1303    }
1304
1305    /// Returns the invited rooms this client knows about.
1306    pub fn invited_rooms(&self) -> Vec<Room> {
1307        self.rooms_filtered(RoomStateFilter::INVITED)
1308    }
1309
1310    /// Returns the left rooms this client knows about.
1311    pub fn left_rooms(&self) -> Vec<Room> {
1312        self.rooms_filtered(RoomStateFilter::LEFT)
1313    }
1314
1315    /// Returns the joined space rooms this client knows about.
1316    pub fn joined_space_rooms(&self) -> Vec<Room> {
1317        self.base_client()
1318            .rooms_filtered(RoomStateFilter::JOINED)
1319            .into_iter()
1320            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1321            .collect()
1322    }
1323
1324    /// Get a room with the given room id.
1325    ///
1326    /// # Arguments
1327    ///
1328    /// `room_id` - The unique id of the room that should be fetched.
1329    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1330        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1331    }
1332
1333    /// Gets the preview of a room, whether the current user has joined it or
1334    /// not.
1335    pub async fn get_room_preview(
1336        &self,
1337        room_or_alias_id: &RoomOrAliasId,
1338        via: Vec<OwnedServerName>,
1339    ) -> Result<RoomPreview> {
1340        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1341            Ok(room_id) => room_id.to_owned(),
1342            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1343        };
1344
1345        if let Some(room) = self.get_room(&room_id) {
1346            // The cached data can only be trusted if the room state is joined or
1347            // banned: for invite and knock rooms, no updates will be received
1348            // for the rooms after the invite/knock action took place so we may
1349            // have very out to date data for important fields such as
1350            // `join_rule`. For left rooms, the homeserver should return the latest info.
1351            match room.state() {
1352                RoomState::Joined | RoomState::Banned => {
1353                    return Ok(RoomPreview::from_known_room(&room).await);
1354                }
1355                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1356            }
1357        }
1358
1359        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1360    }
1361
1362    /// Resolve a room alias to a room id and a list of servers which know
1363    /// about it.
1364    ///
1365    /// # Arguments
1366    ///
1367    /// `room_alias` - The room alias to be resolved.
1368    pub async fn resolve_room_alias(
1369        &self,
1370        room_alias: &RoomAliasId,
1371    ) -> HttpResult<get_alias::v3::Response> {
1372        let request = get_alias::v3::Request::new(room_alias.to_owned());
1373        self.send(request).await
1374    }
1375
1376    /// Checks if a room alias is not in use yet.
1377    ///
1378    /// Returns:
1379    /// - `Ok(true)` if the room alias is available.
1380    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1381    ///   status code).
1382    /// - An `Err` otherwise.
1383    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1384        match self.resolve_room_alias(alias).await {
1385            // The room alias was resolved, so it's already in use.
1386            Ok(_) => Ok(false),
1387            Err(error) => {
1388                match error.client_api_error_kind() {
1389                    // The room alias wasn't found, so it's available.
1390                    Some(ErrorKind::NotFound) => Ok(true),
1391                    _ => Err(error),
1392                }
1393            }
1394        }
1395    }
1396
1397    /// Adds a new room alias associated with a room to the room directory.
1398    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1399        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1400        self.send(request).await?;
1401        Ok(())
1402    }
1403
1404    /// Removes a room alias from the room directory.
1405    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1406        let request = delete_alias::v3::Request::new(alias.to_owned());
1407        self.send(request).await?;
1408        Ok(())
1409    }
1410
1411    /// Update the homeserver from the login response well-known if needed.
1412    ///
1413    /// # Arguments
1414    ///
1415    /// * `login_well_known` - The `well_known` field from a successful login
1416    ///   response.
1417    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1418        if self.inner.respect_login_well_known
1419            && let Some(well_known) = login_well_known
1420            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1421        {
1422            self.set_homeserver(homeserver);
1423        }
1424    }
1425
1426    /// Similar to [`Client::restore_session_with`], with
1427    /// [`RoomLoadSettings::default()`].
1428    ///
1429    /// # Panics
1430    ///
1431    /// Panics if a session was already restored or logged in.
1432    #[instrument(skip_all)]
1433    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1434        self.restore_session_with(session, RoomLoadSettings::default()).await
1435    }
1436
1437    /// Restore a session previously logged-in using one of the available
1438    /// authentication APIs. The number of rooms to restore is controlled by
1439    /// [`RoomLoadSettings`].
1440    ///
1441    /// See the documentation of the corresponding authentication API's
1442    /// `restore_session` method for more information.
1443    ///
1444    /// # Panics
1445    ///
1446    /// Panics if a session was already restored or logged in.
1447    #[instrument(skip_all)]
1448    pub async fn restore_session_with(
1449        &self,
1450        session: impl Into<AuthSession>,
1451        room_load_settings: RoomLoadSettings,
1452    ) -> Result<()> {
1453        let session = session.into();
1454        match session {
1455            AuthSession::Matrix(session) => {
1456                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1457            }
1458            AuthSession::OAuth(session) => {
1459                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1460            }
1461        }
1462    }
1463
1464    /// Refresh the access token using the authentication API used to log into
1465    /// this session.
1466    ///
1467    /// See the documentation of the authentication API's `refresh_access_token`
1468    /// method for more information.
1469    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1470        let Some(auth_api) = self.auth_api() else {
1471            return Err(RefreshTokenError::RefreshTokenRequired);
1472        };
1473
1474        match auth_api {
1475            AuthApi::Matrix(api) => {
1476                trace!("Token refresh: Using the homeserver.");
1477                Box::pin(api.refresh_access_token()).await?;
1478            }
1479            AuthApi::OAuth(api) => {
1480                trace!("Token refresh: Using OAuth 2.0.");
1481                Box::pin(api.refresh_access_token()).await?;
1482            }
1483        }
1484
1485        Ok(())
1486    }
1487
1488    /// Log out the current session using the proper authentication API.
1489    ///
1490    /// # Errors
1491    ///
1492    /// Returns an error if the session is not authenticated or if an error
1493    /// occurred while making the request to the server.
1494    pub async fn logout(&self) -> Result<(), Error> {
1495        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1496        match auth_api {
1497            AuthApi::Matrix(matrix_auth) => {
1498                matrix_auth.logout().await?;
1499                Ok(())
1500            }
1501            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1502        }
1503    }
1504
1505    /// Get or upload a sync filter.
1506    ///
1507    /// This method will either get a filter ID from the store or upload the
1508    /// filter definition to the homeserver and return the new filter ID.
1509    ///
1510    /// # Arguments
1511    ///
1512    /// * `filter_name` - The unique name of the filter, this name will be used
1513    /// locally to store and identify the filter ID returned by the server.
1514    ///
1515    /// * `definition` - The filter definition that should be uploaded to the
1516    /// server if no filter ID can be found in the store.
1517    ///
1518    /// # Examples
1519    ///
1520    /// ```no_run
1521    /// # use matrix_sdk::{
1522    /// #    Client, config::SyncSettings,
1523    /// #    ruma::api::client::{
1524    /// #        filter::{
1525    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1526    /// #        },
1527    /// #        sync::sync_events::v3::Filter,
1528    /// #    }
1529    /// # };
1530    /// # use url::Url;
1531    /// # async {
1532    /// # let homeserver = Url::parse("http://example.com").unwrap();
1533    /// # let client = Client::new(homeserver).await.unwrap();
1534    /// let mut filter = FilterDefinition::default();
1535    ///
1536    /// // Let's enable member lazy loading.
1537    /// filter.room.state.lazy_load_options =
1538    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1539    ///
1540    /// let filter_id = client
1541    ///     .get_or_upload_filter("sync", filter)
1542    ///     .await
1543    ///     .unwrap();
1544    ///
1545    /// let sync_settings = SyncSettings::new()
1546    ///     .filter(Filter::FilterId(filter_id));
1547    ///
1548    /// let response = client.sync_once(sync_settings).await.unwrap();
1549    /// # };
1550    #[instrument(skip(self, definition))]
1551    pub async fn get_or_upload_filter(
1552        &self,
1553        filter_name: &str,
1554        definition: FilterDefinition,
1555    ) -> Result<String> {
1556        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1557            debug!("Found filter locally");
1558            Ok(filter)
1559        } else {
1560            debug!("Didn't find filter locally");
1561            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1562            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1563            let response = self.send(request).await?;
1564
1565            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1566
1567            Ok(response.filter_id)
1568        }
1569    }
1570
1571    /// Prepare to join a room by ID, by getting the current details about it
1572    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1573        let room = self.get_room(room_id)?;
1574
1575        let inviter = match room.invite_details().await {
1576            Ok(details) => details.inviter,
1577            Err(Error::WrongRoomState(_)) => None,
1578            Err(e) => {
1579                warn!("Error fetching invite details for room: {e:?}");
1580                None
1581            }
1582        };
1583
1584        Some(PreJoinRoomInfo { inviter })
1585    }
1586
1587    /// Finish joining a room.
1588    ///
1589    /// If the room was an invite that should be marked as a DM, will include it
1590    /// in the DM event after creating the joined room.
1591    ///
1592    /// If encrypted history sharing is enabled, will check to see if we have a
1593    /// key bundle, and import it if so.
1594    ///
1595    /// # Arguments
1596    ///
1597    /// * `room_id` - The `RoomId` of the room that was joined.
1598    /// * `pre_join_room_info` - Information about the room before we joined.
1599    async fn finish_join_room(
1600        &self,
1601        room_id: &RoomId,
1602        pre_join_room_info: Option<PreJoinRoomInfo>,
1603    ) -> Result<Room> {
1604        info!(?room_id, ?pre_join_room_info, "Completing room join");
1605        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1606            room.state() == RoomState::Invited
1607                && room.is_direct().await.unwrap_or_else(|e| {
1608                    warn!(%room_id, "is_direct() failed: {e}");
1609                    false
1610                })
1611        } else {
1612            false
1613        };
1614
1615        let base_room = self
1616            .base_client()
1617            .room_joined(
1618                room_id,
1619                pre_join_room_info
1620                    .as_ref()
1621                    .and_then(|info| info.inviter.as_ref())
1622                    .map(|i| i.user_id().to_owned()),
1623            )
1624            .await?;
1625        let room = Room::new(self.clone(), base_room);
1626
1627        if mark_as_dm {
1628            room.set_is_direct(true).await?;
1629        }
1630
1631        #[cfg(feature = "e2e-encryption")]
1632        if self.inner.enable_share_history_on_invite
1633            && let Some(inviter) =
1634                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1635        {
1636            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1637                .await?;
1638        }
1639
1640        // Suppress "unused variable" and "unused field" lints
1641        #[cfg(not(feature = "e2e-encryption"))]
1642        let _ = pre_join_room_info.map(|i| i.inviter);
1643
1644        Ok(room)
1645    }
1646
1647    /// Join a room by `RoomId`.
1648    ///
1649    /// Returns the `Room` in the joined state.
1650    ///
1651    /// # Arguments
1652    ///
1653    /// * `room_id` - The `RoomId` of the room to be joined.
1654    #[instrument(skip(self))]
1655    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1656        // See who invited us to this room, if anyone. Note we have to do this before
1657        // making the `/join` request, otherwise we could race against the sync.
1658        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1659
1660        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1661        let response = self.send(request).await?;
1662        self.finish_join_room(&response.room_id, pre_join_info).await
1663    }
1664
1665    /// Join a room by `RoomOrAliasId`.
1666    ///
1667    /// Returns the `Room` in the joined state.
1668    ///
1669    /// # Arguments
1670    ///
1671    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1672    ///   alias looks like `#name:example.com`.
1673    /// * `server_names` - The server names to be used for resolving the alias,
1674    ///   if needs be.
1675    #[instrument(skip(self))]
1676    pub async fn join_room_by_id_or_alias(
1677        &self,
1678        alias: &RoomOrAliasId,
1679        server_names: &[OwnedServerName],
1680    ) -> Result<Room> {
1681        let pre_join_info = {
1682            match alias.try_into() {
1683                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1684                Err(_) => {
1685                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1686                    // responding to an invitation to the room, and therefore don't need to handle
1687                    // things that happen as a result of invites.
1688                    None
1689                }
1690            }
1691        };
1692        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1693            via: server_names.to_owned(),
1694        });
1695        let response = self.send(request).await?;
1696        self.finish_join_room(&response.room_id, pre_join_info).await
1697    }
1698
1699    /// Search the homeserver's directory of public rooms.
1700    ///
1701    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1702    /// a `get_public_rooms::Response`.
1703    ///
1704    /// # Arguments
1705    ///
1706    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1707    ///
1708    /// * `since` - Pagination token from a previous request.
1709    ///
1710    /// * `server` - The name of the server, if `None` the requested server is
1711    ///   used.
1712    ///
1713    /// # Examples
1714    /// ```no_run
1715    /// use matrix_sdk::Client;
1716    /// # use url::Url;
1717    /// # let homeserver = Url::parse("http://example.com").unwrap();
1718    /// # let limit = Some(10);
1719    /// # let since = Some("since token");
1720    /// # let server = Some("servername.com".try_into().unwrap());
1721    /// # async {
1722    /// let mut client = Client::new(homeserver).await.unwrap();
1723    ///
1724    /// client.public_rooms(limit, since, server).await;
1725    /// # };
1726    /// ```
1727    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1728    pub async fn public_rooms(
1729        &self,
1730        limit: Option<u32>,
1731        since: Option<&str>,
1732        server: Option<&ServerName>,
1733    ) -> HttpResult<get_public_rooms::v3::Response> {
1734        let limit = limit.map(UInt::from);
1735
1736        let request = assign!(get_public_rooms::v3::Request::new(), {
1737            limit,
1738            since: since.map(ToOwned::to_owned),
1739            server: server.map(ToOwned::to_owned),
1740        });
1741        self.send(request).await
1742    }
1743
1744    /// Create a room with the given parameters.
1745    ///
1746    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1747    /// created room.
1748    ///
1749    /// If you want to create a direct message with one specific user, you can
1750    /// use [`create_dm`][Self::create_dm], which is more convenient than
1751    /// assembling the [`create_room::v3::Request`] yourself.
1752    ///
1753    /// If the `is_direct` field of the request is set to `true` and at least
1754    /// one user is invited, the room will be automatically added to the direct
1755    /// rooms in the account data.
1756    ///
1757    /// # Examples
1758    ///
1759    /// ```no_run
1760    /// use matrix_sdk::{
1761    ///     Client,
1762    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1763    /// };
1764    /// # use url::Url;
1765    /// #
1766    /// # async {
1767    /// # let homeserver = Url::parse("http://example.com").unwrap();
1768    /// let request = CreateRoomRequest::new();
1769    /// let client = Client::new(homeserver).await.unwrap();
1770    /// assert!(client.create_room(request).await.is_ok());
1771    /// # };
1772    /// ```
1773    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1774        let invite = request.invite.clone();
1775        let is_direct_room = request.is_direct;
1776        let response = self.send(request).await?;
1777
1778        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1779
1780        let joined_room = Room::new(self.clone(), base_room);
1781
1782        if is_direct_room
1783            && !invite.is_empty()
1784            && let Err(error) =
1785                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1786        {
1787            // FIXME: Retry in the background
1788            error!("Failed to mark room as DM: {error}");
1789        }
1790
1791        Ok(joined_room)
1792    }
1793
1794    /// Create a DM room.
1795    ///
1796    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1797    /// given user being invited, the room marked `is_direct` and both the
1798    /// creator and invitee getting the default maximum power level.
1799    ///
1800    /// If the `e2e-encryption` feature is enabled, the room will also be
1801    /// encrypted.
1802    ///
1803    /// # Arguments
1804    ///
1805    /// * `user_id` - The ID of the user to create a DM for.
1806    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1807        #[cfg(feature = "e2e-encryption")]
1808        let initial_state = vec![
1809            InitialStateEvent::with_empty_state_key(
1810                RoomEncryptionEventContent::with_recommended_defaults(),
1811            )
1812            .to_raw_any(),
1813        ];
1814
1815        #[cfg(not(feature = "e2e-encryption"))]
1816        let initial_state = vec![];
1817
1818        let request = assign!(create_room::v3::Request::new(), {
1819            invite: vec![user_id.to_owned()],
1820            is_direct: true,
1821            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1822            initial_state,
1823        });
1824
1825        self.create_room(request).await
1826    }
1827
1828    /// Get the existing DM room with the given user, if any.
1829    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1830        let rooms = self.joined_rooms();
1831
1832        // Find the room we share with the `user_id` and only with `user_id`
1833        let room = rooms.into_iter().find(|r| {
1834            let targets = r.direct_targets();
1835            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1836        });
1837
1838        trace!(?user_id, ?room, "Found DM room with user");
1839        room
1840    }
1841
1842    /// Search the homeserver's directory for public rooms with a filter.
1843    ///
1844    /// # Arguments
1845    ///
1846    /// * `room_search` - The easiest way to create this request is using the
1847    ///   `get_public_rooms_filtered::Request` itself.
1848    ///
1849    /// # Examples
1850    ///
1851    /// ```no_run
1852    /// # use url::Url;
1853    /// # use matrix_sdk::Client;
1854    /// # async {
1855    /// # let homeserver = Url::parse("http://example.com")?;
1856    /// use matrix_sdk::ruma::{
1857    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1858    /// };
1859    /// # let mut client = Client::new(homeserver).await?;
1860    ///
1861    /// let mut filter = Filter::new();
1862    /// filter.generic_search_term = Some("rust".to_owned());
1863    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1864    /// request.filter = filter;
1865    ///
1866    /// let response = client.public_rooms_filtered(request).await?;
1867    ///
1868    /// for room in response.chunk {
1869    ///     println!("Found room {room:?}");
1870    /// }
1871    /// # anyhow::Ok(()) };
1872    /// ```
1873    pub async fn public_rooms_filtered(
1874        &self,
1875        request: get_public_rooms_filtered::v3::Request,
1876    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1877        self.send(request).await
1878    }
1879
1880    /// Send an arbitrary request to the server, without updating client state.
1881    ///
1882    /// **Warning:** Because this method *does not* update the client state, it
1883    /// is important to make sure that you account for this yourself, and
1884    /// use wrapper methods where available.  This method should *only* be
1885    /// used if a wrapper method for the endpoint you'd like to use is not
1886    /// available.
1887    ///
1888    /// # Arguments
1889    ///
1890    /// * `request` - A filled out and valid request for the endpoint to be hit
1891    ///
1892    /// * `timeout` - An optional request timeout setting, this overrides the
1893    ///   default request setting if one was set.
1894    ///
1895    /// # Examples
1896    ///
1897    /// ```no_run
1898    /// # use matrix_sdk::{Client, config::SyncSettings};
1899    /// # use url::Url;
1900    /// # async {
1901    /// # let homeserver = Url::parse("http://localhost:8080")?;
1902    /// # let mut client = Client::new(homeserver).await?;
1903    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1904    ///
1905    /// // First construct the request you want to make
1906    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1907    /// // for all available Endpoints
1908    /// let user_id = user_id!("@example:localhost").to_owned();
1909    /// let request = profile::get_profile::v3::Request::new(user_id);
1910    ///
1911    /// // Start the request using Client::send()
1912    /// let response = client.send(request).await?;
1913    ///
1914    /// // Check the corresponding Response struct to find out what types are
1915    /// // returned
1916    /// # anyhow::Ok(()) };
1917    /// ```
1918    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1919    where
1920        Request: OutgoingRequest + Clone + Debug,
1921        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1922        Request::PathBuilder: SupportedPathBuilder,
1923        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1924        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1925    {
1926        SendRequest {
1927            client: self.clone(),
1928            request,
1929            config: None,
1930            send_progress: Default::default(),
1931        }
1932    }
1933
1934    pub(crate) async fn send_inner<Request>(
1935        &self,
1936        request: Request,
1937        config: Option<RequestConfig>,
1938        send_progress: SharedObservable<TransmissionProgress>,
1939    ) -> HttpResult<Request::IncomingResponse>
1940    where
1941        Request: OutgoingRequest + Debug,
1942        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1943        Request::PathBuilder: SupportedPathBuilder,
1944        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1945        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1946    {
1947        let homeserver = self.homeserver().to_string();
1948        let access_token = self.access_token();
1949        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1950
1951        let path_builder_input =
1952            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1953
1954        let result = self
1955            .inner
1956            .http_client
1957            .send(
1958                request,
1959                config,
1960                homeserver,
1961                access_token.as_deref(),
1962                path_builder_input,
1963                send_progress,
1964            )
1965            .await;
1966
1967        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1968            result.as_ref().map_err(HttpError::client_api_error_kind)
1969            && let Some(access_token) = &access_token
1970        {
1971            // Mark the access token as expired.
1972            self.auth_ctx().set_access_token_expired(access_token);
1973        }
1974
1975        result
1976    }
1977
1978    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1979        _ = self
1980            .inner
1981            .auth_ctx
1982            .session_change_sender
1983            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1984    }
1985
1986    /// Fetches server versions from network; no caching.
1987    pub async fn fetch_server_versions(
1988        &self,
1989        request_config: Option<RequestConfig>,
1990    ) -> HttpResult<get_supported_versions::Response> {
1991        // Since this was called by the user, try to refresh the access token if
1992        // necessary.
1993        self.fetch_server_versions_inner(false, request_config).await
1994    }
1995
1996    /// Fetches server versions from network; no caching.
1997    ///
1998    /// If the access token is expired and `failsafe` is `false`, this will
1999    /// attempt to refresh the access token, otherwise this will try to make an
2000    /// unauthenticated request instead.
2001    pub(crate) async fn fetch_server_versions_inner(
2002        &self,
2003        failsafe: bool,
2004        request_config: Option<RequestConfig>,
2005    ) -> HttpResult<get_supported_versions::Response> {
2006        if !failsafe {
2007            // `Client::send()` handles refreshing access tokens.
2008            return self
2009                .send(get_supported_versions::Request::new())
2010                .with_request_config(request_config)
2011                .await;
2012        }
2013
2014        let homeserver = self.homeserver().to_string();
2015
2016        // If we have a fresh access token, try with it first.
2017        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2018            && self.auth_ctx().has_valid_access_token()
2019            && let Some(access_token) = self.access_token()
2020        {
2021            let result = self
2022                .inner
2023                .http_client
2024                .send(
2025                    get_supported_versions::Request::new(),
2026                    request_config,
2027                    homeserver.clone(),
2028                    Some(&access_token),
2029                    (),
2030                    Default::default(),
2031                )
2032                .await;
2033
2034            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2035                result.as_ref().map_err(HttpError::client_api_error_kind)
2036            {
2037                // If the access token is actually expired, mark it as expired and fallback to
2038                // the unauthenticated request below.
2039                self.auth_ctx().set_access_token_expired(&access_token);
2040            } else {
2041                // If the request succeeded or it's an other error, just stop now.
2042                return result;
2043            }
2044        }
2045
2046        // Try without authentication.
2047        self.inner
2048            .http_client
2049            .send(
2050                get_supported_versions::Request::new(),
2051                request_config,
2052                homeserver.clone(),
2053                None,
2054                (),
2055                Default::default(),
2056            )
2057            .await
2058    }
2059
2060    /// Fetches client well_known from network; no caching.
2061    ///
2062    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2063    ///    well-known contents.
2064    /// 2. If it's not, we try extracting the server name from the
2065    ///    [`Client::user_id`] and building the server URL from it.
2066    /// 3. If we couldn't get the well-known contents with either the explicit
2067    ///    server name or the implicit extracted one, we try the homeserver URL
2068    ///    as a last resort.
2069    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2070        let homeserver = self.homeserver();
2071        let scheme = homeserver.scheme();
2072
2073        // Use the server name, either an explicit one or an implicit one taken from
2074        // the user id: sometimes we'll have only the homeserver url available and no
2075        // server name, but the server name can be extracted from the current user id.
2076        let server_url = self
2077            .server()
2078            .map(|server| server.to_string())
2079            // If the server name wasn't available, extract it from the user id and build a URL:
2080            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2081            // will be the same for the public server url, lacking a better candidate.
2082            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2083
2084        // If the server name is available, first try using it
2085        let response = if let Some(server_url) = server_url {
2086            // First try using the server name
2087            self.fetch_client_well_known_with_url(server_url).await
2088        } else {
2089            None
2090        };
2091
2092        // If we didn't get a well-known value yet, try with the homeserver url instead:
2093        if response.is_none() {
2094            // Sometimes people configure their well-known directly on the homeserver so use
2095            // this as a fallback when the server name is unknown.
2096            warn!(
2097                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2098            );
2099            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2100        } else {
2101            response
2102        }
2103    }
2104
2105    async fn fetch_client_well_known_with_url(
2106        &self,
2107        url: String,
2108    ) -> Option<discover_homeserver::Response> {
2109        let well_known = self
2110            .inner
2111            .http_client
2112            .send(
2113                discover_homeserver::Request::new(),
2114                Some(RequestConfig::short_retry()),
2115                url,
2116                None,
2117                (),
2118                Default::default(),
2119            )
2120            .await;
2121
2122        match well_known {
2123            Ok(well_known) => Some(well_known),
2124            Err(http_error) => {
2125                // It is perfectly valid to not have a well-known file.
2126                // Maybe we should check for a specific error code to be sure?
2127                warn!("Failed to fetch client well-known: {http_error}");
2128                None
2129            }
2130        }
2131    }
2132
2133    /// Load supported versions from storage, or fetch them from network and
2134    /// cache them.
2135    ///
2136    /// If `failsafe` is true, this will try to minimize side effects to avoid
2137    /// possible deadlocks.
2138    async fn load_or_fetch_supported_versions(
2139        &self,
2140        failsafe: bool,
2141    ) -> HttpResult<SupportedVersionsResponse> {
2142        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2143            Ok(Some(stored)) => {
2144                if let Some(supported_versions) =
2145                    stored.into_supported_versions().and_then(|value| value.into_data())
2146                {
2147                    return Ok(supported_versions);
2148                }
2149            }
2150            Ok(None) => {
2151                // fallthrough: cache is empty
2152            }
2153            Err(err) => {
2154                warn!("error when loading cached supported versions: {err}");
2155                // fallthrough to network.
2156            }
2157        }
2158
2159        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2160        let supported_versions = SupportedVersionsResponse {
2161            versions: server_versions.versions,
2162            unstable_features: server_versions.unstable_features,
2163        };
2164
2165        // Only attempt to cache the result in storage if the request was authenticated.
2166        if self.auth_ctx().has_valid_access_token()
2167            && let Err(err) = self
2168                .state_store()
2169                .set_kv_data(
2170                    StateStoreDataKey::SupportedVersions,
2171                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2172                        supported_versions.clone(),
2173                    )),
2174                )
2175                .await
2176        {
2177            warn!("error when caching supported versions: {err}");
2178        }
2179
2180        Ok(supported_versions)
2181    }
2182
2183    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2184        let supported_versions = &self.inner.caches.supported_versions;
2185
2186        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2187            Some(val.clone())
2188        } else {
2189            None
2190        }
2191    }
2192
2193    /// Get the Matrix versions and features supported by the homeserver by
2194    /// fetching them from the server or the cache.
2195    ///
2196    /// This is equivalent to calling both [`Client::server_versions()`] and
2197    /// [`Client::unstable_features()`]. To always fetch the result from the
2198    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2199    /// and then `.as_supported_versions()` on the response.
2200    ///
2201    /// # Examples
2202    ///
2203    /// ```no_run
2204    /// use ruma::api::{FeatureFlag, MatrixVersion};
2205    /// # use matrix_sdk::{Client, config::SyncSettings};
2206    /// # use url::Url;
2207    /// # async {
2208    /// # let homeserver = Url::parse("http://localhost:8080")?;
2209    /// # let mut client = Client::new(homeserver).await?;
2210    ///
2211    /// let supported = client.supported_versions().await?;
2212    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2213    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2214    ///
2215    /// let msc_x_feature = FeatureFlag::from("msc_x");
2216    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2217    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2218    /// # anyhow::Ok(()) };
2219    /// ```
2220    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2221        self.supported_versions_inner(false).await
2222    }
2223
2224    /// Get the Matrix versions and features supported by the homeserver by
2225    /// fetching them from the server or the cache.
2226    ///
2227    /// If `failsafe` is true, this will try to minimize side effects to avoid
2228    /// possible deadlocks.
2229    pub(crate) async fn supported_versions_inner(
2230        &self,
2231        failsafe: bool,
2232    ) -> HttpResult<SupportedVersions> {
2233        let cached_supported_versions = &self.inner.caches.supported_versions;
2234        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2235            return Ok(val.clone());
2236        }
2237
2238        let mut guarded_supported_versions = cached_supported_versions.write().await;
2239        if let CachedValue::Cached(val) = &*guarded_supported_versions {
2240            return Ok(val.clone());
2241        }
2242
2243        let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2244
2245        // Fill both unstable features and server versions at once.
2246        let mut supported_versions = supported.supported_versions();
2247        if supported_versions.versions.is_empty() {
2248            supported_versions.versions = [MatrixVersion::V1_0].into();
2249        }
2250
2251        // Only cache the result if the request was authenticated.
2252        if self.auth_ctx().has_valid_access_token() {
2253            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2254        }
2255
2256        Ok(supported_versions)
2257    }
2258
2259    /// Get the Matrix versions and features supported by the homeserver by
2260    /// fetching them from the cache.
2261    ///
2262    /// For a version of this function that fetches the supported versions and
2263    /// features from the homeserver if the [`SupportedVersions`] aren't
2264    /// found in the cache, take a look at the [`Client::server_versions()`]
2265    /// method.
2266    ///
2267    /// # Examples
2268    ///
2269    /// ```no_run
2270    /// use ruma::api::{FeatureFlag, MatrixVersion};
2271    /// # use matrix_sdk::{Client, config::SyncSettings};
2272    /// # use url::Url;
2273    /// # async {
2274    /// # let homeserver = Url::parse("http://localhost:8080")?;
2275    /// # let mut client = Client::new(homeserver).await?;
2276    ///
2277    /// let supported =
2278    ///     if let Some(supported) = client.supported_versions_cached().await? {
2279    ///         supported
2280    ///     } else {
2281    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2282    ///     };
2283    ///
2284    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2285    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2286    ///
2287    /// let msc_x_feature = FeatureFlag::from("msc_x");
2288    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2289    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2290    /// # anyhow::Ok(()) };
2291    /// ```
2292    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2293        if let Some(cached) = self.get_cached_supported_versions().await {
2294            Ok(Some(cached))
2295        } else if let Some(stored) =
2296            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2297        {
2298            if let Some(supported) =
2299                stored.into_supported_versions().and_then(|value| value.into_data())
2300            {
2301                Ok(Some(supported.supported_versions()))
2302            } else {
2303                Ok(None)
2304            }
2305        } else {
2306            Ok(None)
2307        }
2308    }
2309
2310    /// Get the Matrix versions supported by the homeserver by fetching them
2311    /// from the server or the cache.
2312    ///
2313    /// # Examples
2314    ///
2315    /// ```no_run
2316    /// use ruma::api::MatrixVersion;
2317    /// # use matrix_sdk::{Client, config::SyncSettings};
2318    /// # use url::Url;
2319    /// # async {
2320    /// # let homeserver = Url::parse("http://localhost:8080")?;
2321    /// # let mut client = Client::new(homeserver).await?;
2322    ///
2323    /// let server_versions = client.server_versions().await?;
2324    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2325    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2326    /// # anyhow::Ok(()) };
2327    /// ```
2328    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2329        Ok(self.supported_versions().await?.versions)
2330    }
2331
2332    /// Get the unstable features supported by the homeserver by fetching them
2333    /// from the server or the cache.
2334    ///
2335    /// # Examples
2336    ///
2337    /// ```no_run
2338    /// use matrix_sdk::ruma::api::FeatureFlag;
2339    /// # use matrix_sdk::{Client, config::SyncSettings};
2340    /// # use url::Url;
2341    /// # async {
2342    /// # let homeserver = Url::parse("http://localhost:8080")?;
2343    /// # let mut client = Client::new(homeserver).await?;
2344    ///
2345    /// let msc_x_feature = FeatureFlag::from("msc_x");
2346    /// let unstable_features = client.unstable_features().await?;
2347    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2348    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2349    /// # anyhow::Ok(()) };
2350    /// ```
2351    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2352        Ok(self.supported_versions().await?.features)
2353    }
2354
2355    /// Empty the supported versions and unstable features cache.
2356    ///
2357    /// Since the SDK caches the supported versions, it's possible to have a
2358    /// stale entry in the cache. This functions makes it possible to force
2359    /// reset it.
2360    pub async fn reset_supported_versions(&self) -> Result<()> {
2361        // Empty the in-memory caches.
2362        self.inner.caches.supported_versions.write().await.take();
2363
2364        // Empty the store cache.
2365        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2366    }
2367
2368    /// Load well-known from storage, or fetch it from network and cache it.
2369    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2370        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2371            Ok(Some(stored)) => {
2372                if let Some(well_known) =
2373                    stored.into_well_known().and_then(|value| value.into_data())
2374                {
2375                    return Ok(well_known);
2376                }
2377            }
2378            Ok(None) => {
2379                // fallthrough: cache is empty
2380            }
2381            Err(err) => {
2382                warn!("error when loading cached well-known: {err}");
2383                // fallthrough to network.
2384            }
2385        }
2386
2387        let well_known = self.fetch_client_well_known().await.map(Into::into);
2388
2389        // Attempt to cache the result in storage.
2390        if let Err(err) = self
2391            .state_store()
2392            .set_kv_data(
2393                StateStoreDataKey::WellKnown,
2394                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2395            )
2396            .await
2397        {
2398            warn!("error when caching well-known: {err}");
2399        }
2400
2401        Ok(well_known)
2402    }
2403
2404    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2405        let well_known = &self.inner.caches.well_known;
2406        if let CachedValue::Cached(val) = &*well_known.read().await {
2407            return Ok(val.clone());
2408        }
2409
2410        let mut guarded_well_known = well_known.write().await;
2411        if let CachedValue::Cached(val) = &*guarded_well_known {
2412            return Ok(val.clone());
2413        }
2414
2415        let well_known = self.load_or_fetch_well_known().await?;
2416
2417        *guarded_well_known = CachedValue::Cached(well_known.clone());
2418
2419        Ok(well_known)
2420    }
2421
2422    /// Get information about the homeserver's advertised RTC foci by fetching
2423    /// the well-known file from the server or the cache.
2424    ///
2425    /// # Examples
2426    /// ```no_run
2427    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2428    /// # use url::Url;
2429    /// # async {
2430    /// # let homeserver = Url::parse("http://localhost:8080")?;
2431    /// # let mut client = Client::new(homeserver).await?;
2432    /// let rtc_foci = client.rtc_foci().await?;
2433    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2434    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2435    ///     _ => None,
2436    /// });
2437    /// if let Some(info) = default_livekit_focus_info {
2438    ///     println!("Default LiveKit service URL: {}", info.service_url);
2439    /// }
2440    /// # anyhow::Ok(()) };
2441    /// ```
2442    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2443        let well_known = self.get_or_load_and_cache_well_known().await?;
2444
2445        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2446    }
2447
2448    /// Empty the well-known cache.
2449    ///
2450    /// Since the SDK caches the well-known, it's possible to have a stale entry
2451    /// in the cache. This functions makes it possible to force reset it.
2452    pub async fn reset_well_known(&self) -> Result<()> {
2453        // Empty the in-memory caches.
2454        self.inner.caches.well_known.write().await.take();
2455
2456        // Empty the store cache.
2457        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2458    }
2459
2460    /// Check whether MSC 4028 is enabled on the homeserver.
2461    ///
2462    /// # Examples
2463    ///
2464    /// ```no_run
2465    /// # use matrix_sdk::{Client, config::SyncSettings};
2466    /// # use url::Url;
2467    /// # async {
2468    /// # let homeserver = Url::parse("http://localhost:8080")?;
2469    /// # let mut client = Client::new(homeserver).await?;
2470    /// let msc4028_enabled =
2471    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2472    /// # anyhow::Ok(()) };
2473    /// ```
2474    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2475        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2476    }
2477
2478    /// Get information of all our own devices.
2479    ///
2480    /// # Examples
2481    ///
2482    /// ```no_run
2483    /// # use matrix_sdk::{Client, config::SyncSettings};
2484    /// # use url::Url;
2485    /// # async {
2486    /// # let homeserver = Url::parse("http://localhost:8080")?;
2487    /// # let mut client = Client::new(homeserver).await?;
2488    /// let response = client.devices().await?;
2489    ///
2490    /// for device in response.devices {
2491    ///     println!(
2492    ///         "Device: {} {}",
2493    ///         device.device_id,
2494    ///         device.display_name.as_deref().unwrap_or("")
2495    ///     );
2496    /// }
2497    /// # anyhow::Ok(()) };
2498    /// ```
2499    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2500        let request = get_devices::v3::Request::new();
2501
2502        self.send(request).await
2503    }
2504
2505    /// Delete the given devices from the server.
2506    ///
2507    /// # Arguments
2508    ///
2509    /// * `devices` - The list of devices that should be deleted from the
2510    ///   server.
2511    ///
2512    /// * `auth_data` - This request requires user interactive auth, the first
2513    ///   request needs to set this to `None` and will always fail with an
2514    ///   `UiaaResponse`. The response will contain information for the
2515    ///   interactive auth and the same request needs to be made but this time
2516    ///   with some `auth_data` provided.
2517    ///
2518    /// ```no_run
2519    /// # use matrix_sdk::{
2520    /// #    ruma::{api::client::uiaa, device_id},
2521    /// #    Client, Error, config::SyncSettings,
2522    /// # };
2523    /// # use serde_json::json;
2524    /// # use url::Url;
2525    /// # use std::collections::BTreeMap;
2526    /// # async {
2527    /// # let homeserver = Url::parse("http://localhost:8080")?;
2528    /// # let mut client = Client::new(homeserver).await?;
2529    /// let devices = &[device_id!("DEVICEID").to_owned()];
2530    ///
2531    /// if let Err(e) = client.delete_devices(devices, None).await {
2532    ///     if let Some(info) = e.as_uiaa_response() {
2533    ///         let mut password = uiaa::Password::new(
2534    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2535    ///             "wordpass".to_owned(),
2536    ///         );
2537    ///         password.session = info.session.clone();
2538    ///
2539    ///         client
2540    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2541    ///             .await?;
2542    ///     }
2543    /// }
2544    /// # anyhow::Ok(()) };
2545    pub async fn delete_devices(
2546        &self,
2547        devices: &[OwnedDeviceId],
2548        auth_data: Option<uiaa::AuthData>,
2549    ) -> HttpResult<delete_devices::v3::Response> {
2550        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2551        request.auth = auth_data;
2552
2553        self.send(request).await
2554    }
2555
2556    /// Change the display name of a device owned by the current user.
2557    ///
2558    /// Returns a `update_device::Response` which specifies the result
2559    /// of the operation.
2560    ///
2561    /// # Arguments
2562    ///
2563    /// * `device_id` - The ID of the device to change the display name of.
2564    /// * `display_name` - The new display name to set.
2565    pub async fn rename_device(
2566        &self,
2567        device_id: &DeviceId,
2568        display_name: &str,
2569    ) -> HttpResult<update_device::v3::Response> {
2570        let mut request = update_device::v3::Request::new(device_id.to_owned());
2571        request.display_name = Some(display_name.to_owned());
2572
2573        self.send(request).await
2574    }
2575
2576    /// Check whether a device with a specific ID exists on the server.
2577    ///
2578    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2579    /// with 404 and the underlying error otherwise.
2580    ///
2581    /// # Arguments
2582    ///
2583    /// * `device_id` - The ID of the device to query.
2584    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2585        let request = device::get_device::v3::Request::new(device_id);
2586        match self.send(request).await {
2587            Ok(_) => Ok(true),
2588            Err(err) => {
2589                if let Some(error) = err.as_client_api_error()
2590                    && error.status_code == 404
2591                {
2592                    Ok(false)
2593                } else {
2594                    Err(err.into())
2595                }
2596            }
2597        }
2598    }
2599
2600    /// Synchronize the client's state with the latest state on the server.
2601    ///
2602    /// ## Syncing Events
2603    ///
2604    /// Messages or any other type of event need to be periodically fetched from
2605    /// the server, this is achieved by sending a `/sync` request to the server.
2606    ///
2607    /// The first sync is sent out without a [`token`]. The response of the
2608    /// first sync will contain a [`next_batch`] field which should then be
2609    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2610    /// don't receive the same events multiple times.
2611    ///
2612    /// ## Long Polling
2613    ///
2614    /// A sync should in the usual case always be in flight. The
2615    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2616    /// long the server will wait for new events before it will respond.
2617    /// The server will respond immediately if some new events arrive before the
2618    /// timeout has expired. If no changes arrive and the timeout expires an
2619    /// empty sync response will be sent to the client.
2620    ///
2621    /// This method of sending a request that may not receive a response
2622    /// immediately is called long polling.
2623    ///
2624    /// ## Filtering Events
2625    ///
2626    /// The number or type of messages and events that the client should receive
2627    /// from the server can be altered using a [`Filter`].
2628    ///
2629    /// Filters can be non-trivial and, since they will be sent with every sync
2630    /// request, they may take up a bunch of unnecessary bandwidth.
2631    ///
2632    /// Luckily filters can be uploaded to the server and reused using an unique
2633    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2634    /// method.
2635    ///
2636    /// # Arguments
2637    ///
2638    /// * `sync_settings` - Settings for the sync call, this allows us to set
2639    /// various options to configure the sync:
2640    ///     * [`filter`] - To configure which events we receive and which get
2641    ///       [filtered] by the server
2642    ///     * [`timeout`] - To configure our [long polling] setup.
2643    ///     * [`token`] - To tell the server which events we already received
2644    ///       and where we wish to continue syncing.
2645    ///     * [`full_state`] - To tell the server that we wish to receive all
2646    ///       state events, regardless of our configured [`token`].
2647    ///     * [`set_presence`] - To tell the server to set the presence and to
2648    ///       which state.
2649    ///
2650    /// # Examples
2651    ///
2652    /// ```no_run
2653    /// # use url::Url;
2654    /// # async {
2655    /// # let homeserver = Url::parse("http://localhost:8080")?;
2656    /// # let username = "";
2657    /// # let password = "";
2658    /// use matrix_sdk::{
2659    ///     Client, config::SyncSettings,
2660    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2661    /// };
2662    ///
2663    /// let client = Client::new(homeserver).await?;
2664    /// client.matrix_auth().login_username(username, password).send().await?;
2665    ///
2666    /// // Sync once so we receive the client state and old messages.
2667    /// client.sync_once(SyncSettings::default()).await?;
2668    ///
2669    /// // Register our handler so we start responding once we receive a new
2670    /// // event.
2671    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2672    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2673    /// });
2674    ///
2675    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2676    /// // from our `sync_once()` call automatically.
2677    /// client.sync(SyncSettings::default()).await;
2678    /// # anyhow::Ok(()) };
2679    /// ```
2680    ///
2681    /// [`sync`]: #method.sync
2682    /// [`SyncSettings`]: crate::config::SyncSettings
2683    /// [`token`]: crate::config::SyncSettings#method.token
2684    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2685    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2686    /// [`set_presence`]: ruma::presence::PresenceState
2687    /// [`filter`]: crate::config::SyncSettings#method.filter
2688    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2689    /// [`next_batch`]: SyncResponse#structfield.next_batch
2690    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2691    /// [long polling]: #long-polling
2692    /// [filtered]: #filtering-events
2693    #[instrument(skip(self))]
2694    pub async fn sync_once(
2695        &self,
2696        sync_settings: crate::config::SyncSettings,
2697    ) -> Result<SyncResponse> {
2698        // The sync might not return for quite a while due to the timeout.
2699        // We'll see if there's anything crypto related to send out before we
2700        // sync, i.e. if we closed our client after a sync but before the
2701        // crypto requests were sent out.
2702        //
2703        // This will mostly be a no-op.
2704        #[cfg(feature = "e2e-encryption")]
2705        if let Err(e) = self.send_outgoing_requests().await {
2706            error!(error = ?e, "Error while sending outgoing E2EE requests");
2707        }
2708
2709        let token = match sync_settings.token {
2710            SyncToken::Specific(token) => Some(token),
2711            SyncToken::NoToken => None,
2712            SyncToken::ReusePrevious => self.sync_token().await,
2713        };
2714
2715        let request = assign!(sync_events::v3::Request::new(), {
2716            filter: sync_settings.filter.map(|f| *f),
2717            since: token,
2718            full_state: sync_settings.full_state,
2719            set_presence: sync_settings.set_presence,
2720            timeout: sync_settings.timeout,
2721            use_state_after: true,
2722        });
2723        let mut request_config = self.request_config();
2724        if let Some(timeout) = sync_settings.timeout {
2725            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2726            request_config.timeout = Some(base_timeout + timeout);
2727        }
2728
2729        let response = self.send(request).with_request_config(request_config).await?;
2730        let next_batch = response.next_batch.clone();
2731        let response = self.process_sync(response).await?;
2732
2733        #[cfg(feature = "e2e-encryption")]
2734        if let Err(e) = self.send_outgoing_requests().await {
2735            error!(error = ?e, "Error while sending outgoing E2EE requests");
2736        }
2737
2738        self.inner.sync_beat.notify(usize::MAX);
2739
2740        Ok(SyncResponse::new(next_batch, response))
2741    }
2742
2743    /// Repeatedly synchronize the client state with the server.
2744    ///
2745    /// This method will only return on error, if cancellation is needed
2746    /// the method should be wrapped in a cancelable task or the
2747    /// [`Client::sync_with_callback`] method can be used or
2748    /// [`Client::sync_with_result_callback`] if you want to handle error
2749    /// cases in the loop, too.
2750    ///
2751    /// This method will internally call [`Client::sync_once`] in a loop.
2752    ///
2753    /// This method can be used with the [`Client::add_event_handler`]
2754    /// method to react to individual events. If you instead wish to handle
2755    /// events in a bulk manner the [`Client::sync_with_callback`],
2756    /// [`Client::sync_with_result_callback`] and
2757    /// [`Client::sync_stream`] methods can be used instead. Those methods
2758    /// repeatedly return the whole sync response.
2759    ///
2760    /// # Arguments
2761    ///
2762    /// * `sync_settings` - Settings for the sync call. *Note* that those
2763    ///   settings will be only used for the first sync call. See the argument
2764    ///   docs for [`Client::sync_once`] for more info.
2765    ///
2766    /// # Return
2767    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2768    /// up to the user of the API to check the error and decide whether the sync
2769    /// should continue or not.
2770    ///
2771    /// # Examples
2772    ///
2773    /// ```no_run
2774    /// # use url::Url;
2775    /// # async {
2776    /// # let homeserver = Url::parse("http://localhost:8080")?;
2777    /// # let username = "";
2778    /// # let password = "";
2779    /// use matrix_sdk::{
2780    ///     Client, config::SyncSettings,
2781    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2782    /// };
2783    ///
2784    /// let client = Client::new(homeserver).await?;
2785    /// client.matrix_auth().login_username(&username, &password).send().await?;
2786    ///
2787    /// // Register our handler so we start responding once we receive a new
2788    /// // event.
2789    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2790    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2791    /// });
2792    ///
2793    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2794    /// // automatically.
2795    /// client.sync(SyncSettings::default()).await?;
2796    /// # anyhow::Ok(()) };
2797    /// ```
2798    ///
2799    /// [argument docs]: #method.sync_once
2800    /// [`sync_with_callback`]: #method.sync_with_callback
2801    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2802        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2803    }
2804
2805    /// Repeatedly call sync to synchronize the client state with the server.
2806    ///
2807    /// # Arguments
2808    ///
2809    /// * `sync_settings` - Settings for the sync call. *Note* that those
2810    ///   settings will be only used for the first sync call. See the argument
2811    ///   docs for [`Client::sync_once`] for more info.
2812    ///
2813    /// * `callback` - A callback that will be called every time a successful
2814    ///   response has been fetched from the server. The callback must return a
2815    ///   boolean which signalizes if the method should stop syncing. If the
2816    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2817    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2818    ///
2819    /// # Return
2820    /// The sync runs until an error occurs or the
2821    /// callback indicates that the Loop should stop. If the callback asked for
2822    /// a regular stop, the result will be `Ok(())` otherwise the
2823    /// `Err(Error)` is returned.
2824    ///
2825    /// # Examples
2826    ///
2827    /// The following example demonstrates how to sync forever while sending all
2828    /// the interesting events through a mpsc channel to another thread e.g. a
2829    /// UI thread.
2830    ///
2831    /// ```no_run
2832    /// # use std::time::Duration;
2833    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2834    /// # use url::Url;
2835    /// # async {
2836    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2837    /// # let mut client = Client::new(homeserver).await.unwrap();
2838    ///
2839    /// use tokio::sync::mpsc::channel;
2840    ///
2841    /// let (tx, rx) = channel(100);
2842    ///
2843    /// let sync_channel = &tx;
2844    /// let sync_settings = SyncSettings::new()
2845    ///     .timeout(Duration::from_secs(30));
2846    ///
2847    /// client
2848    ///     .sync_with_callback(sync_settings, |response| async move {
2849    ///         let channel = sync_channel;
2850    ///         for (room_id, room) in response.rooms.joined {
2851    ///             for event in room.timeline.events {
2852    ///                 channel.send(event).await.unwrap();
2853    ///             }
2854    ///         }
2855    ///
2856    ///         LoopCtrl::Continue
2857    ///     })
2858    ///     .await;
2859    /// };
2860    /// ```
2861    #[instrument(skip_all)]
2862    pub async fn sync_with_callback<C>(
2863        &self,
2864        sync_settings: crate::config::SyncSettings,
2865        callback: impl Fn(SyncResponse) -> C,
2866    ) -> Result<(), Error>
2867    where
2868        C: Future<Output = LoopCtrl>,
2869    {
2870        self.sync_with_result_callback(sync_settings, |result| async {
2871            Ok(callback(result?).await)
2872        })
2873        .await
2874    }
2875
2876    /// Repeatedly call sync to synchronize the client state with the server.
2877    ///
2878    /// # Arguments
2879    ///
2880    /// * `sync_settings` - Settings for the sync call. *Note* that those
2881    ///   settings will be only used for the first sync call. See the argument
2882    ///   docs for [`Client::sync_once`] for more info.
2883    ///
2884    /// * `callback` - A callback that will be called every time after a
2885    ///   response has been received, failure or not. The callback returns a
2886    ///   `Result<LoopCtrl, Error>`, too. When returning
2887    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2888    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2889    ///   function returns `Ok(())`. In case the callback can't handle the
2890    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2891    ///   which results in the sync ending and the `Err(Error)` being returned.
2892    ///
2893    /// # Return
2894    /// The sync runs until an error occurs that the callback can't handle or
2895    /// the callback indicates that the Loop should stop. If the callback
2896    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2897    /// `Err(Error)` is returned.
2898    ///
2899    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2900    /// this, and are handled first without sending the result to the
2901    /// callback. Only after they have exceeded is the `Result` handed to
2902    /// the callback.
2903    ///
2904    /// # Examples
2905    ///
2906    /// The following example demonstrates how to sync forever while sending all
2907    /// the interesting events through a mpsc channel to another thread e.g. a
2908    /// UI thread.
2909    ///
2910    /// ```no_run
2911    /// # use std::time::Duration;
2912    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2913    /// # use url::Url;
2914    /// # async {
2915    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2916    /// # let mut client = Client::new(homeserver).await.unwrap();
2917    /// #
2918    /// use tokio::sync::mpsc::channel;
2919    ///
2920    /// let (tx, rx) = channel(100);
2921    ///
2922    /// let sync_channel = &tx;
2923    /// let sync_settings = SyncSettings::new()
2924    ///     .timeout(Duration::from_secs(30));
2925    ///
2926    /// client
2927    ///     .sync_with_result_callback(sync_settings, |response| async move {
2928    ///         let channel = sync_channel;
2929    ///         let sync_response = response?;
2930    ///         for (room_id, room) in sync_response.rooms.joined {
2931    ///              for event in room.timeline.events {
2932    ///                  channel.send(event).await.unwrap();
2933    ///               }
2934    ///         }
2935    ///
2936    ///         Ok(LoopCtrl::Continue)
2937    ///     })
2938    ///     .await;
2939    /// };
2940    /// ```
2941    #[instrument(skip(self, callback))]
2942    pub async fn sync_with_result_callback<C>(
2943        &self,
2944        sync_settings: crate::config::SyncSettings,
2945        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2946    ) -> Result<(), Error>
2947    where
2948        C: Future<Output = Result<LoopCtrl, Error>>,
2949    {
2950        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2951
2952        while let Some(result) = sync_stream.next().await {
2953            trace!("Running callback");
2954            if callback(result).await? == LoopCtrl::Break {
2955                trace!("Callback told us to stop");
2956                break;
2957            }
2958            trace!("Done running callback");
2959        }
2960
2961        Ok(())
2962    }
2963
2964    //// Repeatedly synchronize the client state with the server.
2965    ///
2966    /// This method will internally call [`Client::sync_once`] in a loop and is
2967    /// equivalent to the [`Client::sync`] method but the responses are provided
2968    /// as an async stream.
2969    ///
2970    /// # Arguments
2971    ///
2972    /// * `sync_settings` - Settings for the sync call. *Note* that those
2973    ///   settings will be only used for the first sync call. See the argument
2974    ///   docs for [`Client::sync_once`] for more info.
2975    ///
2976    /// # Examples
2977    ///
2978    /// ```no_run
2979    /// # use url::Url;
2980    /// # async {
2981    /// # let homeserver = Url::parse("http://localhost:8080")?;
2982    /// # let username = "";
2983    /// # let password = "";
2984    /// use futures_util::StreamExt;
2985    /// use matrix_sdk::{Client, config::SyncSettings};
2986    ///
2987    /// let client = Client::new(homeserver).await?;
2988    /// client.matrix_auth().login_username(&username, &password).send().await?;
2989    ///
2990    /// let mut sync_stream =
2991    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2992    ///
2993    /// while let Some(Ok(response)) = sync_stream.next().await {
2994    ///     for room in response.rooms.joined.values() {
2995    ///         for e in &room.timeline.events {
2996    ///             if let Ok(event) = e.raw().deserialize() {
2997    ///                 println!("Received event {:?}", event);
2998    ///             }
2999    ///         }
3000    ///     }
3001    /// }
3002    ///
3003    /// # anyhow::Ok(()) };
3004    /// ```
3005    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
3006    #[instrument(skip(self))]
3007    pub async fn sync_stream(
3008        &self,
3009        mut sync_settings: crate::config::SyncSettings,
3010    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
3011        let mut is_first_sync = true;
3012        let mut timeout = None;
3013        let mut last_sync_time: Option<Instant> = None;
3014
3015        let parent_span = Span::current();
3016
3017        async_stream::stream!({
3018            loop {
3019                trace!("Syncing");
3020
3021                if sync_settings.ignore_timeout_on_first_sync {
3022                    if is_first_sync {
3023                        timeout = sync_settings.timeout.take();
3024                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3025                        sync_settings.timeout = timeout.take();
3026                    }
3027
3028                    is_first_sync = false;
3029                }
3030
3031                yield self
3032                    .sync_loop_helper(&mut sync_settings)
3033                    .instrument(parent_span.clone())
3034                    .await;
3035
3036                Client::delay_sync(&mut last_sync_time).await
3037            }
3038        })
3039    }
3040
3041    /// Get the current, if any, sync token of the client.
3042    /// This will be None if the client didn't sync at least once.
3043    pub(crate) async fn sync_token(&self) -> Option<String> {
3044        self.inner.base_client.sync_token().await
3045    }
3046
3047    /// Gets information about the owner of a given access token.
3048    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3049        let request = whoami::v3::Request::new();
3050        self.send(request).await
3051    }
3052
3053    /// Subscribes a new receiver to client SessionChange broadcasts.
3054    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3055        let broadcast = &self.auth_ctx().session_change_sender;
3056        broadcast.subscribe()
3057    }
3058
3059    /// Sets the save/restore session callbacks.
3060    ///
3061    /// This is another mechanism to get synchronous updates to session tokens,
3062    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3063    pub fn set_session_callbacks(
3064        &self,
3065        reload_session_callback: Box<ReloadSessionCallback>,
3066        save_session_callback: Box<SaveSessionCallback>,
3067    ) -> Result<()> {
3068        self.inner
3069            .auth_ctx
3070            .reload_session_callback
3071            .set(reload_session_callback)
3072            .map_err(|_| Error::MultipleSessionCallbacks)?;
3073
3074        self.inner
3075            .auth_ctx
3076            .save_session_callback
3077            .set(save_session_callback)
3078            .map_err(|_| Error::MultipleSessionCallbacks)?;
3079
3080        Ok(())
3081    }
3082
3083    /// Get the notification settings of the current owner of the client.
3084    pub async fn notification_settings(&self) -> NotificationSettings {
3085        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3086        NotificationSettings::new(self.clone(), ruleset)
3087    }
3088
3089    /// Create a new specialized `Client` that can process notifications.
3090    ///
3091    /// See [`CrossProcessLock::new`] to learn more about
3092    /// `cross_process_store_locks_holder_name`.
3093    ///
3094    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3095    pub async fn notification_client(
3096        &self,
3097        cross_process_store_locks_holder_name: String,
3098    ) -> Result<Client> {
3099        let client = Client {
3100            inner: ClientInner::new(
3101                self.inner.auth_ctx.clone(),
3102                self.server().cloned(),
3103                self.homeserver(),
3104                self.sliding_sync_version(),
3105                self.inner.http_client.clone(),
3106                self.inner
3107                    .base_client
3108                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
3109                    .await?,
3110                self.inner.caches.supported_versions.read().await.clone(),
3111                self.inner.caches.well_known.read().await.clone(),
3112                self.inner.respect_login_well_known,
3113                self.inner.event_cache.clone(),
3114                self.inner.send_queue_data.clone(),
3115                self.inner.latest_events.clone(),
3116                #[cfg(feature = "e2e-encryption")]
3117                self.inner.e2ee.encryption_settings,
3118                #[cfg(feature = "e2e-encryption")]
3119                self.inner.enable_share_history_on_invite,
3120                cross_process_store_locks_holder_name,
3121                #[cfg(feature = "experimental-search")]
3122                self.inner.search_index.clone(),
3123                self.inner.thread_subscription_catchup.clone(),
3124            )
3125            .await,
3126        };
3127
3128        Ok(client)
3129    }
3130
3131    /// The [`EventCache`] instance for this [`Client`].
3132    pub fn event_cache(&self) -> &EventCache {
3133        // SAFETY: always initialized in the `Client` ctor.
3134        self.inner.event_cache.get().unwrap()
3135    }
3136
3137    /// The [`LatestEvents`] instance for this [`Client`].
3138    pub async fn latest_events(&self) -> &LatestEvents {
3139        self.inner
3140            .latest_events
3141            .get_or_init(|| async {
3142                LatestEvents::new(
3143                    WeakClient::from_client(self),
3144                    self.event_cache().clone(),
3145                    SendQueue::new(self.clone()),
3146                    self.room_info_notable_update_receiver(),
3147                )
3148            })
3149            .await
3150    }
3151
3152    /// Waits until an at least partially synced room is received, and returns
3153    /// it.
3154    ///
3155    /// **Note: this function will loop endlessly until either it finds the room
3156    /// or an externally set timeout happens.**
3157    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3158        loop {
3159            if let Some(room) = self.get_room(room_id) {
3160                if room.is_state_partially_or_fully_synced() {
3161                    debug!("Found just created room!");
3162                    return room;
3163                }
3164                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3165            } else {
3166                debug!("Room wasn't found, waiting for sync beat to try again");
3167            }
3168            self.inner.sync_beat.listen().await;
3169        }
3170    }
3171
3172    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3173    /// join it.
3174    pub async fn knock(
3175        &self,
3176        room_id_or_alias: OwnedRoomOrAliasId,
3177        reason: Option<String>,
3178        server_names: Vec<OwnedServerName>,
3179    ) -> Result<Room> {
3180        let request =
3181            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3182        let response = self.send(request).await?;
3183        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3184        Ok(Room::new(self.clone(), base_room))
3185    }
3186
3187    /// Checks whether the provided `user_id` belongs to an ignored user.
3188    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3189        self.base_client().is_user_ignored(user_id).await
3190    }
3191
3192    /// Gets the `max_upload_size` value from the homeserver, getting either a
3193    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3194    /// missing.
3195    ///
3196    /// Check the spec for more info:
3197    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3198    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3199        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3200        if let Some(data) = max_upload_size_lock.get() {
3201            return Ok(data.to_owned());
3202        }
3203
3204        // Use the authenticated endpoint when the server supports it.
3205        let supported_versions = self.supported_versions().await?;
3206        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3207            .is_supported(&supported_versions);
3208
3209        let upload_size = if use_auth {
3210            self.send(authenticated_media::get_media_config::v1::Request::default())
3211                .await?
3212                .upload_size
3213        } else {
3214            #[allow(deprecated)]
3215            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3216        };
3217
3218        match max_upload_size_lock.set(upload_size) {
3219            Ok(_) => Ok(upload_size),
3220            Err(error) => {
3221                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3222            }
3223        }
3224    }
3225
3226    /// The settings to use for decrypting events.
3227    #[cfg(feature = "e2e-encryption")]
3228    pub fn decryption_settings(&self) -> &DecryptionSettings {
3229        &self.base_client().decryption_settings
3230    }
3231
3232    /// Returns the [`SearchIndex`] for this [`Client`].
3233    #[cfg(feature = "experimental-search")]
3234    pub fn search_index(&self) -> &SearchIndex {
3235        &self.inner.search_index
3236    }
3237
3238    /// Whether the client is configured to take thread subscriptions (MSC4306
3239    /// and MSC4308) into account.
3240    ///
3241    /// This may cause filtering out of thread subscriptions, and loading the
3242    /// thread subscriptions via the sliding sync extension, when the room
3243    /// list service is being used.
3244    pub fn enabled_thread_subscriptions(&self) -> bool {
3245        match self.base_client().threading_support {
3246            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3247            ThreadingSupport::Disabled => false,
3248        }
3249    }
3250
3251    /// Fetch thread subscriptions changes between `from` and up to `to`.
3252    ///
3253    /// The `limit` optional parameter can be used to limit the number of
3254    /// entries in a response. It can also be overridden by the server, if
3255    /// it's deemed too large.
3256    pub async fn fetch_thread_subscriptions(
3257        &self,
3258        from: Option<String>,
3259        to: Option<String>,
3260        limit: Option<UInt>,
3261    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3262        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3263            from,
3264            to,
3265            limit,
3266        });
3267        Ok(self.send(request).await?)
3268    }
3269
3270    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3271        self.inner.thread_subscription_catchup.get().unwrap()
3272    }
3273
3274    /// Perform database optimizations if any are available, i.e. vacuuming in
3275    /// SQLite.
3276    ///
3277    /// **Warning:** this was added to check if SQLite fragmentation was the
3278    /// source of performance issues, **DO NOT use in production**.
3279    #[doc(hidden)]
3280    pub async fn optimize_stores(&self) -> Result<()> {
3281        trace!("Optimizing state store...");
3282        self.state_store().optimize().await?;
3283
3284        trace!("Optimizing event cache store...");
3285        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3286            clean_lock.optimize().await?;
3287        }
3288
3289        trace!("Optimizing media store...");
3290        self.media_store().lock().await?.optimize().await?;
3291
3292        Ok(())
3293    }
3294
3295    /// Returns the sizes of the existing stores, if known.
3296    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3297        #[cfg(feature = "e2e-encryption")]
3298        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3299            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3300        {
3301            Some(store_size)
3302        } else {
3303            None
3304        };
3305        #[cfg(not(feature = "e2e-encryption"))]
3306        let crypto_store_size = None;
3307
3308        let state_store_size = self.state_store().get_size().await.ok().flatten();
3309
3310        let event_cache_store_size = if let Some(clean_lock) =
3311            self.event_cache_store().lock().await?.as_clean()
3312            && let Ok(Some(store_size)) = clean_lock.get_size().await
3313        {
3314            Some(store_size)
3315        } else {
3316            None
3317        };
3318
3319        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3320
3321        Ok(StoreSizes {
3322            crypto_store: crypto_store_size,
3323            state_store: state_store_size,
3324            event_cache_store: event_cache_store_size,
3325            media_store: media_store_size,
3326        })
3327    }
3328
3329    /// Get a reference to the client's task monitor, for spawning background
3330    /// tasks.
3331    pub fn task_monitor(&self) -> &TaskMonitor {
3332        &self.inner.task_monitor
3333    }
3334
3335    /// Add a subscriber for duplicate key upload error notifications triggered
3336    /// by requests to /keys/upload.
3337    #[cfg(feature = "e2e-encryption")]
3338    pub fn subscribe_to_duplicate_key_upload_errors(
3339        &self,
3340    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3341        self.inner.duplicate_key_upload_error_sender.subscribe()
3342    }
3343}
3344
3345/// Contains the disk size of the different stores, if known. It won't be
3346/// available for in-memory stores.
3347#[derive(Debug, Clone)]
3348pub struct StoreSizes {
3349    /// The size of the CryptoStore.
3350    pub crypto_store: Option<usize>,
3351    /// The size of the StateStore.
3352    pub state_store: Option<usize>,
3353    /// The size of the EventCacheStore.
3354    pub event_cache_store: Option<usize>,
3355    /// The size of the MediaStore.
3356    pub media_store: Option<usize>,
3357}
3358
3359#[cfg(any(feature = "testing", test))]
3360impl Client {
3361    /// Test helper to mark users as tracked by the crypto layer.
3362    #[cfg(feature = "e2e-encryption")]
3363    pub async fn update_tracked_users_for_testing(
3364        &self,
3365        user_ids: impl IntoIterator<Item = &UserId>,
3366    ) {
3367        let olm = self.olm_machine().await;
3368        let olm = olm.as_ref().unwrap();
3369        olm.update_tracked_users(user_ids).await.unwrap();
3370    }
3371}
3372
3373/// A weak reference to the inner client, useful when trying to get a handle
3374/// on the owning client.
3375#[derive(Clone, Debug)]
3376pub(crate) struct WeakClient {
3377    client: Weak<ClientInner>,
3378}
3379
3380impl WeakClient {
3381    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3382    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3383        Self { client: Arc::downgrade(client) }
3384    }
3385
3386    /// Construct a [`WeakClient`] from a [`Client`].
3387    pub fn from_client(client: &Client) -> Self {
3388        Self::from_inner(&client.inner)
3389    }
3390
3391    /// Attempts to get a [`Client`] from this [`WeakClient`].
3392    pub fn get(&self) -> Option<Client> {
3393        self.client.upgrade().map(|inner| Client { inner })
3394    }
3395
3396    /// Gets the number of strong (`Arc`) pointers still pointing to this
3397    /// client.
3398    #[allow(dead_code)]
3399    pub fn strong_count(&self) -> usize {
3400        self.client.strong_count()
3401    }
3402}
3403
3404/// Information about the state of a room before we joined it.
3405#[derive(Debug, Clone, Default)]
3406struct PreJoinRoomInfo {
3407    /// The user who invited us to the room, if any.
3408    pub inviter: Option<RoomMember>,
3409}
3410
3411// The http mocking library is not supported for wasm32
3412#[cfg(all(test, not(target_family = "wasm")))]
3413pub(crate) mod tests {
3414    use std::{sync::Arc, time::Duration};
3415
3416    use assert_matches::assert_matches;
3417    use assert_matches2::assert_let;
3418    use eyeball::SharedObservable;
3419    use futures_util::{FutureExt, StreamExt, pin_mut};
3420    use js_int::{UInt, uint};
3421    use matrix_sdk_base::{
3422        RoomState,
3423        store::{MemoryStore, StoreConfig},
3424    };
3425    use matrix_sdk_test::{
3426        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3427        event_factory::EventFactory,
3428    };
3429    #[cfg(target_family = "wasm")]
3430    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3431
3432    use ruma::{
3433        RoomId, ServerName, UserId,
3434        api::{
3435            FeatureFlag, MatrixVersion,
3436            client::{
3437                discovery::discover_homeserver::RtcFocusInfo,
3438                room::create_room::v3::Request as CreateRoomRequest,
3439            },
3440        },
3441        assign,
3442        events::{
3443            ignored_user_list::IgnoredUserListEventContent,
3444            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3445        },
3446        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3447    };
3448    use serde_json::json;
3449    use stream_assert::{assert_next_matches, assert_pending};
3450    use tokio::{
3451        spawn,
3452        time::{sleep, timeout},
3453    };
3454    use url::Url;
3455
3456    use super::Client;
3457    use crate::{
3458        Error, TransmissionProgress,
3459        client::{WeakClient, futures::SendMediaUploadRequest},
3460        config::{RequestConfig, SyncSettings},
3461        futures::SendRequest,
3462        media::MediaError,
3463        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3464    };
3465
3466    #[async_test]
3467    async fn test_account_data() {
3468        let server = MatrixMockServer::new().await;
3469        let client = server.client_builder().build().await;
3470
3471        let f = EventFactory::new();
3472        server
3473            .mock_sync()
3474            .ok_and_run(&client, |builder| {
3475                builder.add_global_account_data(
3476                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3477                );
3478            })
3479            .await;
3480
3481        let content = client
3482            .account()
3483            .account_data::<IgnoredUserListEventContent>()
3484            .await
3485            .unwrap()
3486            .unwrap()
3487            .deserialize()
3488            .unwrap();
3489
3490        assert_eq!(content.ignored_users.len(), 1);
3491    }
3492
3493    #[async_test]
3494    async fn test_successful_discovery() {
3495        // Imagine this is `matrix.org`.
3496        let server = MatrixMockServer::new().await;
3497        let server_url = server.uri();
3498
3499        // Imagine this is `matrix-client.matrix.org`.
3500        let homeserver = MatrixMockServer::new().await;
3501        let homeserver_url = homeserver.uri();
3502
3503        // Imagine Alice has the user ID `@alice:matrix.org`.
3504        let domain = server_url.strip_prefix("http://").unwrap();
3505        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3506
3507        // The `.well-known` is on the server (e.g. `matrix.org`).
3508        server
3509            .mock_well_known()
3510            .ok_with_homeserver_url(&homeserver_url)
3511            .mock_once()
3512            .named("well-known")
3513            .mount()
3514            .await;
3515
3516        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3517        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3518
3519        let client = Client::builder()
3520            .insecure_server_name_no_tls(alice.server_name())
3521            .build()
3522            .await
3523            .unwrap();
3524
3525        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3526        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3527        client.server_versions().await.unwrap();
3528    }
3529
3530    #[async_test]
3531    async fn test_discovery_broken_server() {
3532        let server = MatrixMockServer::new().await;
3533        let server_url = server.uri();
3534        let domain = server_url.strip_prefix("http://").unwrap();
3535        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3536
3537        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3538
3539        assert!(
3540            Client::builder()
3541                .insecure_server_name_no_tls(alice.server_name())
3542                .build()
3543                .await
3544                .is_err(),
3545            "Creating a client from a user ID should fail when the .well-known request fails."
3546        );
3547    }
3548
3549    #[async_test]
3550    async fn test_room_creation() {
3551        let server = MatrixMockServer::new().await;
3552        let client = server.client_builder().build().await;
3553
3554        server
3555            .mock_sync()
3556            .ok_and_run(&client, |builder| {
3557                builder.add_joined_room(
3558                    JoinedRoomBuilder::default()
3559                        .add_state_event(StateTestEvent::Member)
3560                        .add_state_event(StateTestEvent::PowerLevels),
3561                );
3562            })
3563            .await;
3564
3565        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3566        assert_eq!(room.state(), RoomState::Joined);
3567    }
3568
3569    #[async_test]
3570    async fn test_retry_limit_http_requests() {
3571        let server = MatrixMockServer::new().await;
3572        let client = server
3573            .client_builder()
3574            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3575            .build()
3576            .await;
3577
3578        assert!(client.request_config().retry_limit.unwrap() == 4);
3579
3580        server.mock_who_am_i().error500().expect(4).mount().await;
3581
3582        client.whoami().await.unwrap_err();
3583    }
3584
3585    #[async_test]
3586    async fn test_retry_timeout_http_requests() {
3587        // Keep this timeout small so that the test doesn't take long
3588        let retry_timeout = Duration::from_secs(5);
3589        let server = MatrixMockServer::new().await;
3590        let client = server
3591            .client_builder()
3592            .on_builder(|builder| {
3593                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3594            })
3595            .build()
3596            .await;
3597
3598        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3599
3600        server.mock_login().error500().expect(2..).mount().await;
3601
3602        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3603    }
3604
3605    #[async_test]
3606    async fn test_short_retry_initial_http_requests() {
3607        let server = MatrixMockServer::new().await;
3608        let client = server
3609            .client_builder()
3610            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3611            .build()
3612            .await;
3613
3614        server.mock_login().error500().expect(3..).mount().await;
3615
3616        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3617    }
3618
3619    #[async_test]
3620    async fn test_no_retry_http_requests() {
3621        let server = MatrixMockServer::new().await;
3622        let client = server.client_builder().build().await;
3623
3624        server.mock_devices().error500().mock_once().mount().await;
3625
3626        client.devices().await.unwrap_err();
3627    }
3628
3629    #[async_test]
3630    async fn test_set_homeserver() {
3631        let client = MockClientBuilder::new(None).build().await;
3632        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3633
3634        let homeserver = Url::parse("http://example.com/").unwrap();
3635        client.set_homeserver(homeserver.clone());
3636        assert_eq!(client.homeserver(), homeserver);
3637    }
3638
3639    #[async_test]
3640    async fn test_search_user_request() {
3641        let server = MatrixMockServer::new().await;
3642        let client = server.client_builder().build().await;
3643
3644        server.mock_user_directory().ok().mock_once().mount().await;
3645
3646        let response = client.search_users("test", 50).await.unwrap();
3647        assert_eq!(response.results.len(), 1);
3648        let result = &response.results[0];
3649        assert_eq!(result.user_id.to_string(), "@test:example.me");
3650        assert_eq!(result.display_name.clone().unwrap(), "Test");
3651        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3652        assert!(!response.limited);
3653    }
3654
3655    #[async_test]
3656    async fn test_request_unstable_features() {
3657        let server = MatrixMockServer::new().await;
3658        let client = server.client_builder().no_server_versions().build().await;
3659
3660        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3661
3662        let unstable_features = client.unstable_features().await.unwrap();
3663        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3664        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3665    }
3666
3667    #[async_test]
3668    async fn test_can_homeserver_push_encrypted_event_to_device() {
3669        let server = MatrixMockServer::new().await;
3670        let client = server.client_builder().no_server_versions().build().await;
3671
3672        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3673
3674        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3675        assert!(msc4028_enabled);
3676    }
3677
3678    #[async_test]
3679    async fn test_recently_visited_rooms() {
3680        // Tracking recently visited rooms requires authentication
3681        let client = MockClientBuilder::new(None).unlogged().build().await;
3682        assert_matches!(
3683            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3684            Err(Error::AuthenticationRequired)
3685        );
3686
3687        let client = MockClientBuilder::new(None).build().await;
3688        let account = client.account();
3689
3690        // We should start off with an empty list
3691        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3692
3693        // Tracking a valid room id should add it to the list
3694        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3695        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3696        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3697
3698        // And the existing list shouldn't be changed
3699        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3700        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3701
3702        // Tracking the same room again shouldn't change the list
3703        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3704        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3705        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3706
3707        // Tracking a second room should add it to the front of the list
3708        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3709        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3710        assert_eq!(
3711            account.get_recently_visited_rooms().await.unwrap(),
3712            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3713        );
3714
3715        // Tracking the first room yet again should move it to the front of the list
3716        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3717        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3718        assert_eq!(
3719            account.get_recently_visited_rooms().await.unwrap(),
3720            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3721        );
3722
3723        // Tracking should be capped at 20
3724        for n in 0..20 {
3725            account
3726                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3727                .await
3728                .unwrap();
3729        }
3730
3731        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3732
3733        // And the initial rooms should've been pushed out
3734        let rooms = account.get_recently_visited_rooms().await.unwrap();
3735        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3736        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3737
3738        // And the last tracked room should be the first
3739        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3740    }
3741
3742    #[async_test]
3743    async fn test_client_no_cycle_with_event_cache() {
3744        let client = MockClientBuilder::new(None).build().await;
3745
3746        // Wait for the init tasks to die.
3747        sleep(Duration::from_secs(1)).await;
3748
3749        let weak_client = WeakClient::from_client(&client);
3750        assert_eq!(weak_client.strong_count(), 1);
3751
3752        {
3753            let room_id = room_id!("!room:example.org");
3754
3755            // Have the client know the room.
3756            let response = SyncResponseBuilder::default()
3757                .add_joined_room(JoinedRoomBuilder::new(room_id))
3758                .build_sync_response();
3759            client.inner.base_client.receive_sync_response(response).await.unwrap();
3760
3761            client.event_cache().subscribe().unwrap();
3762
3763            let (_room_event_cache, _drop_handles) =
3764                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3765        }
3766
3767        drop(client);
3768
3769        // Give a bit of time for background tasks to die.
3770        sleep(Duration::from_secs(1)).await;
3771
3772        // The weak client must be the last reference to the client now.
3773        assert_eq!(weak_client.strong_count(), 0);
3774        let client = weak_client.get();
3775        assert!(
3776            client.is_none(),
3777            "too many strong references to the client: {}",
3778            Arc::strong_count(&client.unwrap().inner)
3779        );
3780    }
3781
3782    #[async_test]
3783    async fn test_supported_versions_caching() {
3784        let server = MatrixMockServer::new().await;
3785
3786        let versions_mock = server
3787            .mock_versions()
3788            .expect_default_access_token()
3789            .ok_with_unstable_features()
3790            .named("first versions mock")
3791            .expect(1)
3792            .mount_as_scoped()
3793            .await;
3794
3795        let memory_store = Arc::new(MemoryStore::new());
3796        let client = server
3797            .client_builder()
3798            .no_server_versions()
3799            .on_builder(|builder| {
3800                builder.store_config(
3801                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3802                        .state_store(memory_store.clone()),
3803                )
3804            })
3805            .build()
3806            .await;
3807
3808        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3809
3810        // The result was cached.
3811        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3812        // This subsequent call hits the in-memory cache.
3813        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3814
3815        drop(client);
3816
3817        let client = server
3818            .client_builder()
3819            .no_server_versions()
3820            .on_builder(|builder| {
3821                builder.store_config(
3822                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3823                        .state_store(memory_store.clone()),
3824                )
3825            })
3826            .build()
3827            .await;
3828
3829        // These calls to the new client hit the on-disk cache.
3830        assert!(
3831            client
3832                .unstable_features()
3833                .await
3834                .unwrap()
3835                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3836        );
3837        let supported = client.supported_versions().await.unwrap();
3838        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3839        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3840
3841        // Then this call hits the in-memory cache.
3842        let supported = client.supported_versions().await.unwrap();
3843        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3844        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3845
3846        drop(versions_mock);
3847
3848        // Now, reset the cache, and observe the endpoints being called again once.
3849        client.reset_supported_versions().await.unwrap();
3850
3851        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3852
3853        // Hits network again.
3854        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3855        // Hits in-memory cache again.
3856        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3857    }
3858
3859    #[async_test]
3860    async fn test_well_known_caching() {
3861        let server = MatrixMockServer::new().await;
3862        let server_url = server.uri();
3863        let domain = server_url.strip_prefix("http://").unwrap();
3864        let server_name = <&ServerName>::try_from(domain).unwrap();
3865        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3866
3867        let well_known_mock = server
3868            .mock_well_known()
3869            .ok()
3870            .named("well known mock")
3871            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3872            .mount_as_scoped()
3873            .await;
3874
3875        let memory_store = Arc::new(MemoryStore::new());
3876        let client = Client::builder()
3877            .insecure_server_name_no_tls(server_name)
3878            .store_config(
3879                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3880                    .state_store(memory_store.clone()),
3881            )
3882            .build()
3883            .await
3884            .unwrap();
3885
3886        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3887
3888        // This subsequent call hits the in-memory cache.
3889        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3890
3891        drop(client);
3892
3893        let client = server
3894            .client_builder()
3895            .no_server_versions()
3896            .on_builder(|builder| {
3897                builder.store_config(
3898                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3899                        .state_store(memory_store.clone()),
3900                )
3901            })
3902            .build()
3903            .await;
3904
3905        // This call to the new client hits the on-disk cache.
3906        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3907
3908        // Then this call hits the in-memory cache.
3909        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3910
3911        drop(well_known_mock);
3912
3913        // Now, reset the cache, and observe the endpoints being called again once.
3914        client.reset_well_known().await.unwrap();
3915
3916        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3917
3918        // Hits network again.
3919        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3920        // Hits in-memory cache again.
3921        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3922    }
3923
3924    #[async_test]
3925    async fn test_missing_well_known_caching() {
3926        let server = MatrixMockServer::new().await;
3927        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3928
3929        let well_known_mock = server
3930            .mock_well_known()
3931            .error_unrecognized()
3932            .named("first well-known mock")
3933            .expect(1)
3934            .mount_as_scoped()
3935            .await;
3936
3937        let memory_store = Arc::new(MemoryStore::new());
3938        let client = server
3939            .client_builder()
3940            .on_builder(|builder| {
3941                builder.store_config(
3942                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3943                        .state_store(memory_store.clone()),
3944                )
3945            })
3946            .build()
3947            .await;
3948
3949        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3950
3951        // This subsequent call hits the in-memory cache.
3952        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3953
3954        drop(client);
3955
3956        let client = server
3957            .client_builder()
3958            .on_builder(|builder| {
3959                builder.store_config(
3960                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3961                        .state_store(memory_store.clone()),
3962                )
3963            })
3964            .build()
3965            .await;
3966
3967        // This call to the new client hits the on-disk cache.
3968        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3969
3970        // Then this call hits the in-memory cache.
3971        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3972
3973        drop(well_known_mock);
3974
3975        // Now, reset the cache, and observe the endpoints being called again once.
3976        client.reset_well_known().await.unwrap();
3977
3978        server
3979            .mock_well_known()
3980            .error_unrecognized()
3981            .expect(1)
3982            .named("second well-known mock")
3983            .mount()
3984            .await;
3985
3986        // Hits network again.
3987        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3988        // Hits in-memory cache again.
3989        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3990    }
3991
3992    #[async_test]
3993    async fn test_no_network_doesnt_cause_infinite_retries() {
3994        // We want infinite retries for transient errors.
3995        let client = MockClientBuilder::new(None)
3996            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3997            .build()
3998            .await;
3999
4000        // We don't define a mock server on purpose here, so that the error is really a
4001        // network error.
4002        client.whoami().await.unwrap_err();
4003    }
4004
4005    #[async_test]
4006    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4007        let server = MatrixMockServer::new().await;
4008        let client = server.client_builder().build().await;
4009
4010        let room_id = room_id!("!room:example.org");
4011
4012        server
4013            .mock_sync()
4014            .ok_and_run(&client, |builder| {
4015                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4016            })
4017            .await;
4018
4019        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4020        assert_eq!(room.room_id(), room_id);
4021    }
4022
4023    #[async_test]
4024    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4025        let server = MatrixMockServer::new().await;
4026        let client = server.client_builder().build().await;
4027
4028        let room_id = room_id!("!room:example.org");
4029
4030        let client = Arc::new(client);
4031
4032        // Perform the /sync request with a delay so it starts after the
4033        // `await_room_remote_echo` call has happened
4034        spawn({
4035            let client = client.clone();
4036            async move {
4037                sleep(Duration::from_millis(100)).await;
4038
4039                server
4040                    .mock_sync()
4041                    .ok_and_run(&client, |builder| {
4042                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4043                    })
4044                    .await;
4045            }
4046        });
4047
4048        let room =
4049            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4050        assert_eq!(room.room_id(), room_id);
4051    }
4052
4053    #[async_test]
4054    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4055        let client = MockClientBuilder::new(None).build().await;
4056
4057        let room_id = room_id!("!room:example.org");
4058        // Room is not present so the client won't be able to find it. The call will
4059        // timeout.
4060        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4061    }
4062
4063    #[async_test]
4064    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4065        let server = MatrixMockServer::new().await;
4066        let client = server.client_builder().build().await;
4067
4068        server.mock_create_room().ok().mount().await;
4069
4070        // Create a room in the internal store
4071        let room = client
4072            .create_room(assign!(CreateRoomRequest::new(), {
4073                invite: vec![],
4074                is_direct: false,
4075            }))
4076            .await
4077            .unwrap();
4078
4079        // Room is locally present, but not synced, the call will timeout
4080        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4081            .await
4082            .unwrap_err();
4083    }
4084
4085    #[async_test]
4086    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4087        let server = MatrixMockServer::new().await;
4088        let client = server.client_builder().build().await;
4089
4090        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4091
4092        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4093        assert_matches!(ret, Ok(true));
4094    }
4095
4096    #[async_test]
4097    async fn test_is_room_alias_available_if_alias_is_resolved() {
4098        let server = MatrixMockServer::new().await;
4099        let client = server.client_builder().build().await;
4100
4101        server
4102            .mock_room_directory_resolve_alias()
4103            .ok("!some_room_id:matrix.org", Vec::new())
4104            .expect(1)
4105            .mount()
4106            .await;
4107
4108        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4109        assert_matches!(ret, Ok(false));
4110    }
4111
4112    #[async_test]
4113    async fn test_is_room_alias_available_if_error_found() {
4114        let server = MatrixMockServer::new().await;
4115        let client = server.client_builder().build().await;
4116
4117        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4118
4119        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4120        assert_matches!(ret, Err(_));
4121    }
4122
4123    #[async_test]
4124    async fn test_create_room_alias() {
4125        let server = MatrixMockServer::new().await;
4126        let client = server.client_builder().build().await;
4127
4128        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4129
4130        let ret = client
4131            .create_room_alias(
4132                room_alias_id!("#some_alias:matrix.org"),
4133                room_id!("!some_room:matrix.org"),
4134            )
4135            .await;
4136        assert_matches!(ret, Ok(()));
4137    }
4138
4139    #[async_test]
4140    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4141        let server = MatrixMockServer::new().await;
4142        let client = server.client_builder().build().await;
4143
4144        let room_id = room_id!("!a-room:matrix.org");
4145
4146        // Make sure the summary endpoint is called once
4147        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4148
4149        // We create a locally cached invited room
4150        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4151
4152        // And we get a preview, the server endpoint was reached
4153        let preview = client
4154            .get_room_preview(room_id.into(), Vec::new())
4155            .await
4156            .expect("Room preview should be retrieved");
4157
4158        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
4159    }
4160
4161    #[async_test]
4162    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4163        let server = MatrixMockServer::new().await;
4164        let client = server.client_builder().build().await;
4165
4166        let room_id = room_id!("!a-room:matrix.org");
4167
4168        // Make sure the summary endpoint is called once
4169        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4170
4171        // We create a locally cached left room
4172        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4173
4174        // And we get a preview, the server endpoint was reached
4175        let preview = client
4176            .get_room_preview(room_id.into(), Vec::new())
4177            .await
4178            .expect("Room preview should be retrieved");
4179
4180        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
4181    }
4182
4183    #[async_test]
4184    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4185        let server = MatrixMockServer::new().await;
4186        let client = server.client_builder().build().await;
4187
4188        let room_id = room_id!("!a-room:matrix.org");
4189
4190        // Make sure the summary endpoint is called once
4191        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4192
4193        // We create a locally cached knocked room
4194        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4195
4196        // And we get a preview, the server endpoint was reached
4197        let preview = client
4198            .get_room_preview(room_id.into(), Vec::new())
4199            .await
4200            .expect("Room preview should be retrieved");
4201
4202        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
4203    }
4204
4205    #[async_test]
4206    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4207        let server = MatrixMockServer::new().await;
4208        let client = server.client_builder().build().await;
4209
4210        let room_id = room_id!("!a-room:matrix.org");
4211
4212        // Make sure the summary endpoint is not called
4213        server.mock_room_summary().ok(room_id).never().mount().await;
4214
4215        // We create a locally cached joined room
4216        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4217
4218        // And we get a preview, no server endpoint was reached
4219        let preview = client
4220            .get_room_preview(room_id.into(), Vec::new())
4221            .await
4222            .expect("Room preview should be retrieved");
4223
4224        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
4225    }
4226
4227    #[async_test]
4228    async fn test_media_preview_config() {
4229        let server = MatrixMockServer::new().await;
4230        let client = server.client_builder().build().await;
4231
4232        server
4233            .mock_sync()
4234            .ok_and_run(&client, |builder| {
4235                builder.add_custom_global_account_data(json!({
4236                    "content": {
4237                        "media_previews": "private",
4238                        "invite_avatars": "off"
4239                    },
4240                    "type": "m.media_preview_config"
4241                }));
4242            })
4243            .await;
4244
4245        let (initial_value, stream) =
4246            client.account().observe_media_preview_config().await.unwrap();
4247
4248        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4249        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4250        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4251        pin_mut!(stream);
4252        assert_pending!(stream);
4253
4254        server
4255            .mock_sync()
4256            .ok_and_run(&client, |builder| {
4257                builder.add_custom_global_account_data(json!({
4258                    "content": {
4259                        "media_previews": "off",
4260                        "invite_avatars": "on"
4261                    },
4262                    "type": "m.media_preview_config"
4263                }));
4264            })
4265            .await;
4266
4267        assert_next_matches!(
4268            stream,
4269            MediaPreviewConfigEventContent {
4270                media_previews: Some(MediaPreviews::Off),
4271                invite_avatars: Some(InviteAvatars::On),
4272                ..
4273            }
4274        );
4275        assert_pending!(stream);
4276    }
4277
4278    #[async_test]
4279    async fn test_unstable_media_preview_config() {
4280        let server = MatrixMockServer::new().await;
4281        let client = server.client_builder().build().await;
4282
4283        server
4284            .mock_sync()
4285            .ok_and_run(&client, |builder| {
4286                builder.add_custom_global_account_data(json!({
4287                    "content": {
4288                        "media_previews": "private",
4289                        "invite_avatars": "off"
4290                    },
4291                    "type": "io.element.msc4278.media_preview_config"
4292                }));
4293            })
4294            .await;
4295
4296        let (initial_value, stream) =
4297            client.account().observe_media_preview_config().await.unwrap();
4298
4299        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4300        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4301        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4302        pin_mut!(stream);
4303        assert_pending!(stream);
4304
4305        server
4306            .mock_sync()
4307            .ok_and_run(&client, |builder| {
4308                builder.add_custom_global_account_data(json!({
4309                    "content": {
4310                        "media_previews": "off",
4311                        "invite_avatars": "on"
4312                    },
4313                    "type": "io.element.msc4278.media_preview_config"
4314                }));
4315            })
4316            .await;
4317
4318        assert_next_matches!(
4319            stream,
4320            MediaPreviewConfigEventContent {
4321                media_previews: Some(MediaPreviews::Off),
4322                invite_avatars: Some(InviteAvatars::On),
4323                ..
4324            }
4325        );
4326        assert_pending!(stream);
4327    }
4328
4329    #[async_test]
4330    async fn test_media_preview_config_not_found() {
4331        let server = MatrixMockServer::new().await;
4332        let client = server.client_builder().build().await;
4333
4334        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4335
4336        assert!(initial_value.is_none());
4337    }
4338
4339    #[async_test]
4340    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4341        // The default Matrix version we use is 1.11 or higher, so authenticated media
4342        // is supported.
4343        let server = MatrixMockServer::new().await;
4344        let client = server.client_builder().build().await;
4345
4346        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4347
4348        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4349        client.load_or_fetch_max_upload_size().await.unwrap();
4350
4351        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4352    }
4353
4354    #[async_test]
4355    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4356        // The server must advertise support for the stable feature for authenticated
4357        // media support, so we mock the `GET /versions` response.
4358        let server = MatrixMockServer::new().await;
4359        let client = server.client_builder().no_server_versions().build().await;
4360
4361        server
4362            .mock_versions()
4363            .ok_custom(
4364                &["v1.7", "v1.8", "v1.9", "v1.10"],
4365                &[("org.matrix.msc3916.stable", true)].into(),
4366            )
4367            .named("versions")
4368            .expect(1)
4369            .mount()
4370            .await;
4371
4372        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4373
4374        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4375        client.load_or_fetch_max_upload_size().await.unwrap();
4376
4377        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4378    }
4379
4380    #[async_test]
4381    async fn test_load_or_fetch_max_upload_size_no_auth() {
4382        // The server must not support Matrix 1.11 or higher for unauthenticated
4383        // media requests, so we mock the `GET /versions` response.
4384        let server = MatrixMockServer::new().await;
4385        let client = server.client_builder().no_server_versions().build().await;
4386
4387        server
4388            .mock_versions()
4389            .ok_custom(&["v1.1"], &Default::default())
4390            .named("versions")
4391            .expect(1)
4392            .mount()
4393            .await;
4394
4395        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4396
4397        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4398        client.load_or_fetch_max_upload_size().await.unwrap();
4399
4400        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4401    }
4402
4403    #[async_test]
4404    async fn test_uploading_a_too_large_media_file() {
4405        let server = MatrixMockServer::new().await;
4406        let client = server.client_builder().build().await;
4407
4408        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4409        client.load_or_fetch_max_upload_size().await.unwrap();
4410        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4411
4412        let data = vec![1, 2];
4413        let upload_request =
4414            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4415        let request = SendRequest {
4416            client: client.clone(),
4417            request: upload_request,
4418            config: None,
4419            send_progress: SharedObservable::new(TransmissionProgress::default()),
4420        };
4421        let media_request = SendMediaUploadRequest::new(request);
4422
4423        let error = media_request.await.err();
4424        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4425        assert_eq!(max, uint!(1));
4426        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4427    }
4428
4429    #[async_test]
4430    async fn test_dont_ignore_timeout_on_first_sync() {
4431        let server = MatrixMockServer::new().await;
4432        let client = server.client_builder().build().await;
4433
4434        server
4435            .mock_sync()
4436            .timeout(Some(Duration::from_secs(30)))
4437            .ok(|_| {})
4438            .mock_once()
4439            .named("sync_with_timeout")
4440            .mount()
4441            .await;
4442
4443        // Call the endpoint once to check the timeout.
4444        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4445
4446        timeout(Duration::from_secs(1), async {
4447            stream.next().await.unwrap().unwrap();
4448        })
4449        .await
4450        .unwrap();
4451    }
4452
4453    #[async_test]
4454    async fn test_ignore_timeout_on_first_sync() {
4455        let server = MatrixMockServer::new().await;
4456        let client = server.client_builder().build().await;
4457
4458        server
4459            .mock_sync()
4460            .timeout(None)
4461            .ok(|_| {})
4462            .mock_once()
4463            .named("sync_no_timeout")
4464            .mount()
4465            .await;
4466        server
4467            .mock_sync()
4468            .timeout(Some(Duration::from_secs(30)))
4469            .ok(|_| {})
4470            .mock_once()
4471            .named("sync_with_timeout")
4472            .mount()
4473            .await;
4474
4475        // Call each version of the endpoint once to check the timeouts.
4476        let mut stream = Box::pin(
4477            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4478        );
4479
4480        timeout(Duration::from_secs(1), async {
4481            stream.next().await.unwrap().unwrap();
4482            stream.next().await.unwrap().unwrap();
4483        })
4484        .await
4485        .unwrap();
4486    }
4487
4488    #[async_test]
4489    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4490        let server = MatrixMockServer::new().await;
4491        let client = server.client_builder().build().await;
4492        // This is the user ID that is inside MemberAdditional.
4493        // Note the confusing username, so we can share
4494        // GlobalAccountDataTestEvent::Direct with the invited test.
4495        let user_id = user_id!("@invited:localhost");
4496
4497        // When we receive a sync response saying "invited" is invited to a DM
4498        let f = EventFactory::new();
4499        let response = SyncResponseBuilder::default()
4500            .add_joined_room(
4501                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4502            )
4503            .add_global_account_data(
4504                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4505            )
4506            .build_sync_response();
4507        client.base_client().receive_sync_response(response).await.unwrap();
4508
4509        // Then get_dm_room finds this room
4510        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4511        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4512    }
4513
4514    #[async_test]
4515    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4516        let server = MatrixMockServer::new().await;
4517        let client = server.client_builder().build().await;
4518        // This is the user ID that is inside MemberInvite
4519        let user_id = user_id!("@invited:localhost");
4520
4521        // When we receive a sync response saying "invited" is invited to a DM
4522        let f = EventFactory::new();
4523        let response = SyncResponseBuilder::default()
4524            .add_joined_room(
4525                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4526            )
4527            .add_global_account_data(
4528                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4529            )
4530            .build_sync_response();
4531        client.base_client().receive_sync_response(response).await.unwrap();
4532
4533        // Then get_dm_room finds this room
4534        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4535        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4536    }
4537
4538    #[async_test]
4539    async fn test_get_dm_room_still_finds_left_room() {
4540        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4541        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4542
4543        let server = MatrixMockServer::new().await;
4544        let client = server.client_builder().build().await;
4545        // This is the user ID that is inside MemberAdditional.
4546        // Note the confusing username, so we can share
4547        // GlobalAccountDataTestEvent::Direct with the invited test.
4548        let user_id = user_id!("@invited:localhost");
4549
4550        // When we receive a sync response saying "invited" is invited to a DM
4551        let f = EventFactory::new();
4552        let response = SyncResponseBuilder::default()
4553            .add_joined_room(
4554                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4555            )
4556            .add_global_account_data(
4557                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4558            )
4559            .build_sync_response();
4560        client.base_client().receive_sync_response(response).await.unwrap();
4561
4562        // Then get_dm_room finds this room
4563        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4564        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4565    }
4566
4567    #[async_test]
4568    async fn test_device_exists() {
4569        let server = MatrixMockServer::new().await;
4570        let client = server.client_builder().build().await;
4571
4572        server.mock_get_device().ok().expect(1).mount().await;
4573
4574        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4575    }
4576
4577    #[async_test]
4578    async fn test_device_exists_404() {
4579        let server = MatrixMockServer::new().await;
4580        let client = server.client_builder().build().await;
4581
4582        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4583    }
4584
4585    #[async_test]
4586    async fn test_device_exists_500() {
4587        let server = MatrixMockServer::new().await;
4588        let client = server.client_builder().build().await;
4589
4590        server.mock_get_device().error500().expect(1).mount().await;
4591
4592        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4593    }
4594
4595    #[async_test]
4596    async fn test_fetching_well_known_with_homeserver_url() {
4597        let server = MatrixMockServer::new().await;
4598        let client = server.client_builder().build().await;
4599        server.mock_well_known().ok().mount().await;
4600
4601        assert_matches!(client.fetch_client_well_known().await, Some(_));
4602    }
4603
4604    #[async_test]
4605    async fn test_fetching_well_known_with_server_name() {
4606        let server = MatrixMockServer::new().await;
4607        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4608
4609        server.mock_well_known().ok().mount().await;
4610
4611        let client = MockClientBuilder::new(None)
4612            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4613            .build()
4614            .await;
4615
4616        assert_matches!(client.fetch_client_well_known().await, Some(_));
4617    }
4618
4619    #[async_test]
4620    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4621        let server = MatrixMockServer::new().await;
4622        server.mock_well_known().ok().mount().await;
4623
4624        let user_id =
4625            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4626        let client = MockClientBuilder::new(None)
4627            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4628            .build()
4629            .await;
4630
4631        assert_matches!(client.fetch_client_well_known().await, Some(_));
4632    }
4633}