Skip to main content

matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{
32    DecryptionSettings, store::LockableCryptoStore, store::types::RoomPendingKeyBundleDetails,
33};
34use matrix_sdk_base::{
35    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
36    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
37    event_cache::store::EventCacheStoreLock,
38    media::store::MediaStoreLock,
39    store::{
40        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
41        WellKnownResponse,
42    },
43    sync::{Notification, RoomUpdates},
44    task_monitor::TaskMonitor,
45};
46use matrix_sdk_common::{cross_process_lock::CrossProcessLockConfig, ttl_cache::TtlCache};
47#[cfg(feature = "e2e-encryption")]
48use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
49use ruma::{
50    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
51    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
52    api::{
53        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
54        client::{
55            account::whoami,
56            alias::{create_alias, delete_alias, get_alias},
57            authenticated_media,
58            device::{self, delete_devices, get_devices, update_device},
59            directory::{get_public_rooms, get_public_rooms_filtered},
60            discovery::{
61                discover_homeserver::{self, RtcFocusInfo},
62                get_supported_versions,
63            },
64            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
65            knock::knock_room,
66            media,
67            membership::{join_room_by_id, join_room_by_id_or_alias},
68            room::create_room,
69            session::login::v3::DiscoveryInfo,
70            sync::sync_events,
71            threads::get_thread_subscriptions_changes,
72            uiaa,
73            user_directory::search_users,
74        },
75        error::{ErrorKind, FromHttpResponseError, UnknownTokenErrorData},
76        path_builder::PathBuilder,
77    },
78    assign,
79    events::direct::DirectUserIdentifier,
80    push::Ruleset,
81    time::Instant,
82};
83use serde::de::DeserializeOwned;
84use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
85use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
86use url::Url;
87
88use self::{
89    caches::{CachedValue, ClientCaches},
90    futures::SendRequest,
91};
92use crate::{
93    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
94    Room, SessionTokens, TransmissionProgress,
95    authentication::{
96        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
97        oauth::OAuth,
98    },
99    client::{
100        homeserver_capabilities::HomeserverCapabilities,
101        thread_subscriptions::ThreadSubscriptionCatchup,
102    },
103    config::{RequestConfig, SyncToken},
104    deduplicating_handler::DeduplicatingHandler,
105    error::HttpResult,
106    event_cache::EventCache,
107    event_handler::{
108        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
109        EventHandlerStore, ObservableEventHandler, SyncEvent,
110    },
111    http_client::{HttpClient, SupportedAuthScheme, SupportedPathBuilder},
112    latest_events::LatestEvents,
113    media::MediaError,
114    notification_settings::NotificationSettings,
115    room::RoomMember,
116    room_preview::RoomPreview,
117    send_queue::{SendQueue, SendQueueData},
118    sliding_sync::Version as SlidingSyncVersion,
119    sync::{RoomUpdate, SyncResponse},
120};
121#[cfg(feature = "e2e-encryption")]
122use crate::{
123    cross_process_lock::CrossProcessLock,
124    encryption::{
125        DuplicateOneTimeKeyErrorMessage, Encryption, EncryptionData, EncryptionSettings,
126        VerificationState,
127    },
128};
129
130mod builder;
131pub(crate) mod caches;
132pub(crate) mod futures;
133pub(crate) mod homeserver_capabilities;
134pub(crate) mod thread_subscriptions;
135
136pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
137#[cfg(feature = "experimental-search")]
138use crate::search_index::SearchIndex;
139
/// Boxed, pinned future produced by a notification handler; it resolves to
/// `()`.
///
/// On non-wasm targets the future must also be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future produced by a notification handler; it resolves to
/// `()`.
///
/// The wasm variant of the alias carries no `Send` bound.
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
144
/// Boxed notification handler: a callback receiving the [`Notification`], the
/// [`Room`] and a [`Client`], and returning a [`NotificationHandlerFut`].
///
/// On non-wasm targets the callback must also be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Boxed notification handler: a callback receiving the [`Notification`], the
/// [`Room`] and a [`Client`], and returning a [`NotificationHandlerFut`].
///
/// The wasm variant of the alias carries no `Send`/`Sync` bounds.
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
150
/// Enum controlling whether a loop running callbacks should continue or
/// abort.
///
/// This is mainly used by the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
164
/// Represents the changes that can occur to a `Client`'s session.
#[derive(Debug, Clone, PartialEq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    ///
    /// Carries the [`UnknownTokenErrorData`] attached to the unknown-token
    /// error.
    UnknownToken(UnknownTokenErrorData),
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
173
/// Information about the server vendor obtained from the federation API.
///
/// Returned by `Client::server_vendor_info`, which substitutes the literal
/// string `"unknown"` for any field missing from the server's response.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software.
    pub server_name: String,
    /// The version of the server software.
    pub version: String,
}
183
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// a clone shares the same [`ClientInner`] as the original.
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
191
/// Collection of locks and request-deduplicating handlers used by individual
/// client methods, either to ensure that only a single call to a method
/// happens at once or to deduplicate multiple calls to a method.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see [`Self::send_single_receipt`].
    // NOTE(review): `Self` is `ClientLocks` here, which has no
    // `send_single_receipt` method in view, so the link above probably does
    // not resolve. Presumably `Room::send_single_receipt` is meant; confirm.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Held in a `OnceCell`, so it starts out unset (the struct derives
    /// `Default`) and can be set at most once.
    // NOTE(review): the code that initializes this cell is not in this part
    // of the file.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
}
249
/// The state shared by all clones of a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`; everything that makes up the
/// client lives in this struct.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    ///
    /// Behind a lock because the URL can be replaced after construction (see
    /// `Client::set_homeserver`).
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    ///
    /// Behind a lock because it can be overridden after construction (see
    /// `Client::set_sliding_sync_version`).
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process lock configuration.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when
    /// [`CrossProcessLockConfig::MultiProcess`] is used.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_lock_config: CrossProcessLockConfig,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates.
    ///
    /// One broadcast sender per room, keyed by room ID.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    ///
    /// The inner `OnceCell` starts out empty.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    // NOTE(review): this file imports `crate::latest_events::LatestEvents`
    // (plural module name); confirm that the `crate::latest_event` link
    // target above still resolves.
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for [`RoomIndex`]'s of each room
    search_index: SearchIndex,

    /// A monitor for background tasks spawned by the client.
    pub(crate) task_monitor: TaskMonitor,

    /// A sender to notify subscribers about duplicate key upload errors
    /// triggered by requests to /keys/upload.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) duplicate_key_upload_error_sender:
        broadcast::Sender<Option<DuplicateOneTimeKeyErrorMessage>>,
}
378
379impl ClientInner {
380    /// Create a new `ClientInner`.
381    ///
382    /// All the fields passed as parameters here are those that must be cloned
383    /// upon instantiation of a sub-client, e.g. a client specialized for
384    /// notifications.
385    #[allow(clippy::too_many_arguments)]
386    async fn new(
387        auth_ctx: Arc<AuthCtx>,
388        server: Option<Url>,
389        homeserver: Url,
390        sliding_sync_version: SlidingSyncVersion,
391        http_client: HttpClient,
392        base_client: BaseClient,
393        supported_versions: CachedValue<SupportedVersions>,
394        well_known: CachedValue<Option<WellKnownResponse>>,
395        respect_login_well_known: bool,
396        event_cache: OnceCell<EventCache>,
397        send_queue: Arc<SendQueueData>,
398        latest_events: OnceCell<LatestEvents>,
399        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
400        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
401        cross_process_lock_config: CrossProcessLockConfig,
402        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
403        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
404    ) -> Arc<Self> {
405        let caches = ClientCaches {
406            supported_versions: supported_versions.into(),
407            well_known: well_known.into(),
408            server_metadata: Mutex::new(TtlCache::new()),
409        };
410
411        let client = Self {
412            server,
413            homeserver: StdRwLock::new(homeserver),
414            auth_ctx,
415            sliding_sync_version: StdRwLock::new(sliding_sync_version),
416            http_client,
417            base_client,
418            caches,
419            locks: Default::default(),
420            cross_process_lock_config,
421            typing_notice_times: Default::default(),
422            event_handlers: Default::default(),
423            notification_handlers: Default::default(),
424            room_update_channels: Default::default(),
425            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
426            // ballast for all observers to catch up.
427            room_updates_sender: broadcast::Sender::new(32),
428            respect_login_well_known,
429            sync_beat: event_listener::Event::new(),
430            event_cache,
431            send_queue_data: send_queue,
432            latest_events,
433            #[cfg(feature = "e2e-encryption")]
434            e2ee: EncryptionData::new(encryption_settings),
435            #[cfg(feature = "e2e-encryption")]
436            verification_state: SharedObservable::new(VerificationState::Unknown),
437            #[cfg(feature = "e2e-encryption")]
438            enable_share_history_on_invite,
439            server_max_upload_size: Mutex::new(OnceCell::new()),
440            #[cfg(feature = "experimental-search")]
441            search_index: search_index_handler,
442            thread_subscription_catchup,
443            task_monitor: TaskMonitor::new(),
444            #[cfg(feature = "e2e-encryption")]
445            duplicate_key_upload_error_sender: broadcast::channel(1).0,
446        };
447
448        #[allow(clippy::let_and_return)]
449        let client = Arc::new(client);
450
451        #[cfg(feature = "e2e-encryption")]
452        client.e2ee.initialize_tasks(&client);
453
454        let _ = client
455            .event_cache
456            .get_or_init(|| async {
457                EventCache::new(&client, client.base_client.event_cache_store().clone())
458            })
459            .await;
460
461        let _ = client
462            .thread_subscription_catchup
463            .get_or_init(|| async {
464                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
465            })
466            .await;
467
468        client
469    }
470}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475        write!(fmt, "Client")
476    }
477}
478
479impl Client {
480    /// Create a new [`Client`] that will use the given homeserver.
481    ///
482    /// # Arguments
483    ///
484    /// * `homeserver_url` - The homeserver that the client should connect to.
485    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486        Self::builder().homeserver_url(homeserver_url).build().await
487    }
488
489    /// Returns a subscriber that publishes an event every time the ignore user
490    /// list changes.
491    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492        self.inner.base_client.subscribe_to_ignore_user_list_changes()
493    }
494
    /// Create a new [`ClientBuilder`].
    ///
    /// This is a shorthand for [`ClientBuilder::new()`]; configure the
    /// returned builder and build it to obtain a [`Client`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
499
500    pub(crate) fn base_client(&self) -> &BaseClient {
501        &self.inner.base_client
502    }
503
504    /// The underlying HTTP client.
505    pub fn http_client(&self) -> &reqwest::Client {
506        &self.inner.http_client.inner
507    }
508
509    pub(crate) fn locks(&self) -> &ClientLocks {
510        &self.inner.locks
511    }
512
513    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514        &self.inner.auth_ctx
515    }
516
517    /// The cross-process store lock configuration used by this [`Client`].
518    ///
519    /// The SDK provides cross-process store locks (see
520    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]) when this
521    /// value is [`CrossProcessLockConfig::MultiProcess`]. Its holder name is
522    /// the value used for all cross-process store locks used by this
523    /// `Client`.
524    pub fn cross_process_lock_config(&self) -> &CrossProcessLockConfig {
525        &self.inner.cross_process_lock_config
526    }
527
528    /// Change the homeserver URL used by this client.
529    ///
530    /// # Arguments
531    ///
532    /// * `homeserver_url` - The new URL to use.
533    fn set_homeserver(&self, homeserver_url: Url) {
534        *self.inner.homeserver.write().unwrap() = homeserver_url;
535    }
536
537    /// Change to a different homeserver and re-resolve well-known.
538    #[cfg(feature = "e2e-encryption")]
539    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
540        &self,
541        homeserver_url: Url,
542    ) -> Result<()> {
543        self.set_homeserver(homeserver_url);
544        self.reset_well_known().await?;
545        if let Some(well_known) = self.load_or_fetch_well_known().await? {
546            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
547        }
548        Ok(())
549    }
550
551    /// Retrieves a helper component to access the [`HomeserverCapabilities`]
552    /// supported or disabled by the homeserver.
553    pub fn homeserver_capabilities(&self) -> HomeserverCapabilities {
554        HomeserverCapabilities::new(self.clone())
555    }
556
557    /// Get the server vendor information from the federation API.
558    ///
559    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
560    /// both the server's software name and version.
561    ///
562    /// # Examples
563    ///
564    /// ```no_run
565    /// # use matrix_sdk::Client;
566    /// # use url::Url;
567    /// # async {
568    /// # let homeserver = Url::parse("http://example.com")?;
569    /// let client = Client::new(homeserver).await?;
570    ///
571    /// let server_info = client.server_vendor_info(None).await?;
572    /// println!(
573    ///     "Server: {}, Version: {}",
574    ///     server_info.server_name, server_info.version
575    /// );
576    /// # anyhow::Ok(()) };
577    /// ```
578    #[cfg(feature = "federation-api")]
579    pub async fn server_vendor_info(
580        &self,
581        request_config: Option<RequestConfig>,
582    ) -> HttpResult<ServerVendorInfo> {
583        use ruma::api::federation::discovery::get_server_version;
584
585        let res = self
586            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
587            .await?;
588
589        // Extract server info, using defaults if fields are missing.
590        let server = res.server.unwrap_or_default();
591        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
592        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
593
594        Ok(ServerVendorInfo { server_name: server_name_str, version })
595    }
596
597    /// Get a copy of the default request config.
598    ///
599    /// The default request config is what's used when sending requests if no
600    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
601    /// function with such a parameter.
602    ///
603    /// If the default request config was not customized through
604    /// [`ClientBuilder`] when creating this `Client`, the returned value will
605    /// be equivalent to [`RequestConfig::default()`].
606    pub fn request_config(&self) -> RequestConfig {
607        self.inner.http_client.request_config
608    }
609
610    /// Check whether the client has been activated.
611    ///
612    /// A client is considered active when:
613    ///
614    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
615    ///    is logged in,
616    /// 2. Has loaded cached data from storage,
617    /// 3. If encryption is enabled, it also initialized or restored its
618    ///    `OlmMachine`.
619    pub fn is_active(&self) -> bool {
620        self.inner.base_client.is_active()
621    }
622
623    /// The server used by the client.
624    ///
625    /// See `Self::server` to learn more.
626    pub fn server(&self) -> Option<&Url> {
627        self.inner.server.as_ref()
628    }
629
630    /// The homeserver of the client.
631    pub fn homeserver(&self) -> Url {
632        self.inner.homeserver.read().unwrap().clone()
633    }
634
635    /// Get the sliding sync version.
636    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
637        self.inner.sliding_sync_version.read().unwrap().clone()
638    }
639
640    /// Override the sliding sync version.
641    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
642        let mut lock = self.inner.sliding_sync_version.write().unwrap();
643        *lock = version;
644    }
645
646    /// Get the Matrix user session meta information.
647    ///
648    /// If the client is currently logged in, this will return a
649    /// [`SessionMeta`] object which contains the user ID and device ID.
650    /// Otherwise it returns `None`.
651    pub fn session_meta(&self) -> Option<&SessionMeta> {
652        self.base_client().session_meta()
653    }
654
655    /// Returns a receiver that gets events for each room info update. To watch
656    /// for new events, use `receiver.resubscribe()`.
657    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
658        self.base_client().room_info_notable_update_receiver()
659    }
660
661    /// Performs a search for users.
662    /// The search is performed case-insensitively on user IDs and display names
663    ///
664    /// # Arguments
665    ///
666    /// * `search_term` - The search term for the search
667    /// * `limit` - The maximum number of results to return. Defaults to 10.
668    ///
669    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
670    pub async fn search_users(
671        &self,
672        search_term: &str,
673        limit: u64,
674    ) -> HttpResult<search_users::v3::Response> {
675        let mut request = search_users::v3::Request::new(search_term.to_owned());
676
677        if let Some(limit) = UInt::new(limit) {
678            request.limit = limit;
679        }
680
681        self.send(request).await
682    }
683
684    /// Get the user id of the current owner of the client.
685    pub fn user_id(&self) -> Option<&UserId> {
686        self.session_meta().map(|s| s.user_id.as_ref())
687    }
688
689    /// Get the device ID that identifies the current session.
690    pub fn device_id(&self) -> Option<&DeviceId> {
691        self.session_meta().map(|s| s.device_id.as_ref())
692    }
693
694    /// Get the current access token for this session.
695    ///
696    /// Will be `None` if the client has not been logged in.
697    pub fn access_token(&self) -> Option<String> {
698        self.auth_ctx().access_token()
699    }
700
701    /// Get the current tokens for this session.
702    ///
703    /// To be notified of changes in the session tokens, use
704    /// [`Client::subscribe_to_session_changes()`] or
705    /// [`Client::set_session_callbacks()`].
706    ///
707    /// Returns `None` if the client has not been logged in.
708    pub fn session_tokens(&self) -> Option<SessionTokens> {
709        self.auth_ctx().session_tokens()
710    }
711
712    /// Access the authentication API used to log in this client.
713    ///
714    /// Will be `None` if the client has not been logged in.
715    pub fn auth_api(&self) -> Option<AuthApi> {
716        match self.auth_ctx().auth_data.get()? {
717            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
718            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
719        }
720    }
721
722    /// Get the whole session info of this client.
723    ///
724    /// Will be `None` if the client has not been logged in.
725    ///
726    /// Can be used with [`Client::restore_session`] to restore a previously
727    /// logged-in session.
728    pub fn session(&self) -> Option<AuthSession> {
729        match self.auth_api()? {
730            AuthApi::Matrix(api) => api.session().map(Into::into),
731            AuthApi::OAuth(api) => api.full_session().map(Into::into),
732        }
733    }
734
735    /// Get a reference to the state store.
736    pub fn state_store(&self) -> &DynStateStore {
737        self.base_client().state_store()
738    }
739
740    /// Get a reference to the event cache store.
741    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
742        self.base_client().event_cache_store()
743    }
744
745    /// Get a reference to the media store.
746    pub fn media_store(&self) -> &MediaStoreLock {
747        self.base_client().media_store()
748    }
749
750    /// Access the native Matrix authentication API with this client.
751    pub fn matrix_auth(&self) -> MatrixAuth {
752        MatrixAuth::new(self.clone())
753    }
754
755    /// Get the account of the current owner of the client.
756    pub fn account(&self) -> Account {
757        Account::new(self.clone())
758    }
759
760    /// Get the encryption manager of the client.
761    #[cfg(feature = "e2e-encryption")]
762    pub fn encryption(&self) -> Encryption {
763        Encryption::new(self.clone())
764    }
765
766    /// Get the media manager of the client.
767    pub fn media(&self) -> Media {
768        Media::new(self.clone())
769    }
770
771    /// Get the pusher manager of the client.
772    pub fn pusher(&self) -> Pusher {
773        Pusher::new(self.clone())
774    }
775
776    /// Access the OAuth 2.0 API of the client.
777    pub fn oauth(&self) -> OAuth {
778        OAuth::new(self.clone())
779    }
780
781    /// Register a handler for a specific event type.
782    ///
783    /// The handler is a function or closure with one or more arguments. The
784    /// first argument is the event itself. All additional arguments are
785    /// "context" arguments: They have to implement [`EventHandlerContext`].
786    /// This trait is named that way because most of the types implementing it
787    /// give additional context about an event: The room it was in, its raw form
788    /// and other similar things. As two exceptions to this,
789    /// [`Client`] and [`EventHandlerHandle`] also implement the
790    /// `EventHandlerContext` trait so you don't have to clone your client
791    /// into the event handler manually and a handler can decide to remove
792    /// itself.
793    ///
794    /// Some context arguments are not universally applicable. A context
795    /// argument that isn't available for the given event type will result in
796    /// the event handler being skipped and an error being logged. The following
797    /// context argument types are only available for a subset of event types:
798    ///
799    /// * [`Room`] is only available for room-specific events, i.e. not for
800    ///   events like global account data events or presence events.
801    ///
802    /// You can provide custom context via
803    /// [`add_event_handler_context`](Client::add_event_handler_context) and
804    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
805    /// into the event handler.
806    ///
807    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
808    ///
809    /// # Examples
810    ///
811    /// ```no_run
812    /// use matrix_sdk::{
813    ///     deserialized_responses::EncryptionInfo,
814    ///     event_handler::Ctx,
815    ///     ruma::{
816    ///         events::{
817    ///             macros::EventContent,
818    ///             push_rules::PushRulesEvent,
819    ///             room::{
820    ///                 message::SyncRoomMessageEvent,
821    ///                 topic::SyncRoomTopicEvent,
822    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
823    ///             },
824    ///         },
825    ///         push::Action,
826    ///         Int, MilliSecondsSinceUnixEpoch,
827    ///     },
828    ///     Client, Room,
829    /// };
830    /// use serde::{Deserialize, Serialize};
831    ///
832    /// # async fn example(client: Client) {
833    /// client.add_event_handler(
834    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
835    ///         // Common usage: Room event plus room and client.
836    ///     },
837    /// );
838    /// client.add_event_handler(
839    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
840    ///         async move {
841    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
842    ///             // unencrypted events and events that were decrypted by the SDK.
843    ///         }
844    ///     },
845    /// );
846    /// client.add_event_handler(
847    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
848    ///         async move {
849    ///             // A `Vec<Action>` parameter allows you to know which push actions
850    ///             // are applicable for an event. For example, an event with
851    ///             // `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
852    ///             // should be highlighted in the timeline.
853    ///         }
854    ///     },
855    /// );
856    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
857    ///     // You can omit any or all arguments after the first.
858    /// });
859    ///
860    /// // Registering a temporary event handler:
861    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
862    ///     /* Event handler */
863    /// });
864    /// client.remove_event_handler(handle);
865    ///
866    /// // Registering custom event handler context:
867    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
868    /// struct MyContext {
869    ///     number: usize,
870    /// }
871    /// client.add_event_handler_context(MyContext { number: 5 });
872    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
873    ///     // Use the context
874    /// });
875    ///
876    /// // This will handle membership events in joined rooms. Invites are special, see below.
877    /// client.add_event_handler(
878    ///     |ev: SyncRoomMemberEvent| async move {},
879    /// );
880    ///
881    /// // To handle state events in invited rooms (including invite membership events),
882    /// // `StrippedRoomMemberEvent` should be used.
883    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
884    /// client.add_event_handler(
885    ///     |ev: StrippedRoomMemberEvent| async move {},
886    /// );
887    ///
888    /// // Custom events work exactly the same way, you just need to declare
889    /// // the content struct and use the EventContent derive macro on it.
890    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
891    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
892    /// struct TokenEventContent {
893    ///     token: String,
894    ///     #[serde(rename = "exp")]
895    ///     expires_at: MilliSecondsSinceUnixEpoch,
896    /// }
897    ///
898    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
899    ///     todo!("Display the token");
900    /// });
901    ///
902    /// // Event handler closures can also capture local variables.
903    /// // Make sure they are cheap to clone though, because they will be cloned
904    /// // every time the closure is called.
905    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
906    ///
907    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
908    ///     println!("Calling the handler with identifier {data}");
909    /// });
910    /// # }
911    /// ```
912    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
913    where
914        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
915        H: EventHandler<Ev, Ctx>,
916    {
917        self.add_event_handler_impl(handler, None)
918    }
919
920    /// Register a handler for a specific room, and event type.
921    ///
922    /// This method works the same way as
923    /// [`add_event_handler`][Self::add_event_handler], except that the handler
924    /// will only be called for events in the room with the specified ID. See
925    /// that method for more details on event handler functions.
926    ///
927    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
928    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
929    /// your use case.
930    pub fn add_room_event_handler<Ev, Ctx, H>(
931        &self,
932        room_id: &RoomId,
933        handler: H,
934    ) -> EventHandlerHandle
935    where
936        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
937        H: EventHandler<Ev, Ctx>,
938    {
939        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
940    }
941
942    /// Observe a specific event type.
943    ///
944    /// `Ev` represents the kind of event that will be observed. `Ctx`
945    /// represents the context that will come with the event. It relies on the
946    /// same mechanism as [`Client::add_event_handler`]. The main difference is
947    /// that it returns an [`ObservableEventHandler`] and doesn't require a
948    /// user-defined closure. It is possible to subscribe to the
949    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
950    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
951    /// Ctx)`.
952    ///
953    /// Be careful that only the most recent value can be observed. Subscribers
954    /// are notified when a new value is sent, but there is no guarantee
955    /// that they will see all values.
956    ///
957    /// # Example
958    ///
959    /// Let's see a classical usage:
960    ///
961    /// ```
962    /// use futures_util::StreamExt as _;
963    /// use matrix_sdk::{
964    ///     Client, Room,
965    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
966    /// };
967    ///
968    /// # async fn example(client: Client) -> Option<()> {
969    /// let observer =
970    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
971    ///
972    /// let mut subscriber = observer.subscribe();
973    ///
974    /// let (event, (room, push_actions)) = subscriber.next().await?;
975    /// # Some(())
976    /// # }
977    /// ```
978    ///
979    /// Now let's see how to get several contexts that can be useful for you:
980    ///
981    /// ```
982    /// use matrix_sdk::{
983    ///     Client, Room,
984    ///     deserialized_responses::EncryptionInfo,
985    ///     ruma::{
986    ///         events::room::{
987    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
988    ///         },
989    ///         push::Action,
990    ///     },
991    /// };
992    ///
993    /// # async fn example(client: Client) {
994    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
995    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
996    ///
997    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
998    /// // to distinguish between unencrypted events and events that were decrypted
999    /// // by the SDK.
1000    /// let _ = client
1001    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1002    ///     );
1003    ///
1004    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1005    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(HighlightTweakValue::Yes))`
1006    /// // should be highlighted in the timeline.
1007    /// let _ =
1008    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1009    ///
1010    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1011    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1012    /// # }
1013    /// ```
1014    ///
1015    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1016    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1017    where
1018        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1019        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1020    {
1021        self.observe_room_events_impl(None)
1022    }
1023
1024    /// Observe a specific room, and event type.
1025    ///
1026    /// This method works the same way as [`Client::observe_events`], except
1027    /// that the observability will only be applied for events in the room with
1028    /// the specified ID. See that method for more details.
1029    ///
1030    /// Be careful that only the most recent value can be observed. Subscribers
1031    /// are notified when a new value is sent, but there is no guarantee
1032    /// that they will see all values.
1033    pub fn observe_room_events<Ev, Ctx>(
1034        &self,
1035        room_id: &RoomId,
1036    ) -> ObservableEventHandler<(Ev, Ctx)>
1037    where
1038        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1039        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1040    {
1041        self.observe_room_events_impl(Some(room_id.to_owned()))
1042    }
1043
1044    /// Shared implementation for `Client::observe_events` and
1045    /// `Client::observe_room_events`.
1046    fn observe_room_events_impl<Ev, Ctx>(
1047        &self,
1048        room_id: Option<OwnedRoomId>,
1049    ) -> ObservableEventHandler<(Ev, Ctx)>
1050    where
1051        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1052        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1053    {
1054        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1055        // new value.
1056        let shared_observable = SharedObservable::new(None);
1057
1058        ObservableEventHandler::new(
1059            shared_observable.clone(),
1060            self.event_handler_drop_guard(self.add_event_handler_impl(
1061                move |event: Ev, context: Ctx| {
1062                    shared_observable.set(Some((event, context)));
1063
1064                    ready(())
1065                },
1066                room_id,
1067            )),
1068        )
1069    }
1070
1071    /// Remove the event handler associated with the handle.
1072    ///
1073    /// Note that you **must not** call `remove_event_handler` from the
1074    /// non-async part of an event handler, that is:
1075    ///
1076    /// ```ignore
1077    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1078    ///     // ⚠ this will cause a deadlock ⚠
1079    ///     client.remove_event_handler(handle);
1080    ///
1081    ///     async move {
1082    ///         // removing the event handler here is fine
1083    ///         client.remove_event_handler(handle);
1084    ///     }
1085    /// })
1086    /// ```
1087    ///
1088    /// Note also that handlers that remove themselves will still execute with
1089    /// events received in the same sync cycle.
1090    ///
1091    /// # Arguments
1092    ///
1093    /// `handle` - The [`EventHandlerHandle`] that is returned when
1094    /// registering the event handler with [`Client::add_event_handler`].
1095    ///
1096    /// # Examples
1097    ///
1098    /// ```no_run
1099    /// # use url::Url;
1100    /// # use tokio::sync::mpsc;
1101    /// #
1102    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1103    /// #
1104    /// use matrix_sdk::{
1105    ///     Client, event_handler::EventHandlerHandle,
1106    ///     ruma::events::room::member::SyncRoomMemberEvent,
1107    /// };
1108    /// #
1109    /// # futures_executor::block_on(async {
1110    /// # let client = matrix_sdk::Client::builder()
1111    /// #     .homeserver_url(homeserver)
1112    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1113    /// #     .build()
1114    /// #     .await
1115    /// #     .unwrap();
1116    ///
1117    /// client.add_event_handler(
1118    ///     |ev: SyncRoomMemberEvent,
1119    ///      client: Client,
1120    ///      handle: EventHandlerHandle| async move {
1121    ///         // Common usage: Check arriving Event is the expected one
1122    ///         println!("Expected RoomMemberEvent received!");
1123    ///         client.remove_event_handler(handle);
1124    ///     },
1125    /// );
1126    /// # });
1127    /// ```
1128    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1129        self.inner.event_handlers.remove(handle);
1130    }
1131
1132    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1133    /// the given handle.
1134    ///
1135    /// When the returned value is dropped, the event handler will be removed.
1136    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1137        EventHandlerDropGuard::new(handle, self.clone())
1138    }
1139
1140    /// Add an arbitrary value for use as event handler context.
1141    ///
1142    /// The value can be obtained in an event handler by adding an argument of
1143    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1144    ///
1145    /// If a value of the same type has been added before, it will be
1146    /// overwritten.
1147    ///
1148    /// # Examples
1149    ///
1150    /// ```no_run
1151    /// use matrix_sdk::{
1152    ///     Room, event_handler::Ctx,
1153    ///     ruma::events::room::message::SyncRoomMessageEvent,
1154    /// };
1155    /// # #[derive(Clone)]
1156    /// # struct SomeType;
1157    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1158    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1159    /// # futures_executor::block_on(async {
1160    /// # let client = matrix_sdk::Client::builder()
1161    /// #     .homeserver_url(homeserver)
1162    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1163    /// #     .build()
1164    /// #     .await
1165    /// #     .unwrap();
1166    ///
1167    /// // Handle used to send messages to the UI part of the app
1168    /// let my_gui_handle: SomeType = obtain_gui_handle();
1169    ///
1170    /// client.add_event_handler_context(my_gui_handle.clone());
1171    /// client.add_event_handler(
1172    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1173    ///         async move {
1174    ///             // gui_handle.send(DisplayMessage { message: ev });
1175    ///         }
1176    ///     },
1177    /// );
1178    /// # });
1179    /// ```
1180    pub fn add_event_handler_context<T>(&self, ctx: T)
1181    where
1182        T: Clone + Send + Sync + 'static,
1183    {
1184        self.inner.event_handlers.add_context(ctx);
1185    }
1186
1187    /// Register a handler for a notification.
1188    ///
1189    /// Similar to [`Client::add_event_handler`], but only allows functions
1190    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1191    /// [`Client`] for now.
1192    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1193    where
1194        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1195        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1196    {
1197        self.inner.notification_handlers.write().await.push(Box::new(
1198            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1199        ));
1200
1201        self
1202    }
1203
1204    /// Subscribe to all updates for the room with the given ID.
1205    ///
1206    /// The returned receiver will receive a new message for each sync response
1207    /// that contains updates for that room.
1208    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1209        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1210            btree_map::Entry::Vacant(entry) => {
1211                let (tx, rx) = broadcast::channel(8);
1212                entry.insert(tx);
1213                rx
1214            }
1215            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1216        }
1217    }
1218
1219    /// Subscribe to all updates to all rooms, whenever any has been received in
1220    /// a sync response.
1221    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1222        self.inner.room_updates_sender.subscribe()
1223    }
1224
1225    pub(crate) async fn notification_handlers(
1226        &self,
1227    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1228        self.inner.notification_handlers.read().await
1229    }
1230
1231    /// Get all the rooms the client knows about.
1232    ///
1233    /// This will return the list of joined, invited, and left rooms.
1234    pub fn rooms(&self) -> Vec<Room> {
1235        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1236    }
1237
1238    /// Get all the rooms the client knows about, filtered by room state.
1239    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1240        self.base_client()
1241            .rooms_filtered(filter)
1242            .into_iter()
1243            .map(|room| Room::new(self.clone(), room))
1244            .collect()
1245    }
1246
1247    /// Get a stream of all the rooms, in addition to the existing rooms.
1248    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1249        let (rooms, stream) = self.base_client().rooms_stream();
1250
1251        let map_room = |room| Room::new(self.clone(), room);
1252
1253        (
1254            rooms.into_iter().map(map_room).collect(),
1255            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1256        )
1257    }
1258
1259    /// Returns the joined rooms this client knows about.
1260    pub fn joined_rooms(&self) -> Vec<Room> {
1261        self.rooms_filtered(RoomStateFilter::JOINED)
1262    }
1263
1264    /// Returns the invited rooms this client knows about.
1265    pub fn invited_rooms(&self) -> Vec<Room> {
1266        self.rooms_filtered(RoomStateFilter::INVITED)
1267    }
1268
1269    /// Returns the left rooms this client knows about.
1270    pub fn left_rooms(&self) -> Vec<Room> {
1271        self.rooms_filtered(RoomStateFilter::LEFT)
1272    }
1273
1274    /// Returns the joined space rooms this client knows about.
1275    pub fn joined_space_rooms(&self) -> Vec<Room> {
1276        self.base_client()
1277            .rooms_filtered(RoomStateFilter::JOINED)
1278            .into_iter()
1279            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1280            .collect()
1281    }
1282
1283    /// Get a room with the given room id.
1284    ///
1285    /// # Arguments
1286    ///
1287    /// `room_id` - The unique id of the room that should be fetched.
1288    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1289        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1290    }
1291
1292    /// Gets the preview of a room, whether the current user has joined it or
1293    /// not.
1294    pub async fn get_room_preview(
1295        &self,
1296        room_or_alias_id: &RoomOrAliasId,
1297        via: Vec<OwnedServerName>,
1298    ) -> Result<RoomPreview> {
1299        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1300            Ok(room_id) => room_id.to_owned(),
1301            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1302        };
1303
1304        if let Some(room) = self.get_room(&room_id) {
1305            // The cached data can only be trusted if the room state is joined or
1306            // banned: for invite and knock rooms, no updates will be received
1307            // for the rooms after the invite/knock action took place so we may
1308            // have very out to date data for important fields such as
1309            // `join_rule`. For left rooms, the homeserver should return the latest info.
1310            match room.state() {
1311                RoomState::Joined | RoomState::Banned => {
1312                    return Ok(RoomPreview::from_known_room(&room).await);
1313                }
1314                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1315            }
1316        }
1317
1318        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1319    }
1320
1321    /// Resolve a room alias to a room id and a list of servers which know
1322    /// about it.
1323    ///
1324    /// # Arguments
1325    ///
1326    /// `room_alias` - The room alias to be resolved.
1327    pub async fn resolve_room_alias(
1328        &self,
1329        room_alias: &RoomAliasId,
1330    ) -> HttpResult<get_alias::v3::Response> {
1331        let request = get_alias::v3::Request::new(room_alias.to_owned());
1332        self.send(request).await
1333    }
1334
1335    /// Checks if a room alias is not in use yet.
1336    ///
1337    /// Returns:
1338    /// - `Ok(true)` if the room alias is available.
1339    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1340    ///   status code).
1341    /// - An `Err` otherwise.
1342    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1343        match self.resolve_room_alias(alias).await {
1344            // The room alias was resolved, so it's already in use.
1345            Ok(_) => Ok(false),
1346            Err(error) => {
1347                match error.client_api_error_kind() {
1348                    // The room alias wasn't found, so it's available.
1349                    Some(ErrorKind::NotFound) => Ok(true),
1350                    _ => Err(error),
1351                }
1352            }
1353        }
1354    }
1355
1356    /// Adds a new room alias associated with a room to the room directory.
1357    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1358        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1359        self.send(request).await?;
1360        Ok(())
1361    }
1362
1363    /// Removes a room alias from the room directory.
1364    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1365        let request = delete_alias::v3::Request::new(alias.to_owned());
1366        self.send(request).await?;
1367        Ok(())
1368    }
1369
1370    /// Update the homeserver from the login response well-known if needed.
1371    ///
1372    /// # Arguments
1373    ///
1374    /// * `login_well_known` - The `well_known` field from a successful login
1375    ///   response.
1376    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1377        if self.inner.respect_login_well_known
1378            && let Some(well_known) = login_well_known
1379            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1380        {
1381            self.set_homeserver(homeserver);
1382        }
1383    }
1384
1385    /// Similar to [`Client::restore_session_with`], with
1386    /// [`RoomLoadSettings::default()`].
1387    ///
1388    /// # Panics
1389    ///
1390    /// Panics if a session was already restored or logged in.
1391    #[instrument(skip_all)]
1392    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1393        self.restore_session_with(session, RoomLoadSettings::default()).await
1394    }
1395
1396    /// Restore a session previously logged-in using one of the available
1397    /// authentication APIs. The number of rooms to restore is controlled by
1398    /// [`RoomLoadSettings`].
1399    ///
1400    /// See the documentation of the corresponding authentication API's
1401    /// `restore_session` method for more information.
1402    ///
1403    /// # Panics
1404    ///
1405    /// Panics if a session was already restored or logged in.
1406    #[instrument(skip_all)]
1407    pub async fn restore_session_with(
1408        &self,
1409        session: impl Into<AuthSession>,
1410        room_load_settings: RoomLoadSettings,
1411    ) -> Result<()> {
1412        let session = session.into();
1413        match session {
1414            AuthSession::Matrix(session) => {
1415                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1416            }
1417            AuthSession::OAuth(session) => {
1418                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1419            }
1420        }
1421    }
1422
1423    /// Refresh the access token using the authentication API used to log into
1424    /// this session.
1425    ///
1426    /// See the documentation of the authentication API's `refresh_access_token`
1427    /// method for more information.
1428    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1429        let Some(auth_api) = self.auth_api() else {
1430            return Err(RefreshTokenError::RefreshTokenRequired);
1431        };
1432
1433        match auth_api {
1434            AuthApi::Matrix(api) => {
1435                trace!("Token refresh: Using the homeserver.");
1436                Box::pin(api.refresh_access_token()).await?;
1437            }
1438            AuthApi::OAuth(api) => {
1439                trace!("Token refresh: Using OAuth 2.0.");
1440                Box::pin(api.refresh_access_token()).await?;
1441            }
1442        }
1443
1444        Ok(())
1445    }
1446
1447    /// Log out the current session using the proper authentication API.
1448    ///
1449    /// # Errors
1450    ///
1451    /// Returns an error if the session is not authenticated or if an error
1452    /// occurred while making the request to the server.
1453    pub async fn logout(&self) -> Result<(), Error> {
1454        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1455        match auth_api {
1456            AuthApi::Matrix(matrix_auth) => {
1457                matrix_auth.logout().await?;
1458                Ok(())
1459            }
1460            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1461        }
1462    }
1463
1464    /// Get or upload a sync filter.
1465    ///
1466    /// This method will either get a filter ID from the store or upload the
1467    /// filter definition to the homeserver and return the new filter ID.
1468    ///
1469    /// # Arguments
1470    ///
/// * `filter_name` - The unique name of the filter, this name will be used
///   locally to store and identify the filter ID returned by the server.
///
/// * `definition` - The filter definition that should be uploaded to the
///   server if no filter ID can be found in the store.
1476    ///
1477    /// # Examples
1478    ///
1479    /// ```no_run
1480    /// # use matrix_sdk::{
1481    /// #    Client, config::SyncSettings,
1482    /// #    ruma::api::client::{
1483    /// #        filter::{
1484    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1485    /// #        },
1486    /// #        sync::sync_events::v3::Filter,
1487    /// #    }
1488    /// # };
1489    /// # use url::Url;
1490    /// # async {
1491    /// # let homeserver = Url::parse("http://example.com").unwrap();
1492    /// # let client = Client::new(homeserver).await.unwrap();
1493    /// let mut filter = FilterDefinition::default();
1494    ///
1495    /// // Let's enable member lazy loading.
1496    /// filter.room.state.lazy_load_options =
1497    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1498    ///
1499    /// let filter_id = client
1500    ///     .get_or_upload_filter("sync", filter)
1501    ///     .await
1502    ///     .unwrap();
1503    ///
1504    /// let sync_settings = SyncSettings::new()
1505    ///     .filter(Filter::FilterId(filter_id));
1506    ///
1507    /// let response = client.sync_once(sync_settings).await.unwrap();
/// # };
/// ```
1509    #[instrument(skip(self, definition))]
1510    pub async fn get_or_upload_filter(
1511        &self,
1512        filter_name: &str,
1513        definition: FilterDefinition,
1514    ) -> Result<String> {
1515        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1516            debug!("Found filter locally");
1517            Ok(filter)
1518        } else {
1519            debug!("Didn't find filter locally");
1520            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1521            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1522            let response = self.send(request).await?;
1523
1524            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1525
1526            Ok(response.filter_id)
1527        }
1528    }
1529
1530    /// Prepare to join a room by ID, by getting the current details about it
1531    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1532        let room = self.get_room(room_id)?;
1533
1534        let inviter = match room.invite_details().await {
1535            Ok(details) => details.inviter,
1536            Err(Error::WrongRoomState(_)) => None,
1537            Err(e) => {
1538                warn!("Error fetching invite details for room: {e:?}");
1539                None
1540            }
1541        };
1542
1543        Some(PreJoinRoomInfo { inviter })
1544    }
1545
1546    /// Finish joining a room.
1547    ///
1548    /// If the room was an invite that should be marked as a DM, will include it
1549    /// in the DM event after creating the joined room.
1550    ///
1551    /// If encrypted history sharing is enabled, will check to see if we have a
1552    /// key bundle, and import it if so.
1553    ///
1554    /// # Arguments
1555    ///
1556    /// * `room_id` - The `RoomId` of the room that was joined.
1557    /// * `pre_join_room_info` - Information about the room before we joined.
1558    async fn finish_join_room(
1559        &self,
1560        room_id: &RoomId,
1561        pre_join_room_info: Option<PreJoinRoomInfo>,
1562    ) -> Result<Room> {
1563        info!(?room_id, ?pre_join_room_info, "Completing room join");
1564        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1565            room.state() == RoomState::Invited
1566                && room.is_direct().await.unwrap_or_else(|e| {
1567                    warn!(%room_id, "is_direct() failed: {e}");
1568                    false
1569                })
1570        } else {
1571            false
1572        };
1573
1574        let base_room = self
1575            .base_client()
1576            .room_joined(
1577                room_id,
1578                pre_join_room_info
1579                    .as_ref()
1580                    .and_then(|info| info.inviter.as_ref())
1581                    .map(|i| i.user_id().to_owned()),
1582            )
1583            .await?;
1584        let room = Room::new(self.clone(), base_room);
1585
1586        if mark_as_dm {
1587            room.set_is_direct(true).await?;
1588        }
1589
1590        // If we joined following an invite, check if we had previously received a key
1591        // bundle from the inviter, and import it if so.
1592        //
1593        // It's important that we only do this once `BaseClient::room_joined` has
1594        // completed: see the notes on `BundleReceiverTask::handle_bundle` on avoiding a
1595        // race.
1596        #[cfg(feature = "e2e-encryption")]
1597        if self.inner.enable_share_history_on_invite
1598            && let Some(inviter) =
1599                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1600        {
1601            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1602                .await?;
1603        }
1604
1605        // Suppress "unused variable" and "unused field" lints
1606        #[cfg(not(feature = "e2e-encryption"))]
1607        let _ = pre_join_room_info.map(|i| i.inviter);
1608
1609        Ok(room)
1610    }
1611
1612    /// Join a room by `RoomId`.
1613    ///
1614    /// Returns the `Room` in the joined state.
1615    ///
1616    /// # Arguments
1617    ///
1618    /// * `room_id` - The `RoomId` of the room to be joined.
1619    #[instrument(skip(self))]
1620    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1621        // See who invited us to this room, if anyone. Note we have to do this before
1622        // making the `/join` request, otherwise we could race against the sync.
1623        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1624
1625        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1626        let response = self.send(request).await?;
1627        self.finish_join_room(&response.room_id, pre_join_info).await
1628    }
1629
1630    /// Join a room by `RoomOrAliasId`.
1631    ///
1632    /// Returns the `Room` in the joined state.
1633    ///
1634    /// # Arguments
1635    ///
1636    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1637    ///   alias looks like `#name:example.com`.
1638    /// * `server_names` - The server names to be used for resolving the alias,
1639    ///   if needs be.
1640    #[instrument(skip(self))]
1641    pub async fn join_room_by_id_or_alias(
1642        &self,
1643        alias: &RoomOrAliasId,
1644        server_names: &[OwnedServerName],
1645    ) -> Result<Room> {
1646        let pre_join_info = {
1647            match alias.try_into() {
1648                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1649                Err(_) => {
1650                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1651                    // responding to an invitation to the room, and therefore don't need to handle
1652                    // things that happen as a result of invites.
1653                    None
1654                }
1655            }
1656        };
1657        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1658            via: server_names.to_owned(),
1659        });
1660        let response = self.send(request).await?;
1661        self.finish_join_room(&response.room_id, pre_join_info).await
1662    }
1663
1664    /// Search the homeserver's directory of public rooms.
1665    ///
1666    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1667    /// a `get_public_rooms::Response`.
1668    ///
1669    /// # Arguments
1670    ///
1671    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1672    ///
1673    /// * `since` - Pagination token from a previous request.
1674    ///
1675    /// * `server` - The name of the server, if `None` the requested server is
1676    ///   used.
1677    ///
1678    /// # Examples
1679    /// ```no_run
1680    /// use matrix_sdk::Client;
1681    /// # use url::Url;
1682    /// # let homeserver = Url::parse("http://example.com").unwrap();
1683    /// # let limit = Some(10);
1684    /// # let since = Some("since token");
1685    /// # let server = Some("servername.com".try_into().unwrap());
1686    /// # async {
1687    /// let mut client = Client::new(homeserver).await.unwrap();
1688    ///
1689    /// client.public_rooms(limit, since, server).await;
1690    /// # };
1691    /// ```
1692    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1693    pub async fn public_rooms(
1694        &self,
1695        limit: Option<u32>,
1696        since: Option<&str>,
1697        server: Option<&ServerName>,
1698    ) -> HttpResult<get_public_rooms::v3::Response> {
1699        let limit = limit.map(UInt::from);
1700
1701        let request = assign!(get_public_rooms::v3::Request::new(), {
1702            limit,
1703            since: since.map(ToOwned::to_owned),
1704            server: server.map(ToOwned::to_owned),
1705        });
1706        self.send(request).await
1707    }
1708
1709    /// Create a room with the given parameters.
1710    ///
1711    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1712    /// created room.
1713    ///
1714    /// If you want to create a direct message with one specific user, you can
1715    /// use [`create_dm`][Self::create_dm], which is more convenient than
1716    /// assembling the [`create_room::v3::Request`] yourself.
1717    ///
1718    /// If the `is_direct` field of the request is set to `true` and at least
1719    /// one user is invited, the room will be automatically added to the direct
1720    /// rooms in the account data.
1721    ///
1722    /// # Examples
1723    ///
1724    /// ```no_run
1725    /// use matrix_sdk::{
1726    ///     Client,
1727    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1728    /// };
1729    /// # use url::Url;
1730    /// #
1731    /// # async {
1732    /// # let homeserver = Url::parse("http://example.com").unwrap();
1733    /// let request = CreateRoomRequest::new();
1734    /// let client = Client::new(homeserver).await.unwrap();
1735    /// assert!(client.create_room(request).await.is_ok());
1736    /// # };
1737    /// ```
1738    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1739        let invite = request.invite.clone();
1740        let is_direct_room = request.is_direct;
1741        let response = self.send(request).await?;
1742
1743        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1744
1745        let joined_room = Room::new(self.clone(), base_room);
1746
1747        if is_direct_room
1748            && !invite.is_empty()
1749            && let Err(error) =
1750                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1751        {
1752            // FIXME: Retry in the background
1753            error!("Failed to mark room as DM: {error}");
1754        }
1755
1756        Ok(joined_room)
1757    }
1758
1759    /// Create a DM room.
1760    ///
1761    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1762    /// given user being invited, the room marked `is_direct` and both the
1763    /// creator and invitee getting the default maximum power level.
1764    ///
1765    /// If the `e2e-encryption` feature is enabled, the room will also be
1766    /// encrypted.
1767    ///
1768    /// # Arguments
1769    ///
1770    /// * `user_id` - The ID of the user to create a DM for.
1771    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1772        #[cfg(feature = "e2e-encryption")]
1773        let initial_state = vec![
1774            InitialStateEvent::with_empty_state_key(
1775                RoomEncryptionEventContent::with_recommended_defaults(),
1776            )
1777            .to_raw_any(),
1778        ];
1779
1780        #[cfg(not(feature = "e2e-encryption"))]
1781        let initial_state = vec![];
1782
1783        let request = assign!(create_room::v3::Request::new(), {
1784            invite: vec![user_id.to_owned()],
1785            is_direct: true,
1786            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1787            initial_state,
1788        });
1789
1790        self.create_room(request).await
1791    }
1792
1793    /// Get the existing DM room with the given user, if any.
1794    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1795        let rooms = self.joined_rooms();
1796
1797        // Find the room we share with the `user_id` and only with `user_id`
1798        let room = rooms.into_iter().find(|r| {
1799            let targets = r.direct_targets();
1800            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1801        });
1802
1803        trace!(?user_id, ?room, "Found DM room with user");
1804        room
1805    }
1806
    /// Search the homeserver's directory for public rooms with a filter.
    ///
    /// The request is forwarded unchanged to [`Client::send()`] and the
    /// response is returned as-is.
    ///
    /// # Arguments
    ///
    /// * `room_search` - The easiest way to create this request is using the
    ///   `get_public_rooms_filtered::Request` itself.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use url::Url;
    /// # use matrix_sdk::Client;
    /// # async {
    /// # let homeserver = Url::parse("http://example.com")?;
    /// use matrix_sdk::ruma::{
    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
    /// };
    /// # let mut client = Client::new(homeserver).await?;
    ///
    /// let mut filter = Filter::new();
    /// filter.generic_search_term = Some("rust".to_owned());
    /// let mut request = get_public_rooms_filtered::v3::Request::new();
    /// request.filter = filter;
    ///
    /// let response = client.public_rooms_filtered(request).await?;
    ///
    /// for room in response.chunk {
    ///     println!("Found room {room:?}");
    /// }
    /// # anyhow::Ok(()) };
    /// ```
    pub async fn public_rooms_filtered(
        &self,
        request: get_public_rooms_filtered::v3::Request,
    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
        self.send(request).await
    }
1844
    /// Send an arbitrary request to the server, without updating client state.
    ///
    /// **Warning:** Because this method *does not* update the client state, it
    /// is important to make sure that you account for this yourself, and
    /// use wrapper methods where available.  This method should *only* be
    /// used if a wrapper method for the endpoint you'd like to use is not
    /// available.
    ///
    /// This method itself only builds a [`SendRequest`] holding a clone of the
    /// client and the request, with no request config override and a default
    /// send progress; see the example below for how it is awaited.
    ///
    /// # Arguments
    ///
    /// * `request` - A filled out and valid request for the endpoint to be hit
    ///
    /// * `timeout` - An optional request timeout setting, this overrides the
    ///   default request setting if one was set.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// # use matrix_sdk::{Client, config::SyncSettings};
    /// # use url::Url;
    /// # async {
    /// # let homeserver = Url::parse("http://localhost:8080")?;
    /// # let mut client = Client::new(homeserver).await?;
    /// use matrix_sdk::ruma::{api::client::profile, owned_user_id};
    ///
    /// // First construct the request you want to make
    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
    /// // for all available Endpoints
    /// let user_id = owned_user_id!("@example:localhost");
    /// let request = profile::get_profile::v3::Request::new(user_id);
    ///
    /// // Start the request using Client::send()
    /// let response = client.send(request).await?;
    ///
    /// // Check the corresponding Response struct to find out what types are
    /// // returned
    /// # anyhow::Ok(()) };
    /// ```
    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
    where
        Request: OutgoingRequest + Clone + Debug,
        Request::Authentication: SupportedAuthScheme,
        Request::PathBuilder: SupportedPathBuilder,
        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
    {
        SendRequest {
            client: self.clone(),
            request,
            // No per-request configuration override by default.
            config: None,
            send_progress: Default::default(),
        }
    }
1898
    /// Send `request` through the HTTP client and return its result unchanged.
    ///
    /// If the request fails with an `UnknownToken` error kind and an access
    /// token was used, that access token is marked as expired in the
    /// authentication context before the error is returned.
    ///
    /// # Arguments
    ///
    /// * `request` - The request to send.
    ///
    /// * `config` - An optional request configuration. When present, its
    ///   `skip_auth` setting is used instead of the one from the client's
    ///   default request configuration.
    ///
    /// * `send_progress` - Observable handed to the HTTP client along with the
    ///   request.
    pub(crate) async fn send_inner<Request>(
        &self,
        request: Request,
        config: Option<RequestConfig>,
        send_progress: SharedObservable<TransmissionProgress>,
    ) -> HttpResult<Request::IncomingResponse>
    where
        Request: OutgoingRequest + Debug,
        Request::Authentication: SupportedAuthScheme,
        Request::PathBuilder: SupportedPathBuilder,
        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
    {
        let homeserver = self.homeserver().to_string();
        // Read the access token once: the value sent with the request is the same
        // one that is marked as expired below.
        let access_token = self.access_token();
        // A per-request `skip_auth` wins over the client-wide default.
        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);

        let path_builder_input =
            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;

        let result = self
            .inner
            .http_client
            .send(
                request,
                config,
                homeserver,
                access_token.as_deref(),
                path_builder_input,
                send_progress,
            )
            .await;

        if let Err(Some(ErrorKind::UnknownToken { .. })) =
            result.as_ref().map_err(HttpError::client_api_error_kind)
            && let Some(access_token) = &access_token
        {
            // Mark the access token as expired.
            self.auth_ctx().set_access_token_expired(access_token);
        }

        result
    }
1942
1943    fn broadcast_unknown_token(&self, unknown_token_data: &UnknownTokenErrorData) {
1944        _ = self
1945            .inner
1946            .auth_ctx
1947            .session_change_sender
1948            .send(SessionChange::UnknownToken(unknown_token_data.clone()));
1949    }
1950
1951    /// Fetches server versions from network; no caching.
1952    pub async fn fetch_server_versions(
1953        &self,
1954        request_config: Option<RequestConfig>,
1955    ) -> HttpResult<get_supported_versions::Response> {
1956        // Since this was called by the user, try to refresh the access token if
1957        // necessary.
1958        self.fetch_server_versions_inner(false, request_config).await
1959    }
1960
1961    /// Fetches server versions from network; no caching.
1962    ///
1963    /// If the access token is expired and `failsafe` is `false`, this will
1964    /// attempt to refresh the access token, otherwise this will try to make an
1965    /// unauthenticated request instead.
1966    pub(crate) async fn fetch_server_versions_inner(
1967        &self,
1968        failsafe: bool,
1969        request_config: Option<RequestConfig>,
1970    ) -> HttpResult<get_supported_versions::Response> {
1971        if !failsafe {
1972            // `Client::send()` handles refreshing access tokens.
1973            return self
1974                .send(get_supported_versions::Request::new())
1975                .with_request_config(request_config)
1976                .await;
1977        }
1978
1979        let homeserver = self.homeserver().to_string();
1980
1981        // If we have a fresh access token, try with it first.
1982        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
1983            && self.auth_ctx().has_valid_access_token()
1984            && let Some(access_token) = self.access_token()
1985        {
1986            let result = self
1987                .inner
1988                .http_client
1989                .send(
1990                    get_supported_versions::Request::new(),
1991                    request_config,
1992                    homeserver.clone(),
1993                    Some(&access_token),
1994                    (),
1995                    Default::default(),
1996                )
1997                .await;
1998
1999            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2000                result.as_ref().map_err(HttpError::client_api_error_kind)
2001            {
2002                // If the access token is actually expired, mark it as expired and fallback to
2003                // the unauthenticated request below.
2004                self.auth_ctx().set_access_token_expired(&access_token);
2005            } else {
2006                // If the request succeeded or it's an other error, just stop now.
2007                return result;
2008            }
2009        }
2010
2011        // Try without authentication.
2012        self.inner
2013            .http_client
2014            .send(
2015                get_supported_versions::Request::new(),
2016                request_config,
2017                homeserver.clone(),
2018                None,
2019                (),
2020                Default::default(),
2021            )
2022            .await
2023    }
2024
2025    /// Fetches client well_known from network; no caching.
2026    ///
2027    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2028    ///    well-known contents.
2029    /// 2. If it's not, we try extracting the server name from the
2030    ///    [`Client::user_id`] and building the server URL from it.
2031    /// 3. If we couldn't get the well-known contents with either the explicit
2032    ///    server name or the implicit extracted one, we try the homeserver URL
2033    ///    as a last resort.
2034    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2035        let homeserver = self.homeserver();
2036        let scheme = homeserver.scheme();
2037
2038        // Use the server name, either an explicit one or an implicit one taken from
2039        // the user id: sometimes we'll have only the homeserver url available and no
2040        // server name, but the server name can be extracted from the current user id.
2041        let server_url = self
2042            .server()
2043            .map(|server| server.to_string())
2044            // If the server name wasn't available, extract it from the user id and build a URL:
2045            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2046            // will be the same for the public server url, lacking a better candidate.
2047            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2048
2049        // If the server name is available, first try using it
2050        let response = if let Some(server_url) = server_url {
2051            // First try using the server name
2052            self.fetch_client_well_known_with_url(server_url).await
2053        } else {
2054            None
2055        };
2056
2057        // If we didn't get a well-known value yet, try with the homeserver url instead:
2058        if response.is_none() {
2059            // Sometimes people configure their well-known directly on the homeserver so use
2060            // this as a fallback when the server name is unknown.
2061            warn!(
2062                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2063            );
2064            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2065        } else {
2066            response
2067        }
2068    }
2069
2070    async fn fetch_client_well_known_with_url(
2071        &self,
2072        url: String,
2073    ) -> Option<discover_homeserver::Response> {
2074        let well_known = self
2075            .inner
2076            .http_client
2077            .send(
2078                discover_homeserver::Request::new(),
2079                Some(RequestConfig::short_retry()),
2080                url,
2081                None,
2082                (),
2083                Default::default(),
2084            )
2085            .await;
2086
2087        match well_known {
2088            Ok(well_known) => Some(well_known),
2089            Err(http_error) => {
2090                // It is perfectly valid to not have a well-known file.
2091                // Maybe we should check for a specific error code to be sure?
2092                warn!("Failed to fetch client well-known: {http_error}");
2093                None
2094            }
2095        }
2096    }
2097
2098    /// Load supported versions from storage, or fetch them from network and
2099    /// cache them.
2100    ///
2101    /// If `failsafe` is true, this will try to minimize side effects to avoid
2102    /// possible deadlocks.
2103    async fn load_or_fetch_supported_versions(
2104        &self,
2105        failsafe: bool,
2106    ) -> HttpResult<SupportedVersionsResponse> {
2107        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2108            Ok(Some(stored)) => {
2109                if let Some(supported_versions) =
2110                    stored.into_supported_versions().and_then(|value| value.into_data())
2111                {
2112                    return Ok(supported_versions);
2113                }
2114            }
2115            Ok(None) => {
2116                // fallthrough: cache is empty
2117            }
2118            Err(err) => {
2119                warn!("error when loading cached supported versions: {err}");
2120                // fallthrough to network.
2121            }
2122        }
2123
2124        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2125        let supported_versions = SupportedVersionsResponse {
2126            versions: server_versions.versions,
2127            unstable_features: server_versions.unstable_features,
2128        };
2129
2130        // Only attempt to cache the result in storage if the request was authenticated.
2131        if self.auth_ctx().has_valid_access_token()
2132            && let Err(err) = self
2133                .state_store()
2134                .set_kv_data(
2135                    StateStoreDataKey::SupportedVersions,
2136                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2137                        supported_versions.clone(),
2138                    )),
2139                )
2140                .await
2141        {
2142            warn!("error when caching supported versions: {err}");
2143        }
2144
2145        Ok(supported_versions)
2146    }
2147
2148    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2149        let supported_versions = &self.inner.caches.supported_versions;
2150
2151        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2152            Some(val.clone())
2153        } else {
2154            None
2155        }
2156    }
2157
2158    /// Get the Matrix versions and features supported by the homeserver by
2159    /// fetching them from the server or the cache.
2160    ///
2161    /// This is equivalent to calling both [`Client::server_versions()`] and
2162    /// [`Client::unstable_features()`]. To always fetch the result from the
2163    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2164    /// and then `.as_supported_versions()` on the response.
2165    ///
2166    /// # Examples
2167    ///
2168    /// ```no_run
2169    /// use ruma::api::{FeatureFlag, MatrixVersion};
2170    /// # use matrix_sdk::{Client, config::SyncSettings};
2171    /// # use url::Url;
2172    /// # async {
2173    /// # let homeserver = Url::parse("http://localhost:8080")?;
2174    /// # let mut client = Client::new(homeserver).await?;
2175    ///
2176    /// let supported = client.supported_versions().await?;
2177    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2178    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2179    ///
2180    /// let msc_x_feature = FeatureFlag::from("msc_x");
2181    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2182    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2183    /// # anyhow::Ok(()) };
2184    /// ```
2185    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2186        self.supported_versions_inner(false).await
2187    }
2188
    /// Get the Matrix versions and features supported by the homeserver by
    /// fetching them from the server or the cache.
    ///
    /// The in-memory cache is consulted first. On a miss, the value is loaded
    /// from the state store or fetched from the network, and the in-memory
    /// cache is filled only if we hold a valid access token.
    ///
    /// If the resulting set of versions is empty, it is replaced by a set
    /// containing only `MatrixVersion::V1_0`.
    ///
    /// If `failsafe` is true, this will try to minimize side effects to avoid
    /// possible deadlocks.
    pub(crate) async fn supported_versions_inner(
        &self,
        failsafe: bool,
    ) -> HttpResult<SupportedVersions> {
        let cached_supported_versions = &self.inner.caches.supported_versions;
        // First attempt: a read lock is enough when the value is already cached.
        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
            return Ok(val.clone());
        }

        // Take the write lock, then check again: the cache may have been filled
        // between the read above and the moment the write lock was acquired.
        let mut guarded_supported_versions = cached_supported_versions.write().await;
        if let CachedValue::Cached(val) = &*guarded_supported_versions {
            return Ok(val.clone());
        }

        // NOTE: the write guard is still held while loading or fetching below.
        let supported = self.load_or_fetch_supported_versions(failsafe).await?;

        // Fill both unstable features and server versions at once.
        let mut supported_versions = supported.supported_versions();
        if supported_versions.versions.is_empty() {
            supported_versions.versions = [MatrixVersion::V1_0].into();
        }

        // Only cache the result if the request was authenticated.
        if self.auth_ctx().has_valid_access_token() {
            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
        }

        Ok(supported_versions)
    }
2223
2224    /// Get the Matrix versions and features supported by the homeserver by
2225    /// fetching them from the cache.
2226    ///
2227    /// For a version of this function that fetches the supported versions and
2228    /// features from the homeserver if the [`SupportedVersions`] aren't
2229    /// found in the cache, take a look at the [`Client::server_versions()`]
2230    /// method.
2231    ///
2232    /// # Examples
2233    ///
2234    /// ```no_run
2235    /// use ruma::api::{FeatureFlag, MatrixVersion};
2236    /// # use matrix_sdk::{Client, config::SyncSettings};
2237    /// # use url::Url;
2238    /// # async {
2239    /// # let homeserver = Url::parse("http://localhost:8080")?;
2240    /// # let mut client = Client::new(homeserver).await?;
2241    ///
2242    /// let supported =
2243    ///     if let Some(supported) = client.supported_versions_cached().await? {
2244    ///         supported
2245    ///     } else {
2246    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2247    ///     };
2248    ///
2249    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2250    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2251    ///
2252    /// let msc_x_feature = FeatureFlag::from("msc_x");
2253    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2254    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2255    /// # anyhow::Ok(()) };
2256    /// ```
2257    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2258        if let Some(cached) = self.get_cached_supported_versions().await {
2259            Ok(Some(cached))
2260        } else if let Some(stored) =
2261            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2262        {
2263            if let Some(supported) =
2264                stored.into_supported_versions().and_then(|value| value.into_data())
2265            {
2266                Ok(Some(supported.supported_versions()))
2267            } else {
2268                Ok(None)
2269            }
2270        } else {
2271            Ok(None)
2272        }
2273    }
2274
2275    /// Get the Matrix versions supported by the homeserver by fetching them
2276    /// from the server or the cache.
2277    ///
2278    /// # Examples
2279    ///
2280    /// ```no_run
2281    /// use ruma::api::MatrixVersion;
2282    /// # use matrix_sdk::{Client, config::SyncSettings};
2283    /// # use url::Url;
2284    /// # async {
2285    /// # let homeserver = Url::parse("http://localhost:8080")?;
2286    /// # let mut client = Client::new(homeserver).await?;
2287    ///
2288    /// let server_versions = client.server_versions().await?;
2289    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2290    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2291    /// # anyhow::Ok(()) };
2292    /// ```
2293    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2294        Ok(self.supported_versions().await?.versions)
2295    }
2296
2297    /// Get the unstable features supported by the homeserver by fetching them
2298    /// from the server or the cache.
2299    ///
2300    /// # Examples
2301    ///
2302    /// ```no_run
2303    /// use matrix_sdk::ruma::api::FeatureFlag;
2304    /// # use matrix_sdk::{Client, config::SyncSettings};
2305    /// # use url::Url;
2306    /// # async {
2307    /// # let homeserver = Url::parse("http://localhost:8080")?;
2308    /// # let mut client = Client::new(homeserver).await?;
2309    ///
2310    /// let msc_x_feature = FeatureFlag::from("msc_x");
2311    /// let unstable_features = client.unstable_features().await?;
2312    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2313    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2314    /// # anyhow::Ok(()) };
2315    /// ```
2316    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2317        Ok(self.supported_versions().await?.features)
2318    }
2319
2320    /// Empty the supported versions and unstable features cache.
2321    ///
2322    /// Since the SDK caches the supported versions, it's possible to have a
2323    /// stale entry in the cache. This functions makes it possible to force
2324    /// reset it.
2325    pub async fn reset_supported_versions(&self) -> Result<()> {
2326        // Empty the in-memory caches.
2327        self.inner.caches.supported_versions.write().await.take();
2328
2329        // Empty the store cache.
2330        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2331    }
2332
2333    /// Load well-known from storage, or fetch it from network and cache it.
2334    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2335        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2336            Ok(Some(stored)) => {
2337                if let Some(well_known) =
2338                    stored.into_well_known().and_then(|value| value.into_data())
2339                {
2340                    return Ok(well_known);
2341                }
2342            }
2343            Ok(None) => {
2344                // fallthrough: cache is empty
2345            }
2346            Err(err) => {
2347                warn!("error when loading cached well-known: {err}");
2348                // fallthrough to network.
2349            }
2350        }
2351
2352        let well_known = self.fetch_client_well_known().await.map(Into::into);
2353
2354        // Attempt to cache the result in storage.
2355        if let Err(err) = self
2356            .state_store()
2357            .set_kv_data(
2358                StateStoreDataKey::WellKnown,
2359                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2360            )
2361            .await
2362        {
2363            warn!("error when caching well-known: {err}");
2364        }
2365
2366        Ok(well_known)
2367    }
2368
2369    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2370        let well_known = &self.inner.caches.well_known;
2371        if let CachedValue::Cached(val) = &*well_known.read().await {
2372            return Ok(val.clone());
2373        }
2374
2375        let mut guarded_well_known = well_known.write().await;
2376        if let CachedValue::Cached(val) = &*guarded_well_known {
2377            return Ok(val.clone());
2378        }
2379
2380        let well_known = self.load_or_fetch_well_known().await?;
2381
2382        *guarded_well_known = CachedValue::Cached(well_known.clone());
2383
2384        Ok(well_known)
2385    }
2386
2387    /// Get information about the homeserver's advertised RTC foci by fetching
2388    /// the well-known file from the server or the cache.
2389    ///
2390    /// # Examples
2391    /// ```no_run
2392    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2393    /// # use url::Url;
2394    /// # async {
2395    /// # let homeserver = Url::parse("http://localhost:8080")?;
2396    /// # let mut client = Client::new(homeserver).await?;
2397    /// let rtc_foci = client.rtc_foci().await?;
2398    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2399    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2400    ///     _ => None,
2401    /// });
2402    /// if let Some(info) = default_livekit_focus_info {
2403    ///     println!("Default LiveKit service URL: {}", info.service_url);
2404    /// }
2405    /// # anyhow::Ok(()) };
2406    /// ```
2407    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2408        let well_known = self.get_or_load_and_cache_well_known().await?;
2409
2410        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2411    }
2412
2413    /// Empty the well-known cache.
2414    ///
    /// Since the SDK caches the well-known, it's possible to have a stale entry
    /// in the cache. This function makes it possible to forcibly reset it.
2417    pub async fn reset_well_known(&self) -> Result<()> {
2418        // Empty the in-memory caches.
2419        self.inner.caches.well_known.write().await.take();
2420
2421        // Empty the store cache.
2422        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2423    }
2424
2425    /// Check whether MSC 4028 is enabled on the homeserver.
2426    ///
2427    /// # Examples
2428    ///
2429    /// ```no_run
2430    /// # use matrix_sdk::{Client, config::SyncSettings};
2431    /// # use url::Url;
2432    /// # async {
2433    /// # let homeserver = Url::parse("http://localhost:8080")?;
2434    /// # let mut client = Client::new(homeserver).await?;
2435    /// let msc4028_enabled =
2436    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2437    /// # anyhow::Ok(()) };
2438    /// ```
2439    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2440        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2441    }
2442
2443    /// Get information of all our own devices.
2444    ///
2445    /// # Examples
2446    ///
2447    /// ```no_run
2448    /// # use matrix_sdk::{Client, config::SyncSettings};
2449    /// # use url::Url;
2450    /// # async {
2451    /// # let homeserver = Url::parse("http://localhost:8080")?;
2452    /// # let mut client = Client::new(homeserver).await?;
2453    /// let response = client.devices().await?;
2454    ///
2455    /// for device in response.devices {
2456    ///     println!(
2457    ///         "Device: {} {}",
2458    ///         device.device_id,
2459    ///         device.display_name.as_deref().unwrap_or("")
2460    ///     );
2461    /// }
2462    /// # anyhow::Ok(()) };
2463    /// ```
2464    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2465        let request = get_devices::v3::Request::new();
2466
2467        self.send(request).await
2468    }
2469
2470    /// Delete the given devices from the server.
2471    ///
2472    /// # Arguments
2473    ///
2474    /// * `devices` - The list of devices that should be deleted from the
2475    ///   server.
2476    ///
2477    /// * `auth_data` - This request requires user interactive auth, the first
2478    ///   request needs to set this to `None` and will always fail with an
2479    ///   `UiaaResponse`. The response will contain information for the
2480    ///   interactive auth and the same request needs to be made but this time
2481    ///   with some `auth_data` provided.
    ///
    /// # Examples
    ///
    /// ```no_run
2484    /// # use matrix_sdk::{
2485    /// #    ruma::{api::client::uiaa, owned_device_id},
2486    /// #    Client, Error, config::SyncSettings,
2487    /// # };
2488    /// # use serde_json::json;
2489    /// # use url::Url;
2490    /// # use std::collections::BTreeMap;
2491    /// # async {
2492    /// # let homeserver = Url::parse("http://localhost:8080")?;
2493    /// # let mut client = Client::new(homeserver).await?;
2494    /// let devices = &[owned_device_id!("DEVICEID")];
2495    ///
2496    /// if let Err(e) = client.delete_devices(devices, None).await {
2497    ///     if let Some(info) = e.as_uiaa_response() {
2498    ///         let mut password = uiaa::Password::new(
2499    ///             uiaa::UserIdentifier::Matrix(uiaa::MatrixUserIdentifier::new("example".to_owned())),
2500    ///             "wordpass".to_owned(),
2501    ///         );
2502    ///         password.session = info.session.clone();
2503    ///
2504    ///         client
2505    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2506    ///             .await?;
2507    ///     }
2508    /// }
    /// # anyhow::Ok(()) };
    /// ```
2510    pub async fn delete_devices(
2511        &self,
2512        devices: &[OwnedDeviceId],
2513        auth_data: Option<uiaa::AuthData>,
2514    ) -> HttpResult<delete_devices::v3::Response> {
2515        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2516        request.auth = auth_data;
2517
2518        self.send(request).await
2519    }
2520
2521    /// Change the display name of a device owned by the current user.
2522    ///
2523    /// Returns a `update_device::Response` which specifies the result
2524    /// of the operation.
2525    ///
2526    /// # Arguments
2527    ///
2528    /// * `device_id` - The ID of the device to change the display name of.
2529    /// * `display_name` - The new display name to set.
2530    pub async fn rename_device(
2531        &self,
2532        device_id: &DeviceId,
2533        display_name: &str,
2534    ) -> HttpResult<update_device::v3::Response> {
2535        let mut request = update_device::v3::Request::new(device_id.to_owned());
2536        request.display_name = Some(display_name.to_owned());
2537
2538        self.send(request).await
2539    }
2540
2541    /// Check whether a device with a specific ID exists on the server.
2542    ///
2543    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2544    /// with 404 and the underlying error otherwise.
2545    ///
2546    /// # Arguments
2547    ///
2548    /// * `device_id` - The ID of the device to query.
2549    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2550        let request = device::get_device::v3::Request::new(device_id);
2551        match self.send(request).await {
2552            Ok(_) => Ok(true),
2553            Err(err) => {
2554                if let Some(error) = err.as_client_api_error()
2555                    && error.status_code == 404
2556                {
2557                    Ok(false)
2558                } else {
2559                    Err(err.into())
2560                }
2561            }
2562        }
2563    }
2564
2565    /// Synchronize the client's state with the latest state on the server.
2566    ///
2567    /// ## Syncing Events
2568    ///
2569    /// Messages or any other type of event need to be periodically fetched from
2570    /// the server, this is achieved by sending a `/sync` request to the server.
2571    ///
2572    /// The first sync is sent out without a [`token`]. The response of the
2573    /// first sync will contain a [`next_batch`] field which should then be
2574    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2575    /// don't receive the same events multiple times.
2576    ///
2577    /// ## Long Polling
2578    ///
    /// A sync should in the usual case always be in flight. The
    /// [`SyncSettings`] have a [`timeout`] option, which controls how
    /// long the server will wait for new events before it will respond.
2582    /// The server will respond immediately if some new events arrive before the
2583    /// timeout has expired. If no changes arrive and the timeout expires an
2584    /// empty sync response will be sent to the client.
2585    ///
2586    /// This method of sending a request that may not receive a response
2587    /// immediately is called long polling.
2588    ///
2589    /// ## Filtering Events
2590    ///
2591    /// The number or type of messages and events that the client should receive
2592    /// from the server can be altered using a [`Filter`].
2593    ///
2594    /// Filters can be non-trivial and, since they will be sent with every sync
2595    /// request, they may take up a bunch of unnecessary bandwidth.
2596    ///
2597    /// Luckily filters can be uploaded to the server and reused using an unique
2598    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2599    /// method.
2600    ///
2601    /// # Arguments
2602    ///
    /// * `sync_settings` - Settings for the sync call, this allows us to set
    ///   various options to configure the sync:
2605    ///     * [`filter`] - To configure which events we receive and which get
2606    ///       [filtered] by the server
2607    ///     * [`timeout`] - To configure our [long polling] setup.
2608    ///     * [`token`] - To tell the server which events we already received
2609    ///       and where we wish to continue syncing.
2610    ///     * [`full_state`] - To tell the server that we wish to receive all
2611    ///       state events, regardless of our configured [`token`].
2612    ///     * [`set_presence`] - To tell the server to set the presence and to
2613    ///       which state.
2614    ///
2615    /// # Examples
2616    ///
2617    /// ```no_run
2618    /// # use url::Url;
2619    /// # async {
2620    /// # let homeserver = Url::parse("http://localhost:8080")?;
2621    /// # let username = "";
2622    /// # let password = "";
2623    /// use matrix_sdk::{
2624    ///     Client, config::SyncSettings,
2625    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2626    /// };
2627    ///
2628    /// let client = Client::new(homeserver).await?;
2629    /// client.matrix_auth().login_username(username, password).send().await?;
2630    ///
2631    /// // Sync once so we receive the client state and old messages.
2632    /// client.sync_once(SyncSettings::default()).await?;
2633    ///
2634    /// // Register our handler so we start responding once we receive a new
2635    /// // event.
2636    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2637    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2638    /// });
2639    ///
2640    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2641    /// // from our `sync_once()` call automatically.
2642    /// client.sync(SyncSettings::default()).await;
2643    /// # anyhow::Ok(()) };
2644    /// ```
2645    ///
2646    /// [`sync`]: #method.sync
2647    /// [`SyncSettings`]: crate::config::SyncSettings
2648    /// [`token`]: crate::config::SyncSettings#method.token
2649    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2650    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2651    /// [`set_presence`]: ruma::presence::PresenceState
2652    /// [`filter`]: crate::config::SyncSettings#method.filter
2653    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2654    /// [`next_batch`]: SyncResponse#structfield.next_batch
2655    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2656    /// [long polling]: #long-polling
2657    /// [filtered]: #filtering-events
2658    #[instrument(skip(self))]
2659    pub async fn sync_once(
2660        &self,
2661        sync_settings: crate::config::SyncSettings,
2662    ) -> Result<SyncResponse> {
2663        // The sync might not return for quite a while due to the timeout.
2664        // We'll see if there's anything crypto related to send out before we
2665        // sync, i.e. if we closed our client after a sync but before the
2666        // crypto requests were sent out.
2667        //
2668        // This will mostly be a no-op.
2669        #[cfg(feature = "e2e-encryption")]
2670        if let Err(e) = self.send_outgoing_requests().await {
2671            error!(error = ?e, "Error while sending outgoing E2EE requests");
2672        }
2673
2674        let token = match sync_settings.token {
2675            SyncToken::Specific(token) => Some(token),
2676            SyncToken::NoToken => None,
2677            SyncToken::ReusePrevious => self.sync_token().await,
2678        };
2679
2680        let request = assign!(sync_events::v3::Request::new(), {
2681            filter: sync_settings.filter.map(|f| *f),
2682            since: token,
2683            full_state: sync_settings.full_state,
2684            set_presence: sync_settings.set_presence,
2685            timeout: sync_settings.timeout,
2686            use_state_after: true,
2687        });
2688        let mut request_config = self.request_config();
2689        if let Some(timeout) = sync_settings.timeout {
2690            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2691            request_config.timeout = Some(base_timeout + timeout);
2692        }
2693
2694        let response = self.send(request).with_request_config(request_config).await?;
2695        let next_batch = response.next_batch.clone();
2696        let response = self.process_sync(response).await?;
2697
2698        #[cfg(feature = "e2e-encryption")]
2699        if let Err(e) = self.send_outgoing_requests().await {
2700            error!(error = ?e, "Error while sending outgoing E2EE requests");
2701        }
2702
2703        self.inner.sync_beat.notify(usize::MAX);
2704
2705        Ok(SyncResponse::new(next_batch, response))
2706    }
2707
2708    /// Repeatedly synchronize the client state with the server.
2709    ///
2710    /// This method will only return on error, if cancellation is needed
2711    /// the method should be wrapped in a cancelable task or the
2712    /// [`Client::sync_with_callback`] method can be used or
2713    /// [`Client::sync_with_result_callback`] if you want to handle error
2714    /// cases in the loop, too.
2715    ///
2716    /// This method will internally call [`Client::sync_once`] in a loop.
2717    ///
2718    /// This method can be used with the [`Client::add_event_handler`]
2719    /// method to react to individual events. If you instead wish to handle
2720    /// events in a bulk manner the [`Client::sync_with_callback`],
2721    /// [`Client::sync_with_result_callback`] and
2722    /// [`Client::sync_stream`] methods can be used instead. Those methods
2723    /// repeatedly return the whole sync response.
2724    ///
2725    /// # Arguments
2726    ///
2727    /// * `sync_settings` - Settings for the sync call. *Note* that those
2728    ///   settings will be only used for the first sync call. See the argument
2729    ///   docs for [`Client::sync_once`] for more info.
2730    ///
2731    /// # Return
2732    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2733    /// up to the user of the API to check the error and decide whether the sync
2734    /// should continue or not.
2735    ///
2736    /// # Examples
2737    ///
2738    /// ```no_run
2739    /// # use url::Url;
2740    /// # async {
2741    /// # let homeserver = Url::parse("http://localhost:8080")?;
2742    /// # let username = "";
2743    /// # let password = "";
2744    /// use matrix_sdk::{
2745    ///     Client, config::SyncSettings,
2746    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2747    /// };
2748    ///
2749    /// let client = Client::new(homeserver).await?;
2750    /// client.matrix_auth().login_username(&username, &password).send().await?;
2751    ///
2752    /// // Register our handler so we start responding once we receive a new
2753    /// // event.
2754    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2755    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2756    /// });
2757    ///
2758    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2759    /// // automatically.
2760    /// client.sync(SyncSettings::default()).await?;
2761    /// # anyhow::Ok(()) };
2762    /// ```
2763    ///
2764    /// [argument docs]: #method.sync_once
2765    /// [`sync_with_callback`]: #method.sync_with_callback
2766    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2767        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2768    }
2769
2770    /// Repeatedly call sync to synchronize the client state with the server.
2771    ///
2772    /// # Arguments
2773    ///
2774    /// * `sync_settings` - Settings for the sync call. *Note* that those
2775    ///   settings will be only used for the first sync call. See the argument
2776    ///   docs for [`Client::sync_once`] for more info.
2777    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   [`LoopCtrl`] which signals whether the method should stop syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2783    ///
2784    /// # Return
2785    /// The sync runs until an error occurs or the
2786    /// callback indicates that the Loop should stop. If the callback asked for
2787    /// a regular stop, the result will be `Ok(())` otherwise the
2788    /// `Err(Error)` is returned.
2789    ///
2790    /// # Examples
2791    ///
2792    /// The following example demonstrates how to sync forever while sending all
2793    /// the interesting events through a mpsc channel to another thread e.g. a
2794    /// UI thread.
2795    ///
2796    /// ```no_run
2797    /// # use std::time::Duration;
2798    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2799    /// # use url::Url;
2800    /// # async {
2801    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2802    /// # let mut client = Client::new(homeserver).await.unwrap();
2803    ///
2804    /// use tokio::sync::mpsc::channel;
2805    ///
2806    /// let (tx, rx) = channel(100);
2807    ///
2808    /// let sync_channel = &tx;
2809    /// let sync_settings = SyncSettings::new()
2810    ///     .timeout(Duration::from_secs(30));
2811    ///
2812    /// client
2813    ///     .sync_with_callback(sync_settings, |response| async move {
2814    ///         let channel = sync_channel;
2815    ///         for (room_id, room) in response.rooms.joined {
2816    ///             for event in room.timeline.events {
2817    ///                 channel.send(event).await.unwrap();
2818    ///             }
2819    ///         }
2820    ///
2821    ///         LoopCtrl::Continue
2822    ///     })
2823    ///     .await;
2824    /// };
2825    /// ```
2826    #[instrument(skip_all)]
2827    pub async fn sync_with_callback<C>(
2828        &self,
2829        sync_settings: crate::config::SyncSettings,
2830        callback: impl Fn(SyncResponse) -> C,
2831    ) -> Result<(), Error>
2832    where
2833        C: Future<Output = LoopCtrl>,
2834    {
2835        self.sync_with_result_callback(sync_settings, |result| async {
2836            Ok(callback(result?).await)
2837        })
2838        .await
2839    }
2840
2841    /// Repeatedly call sync to synchronize the client state with the server.
2842    ///
2843    /// # Arguments
2844    ///
2845    /// * `sync_settings` - Settings for the sync call. *Note* that those
2846    ///   settings will be only used for the first sync call. See the argument
2847    ///   docs for [`Client::sync_once`] for more info.
2848    ///
2849    /// * `callback` - A callback that will be called every time after a
2850    ///   response has been received, failure or not. The callback returns a
2851    ///   `Result<LoopCtrl, Error>`, too. When returning
2852    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2853    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2854    ///   function returns `Ok(())`. In case the callback can't handle the
2855    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2856    ///   which results in the sync ending and the `Err(Error)` being returned.
2857    ///
2858    /// # Return
2859    /// The sync runs until an error occurs that the callback can't handle or
2860    /// the callback indicates that the Loop should stop. If the callback
2861    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2862    /// `Err(Error)` is returned.
2863    ///
2864    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2865    /// this, and are handled first without sending the result to the
2866    /// callback. Only after they have exceeded is the `Result` handed to
2867    /// the callback.
2868    ///
2869    /// # Examples
2870    ///
2871    /// The following example demonstrates how to sync forever while sending all
2872    /// the interesting events through a mpsc channel to another thread e.g. a
2873    /// UI thread.
2874    ///
2875    /// ```no_run
2876    /// # use std::time::Duration;
2877    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2878    /// # use url::Url;
2879    /// # async {
2880    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2881    /// # let mut client = Client::new(homeserver).await.unwrap();
2882    /// #
2883    /// use tokio::sync::mpsc::channel;
2884    ///
2885    /// let (tx, rx) = channel(100);
2886    ///
2887    /// let sync_channel = &tx;
2888    /// let sync_settings = SyncSettings::new()
2889    ///     .timeout(Duration::from_secs(30));
2890    ///
2891    /// client
2892    ///     .sync_with_result_callback(sync_settings, |response| async move {
2893    ///         let channel = sync_channel;
2894    ///         let sync_response = response?;
2895    ///         for (room_id, room) in sync_response.rooms.joined {
2896    ///              for event in room.timeline.events {
2897    ///                  channel.send(event).await.unwrap();
2898    ///               }
2899    ///         }
2900    ///
2901    ///         Ok(LoopCtrl::Continue)
2902    ///     })
2903    ///     .await;
2904    /// };
2905    /// ```
2906    #[instrument(skip(self, callback))]
2907    pub async fn sync_with_result_callback<C>(
2908        &self,
2909        sync_settings: crate::config::SyncSettings,
2910        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2911    ) -> Result<(), Error>
2912    where
2913        C: Future<Output = Result<LoopCtrl, Error>>,
2914    {
2915        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2916
2917        while let Some(result) = sync_stream.next().await {
2918            trace!("Running callback");
2919            if callback(result).await? == LoopCtrl::Break {
2920                trace!("Callback told us to stop");
2921                break;
2922            }
2923            trace!("Done running callback");
2924        }
2925
2926        Ok(())
2927    }
2928
    /// Repeatedly synchronize the client state with the server.
2930    ///
2931    /// This method will internally call [`Client::sync_once`] in a loop and is
2932    /// equivalent to the [`Client::sync`] method but the responses are provided
2933    /// as an async stream.
2934    ///
2935    /// # Arguments
2936    ///
2937    /// * `sync_settings` - Settings for the sync call. *Note* that those
2938    ///   settings will be only used for the first sync call. See the argument
2939    ///   docs for [`Client::sync_once`] for more info.
2940    ///
2941    /// # Examples
2942    ///
2943    /// ```no_run
2944    /// # use url::Url;
2945    /// # async {
2946    /// # let homeserver = Url::parse("http://localhost:8080")?;
2947    /// # let username = "";
2948    /// # let password = "";
2949    /// use futures_util::StreamExt;
2950    /// use matrix_sdk::{Client, config::SyncSettings};
2951    ///
2952    /// let client = Client::new(homeserver).await?;
2953    /// client.matrix_auth().login_username(&username, &password).send().await?;
2954    ///
2955    /// let mut sync_stream =
2956    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2957    ///
2958    /// while let Some(Ok(response)) = sync_stream.next().await {
2959    ///     for room in response.rooms.joined.values() {
2960    ///         for e in &room.timeline.events {
2961    ///             if let Ok(event) = e.raw().deserialize() {
2962    ///                 println!("Received event {:?}", event);
2963    ///             }
2964    ///         }
2965    ///     }
2966    /// }
2967    ///
2968    /// # anyhow::Ok(()) };
2969    /// ```
2970    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2971    #[instrument(skip(self))]
2972    pub async fn sync_stream(
2973        &self,
2974        mut sync_settings: crate::config::SyncSettings,
2975    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2976        let mut is_first_sync = true;
2977        let mut timeout = None;
2978        let mut last_sync_time: Option<Instant> = None;
2979
2980        let parent_span = Span::current();
2981
2982        async_stream::stream!({
2983            loop {
2984                trace!("Syncing");
2985
2986                if sync_settings.ignore_timeout_on_first_sync {
2987                    if is_first_sync {
2988                        timeout = sync_settings.timeout.take();
2989                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
2990                        sync_settings.timeout = timeout.take();
2991                    }
2992
2993                    is_first_sync = false;
2994                }
2995
2996                yield self
2997                    .sync_loop_helper(&mut sync_settings)
2998                    .instrument(parent_span.clone())
2999                    .await;
3000
3001                Client::delay_sync(&mut last_sync_time).await
3002            }
3003        })
3004    }
3005
3006    /// Get the current, if any, sync token of the client.
3007    /// This will be None if the client didn't sync at least once.
3008    pub(crate) async fn sync_token(&self) -> Option<String> {
3009        self.inner.base_client.sync_token().await
3010    }
3011
3012    /// Gets information about the owner of a given access token.
3013    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3014        let request = whoami::v3::Request::new();
3015        self.send(request).await
3016    }
3017
3018    /// Subscribes a new receiver to client SessionChange broadcasts.
3019    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3020        let broadcast = &self.auth_ctx().session_change_sender;
3021        broadcast.subscribe()
3022    }
3023
3024    /// Sets the save/restore session callbacks.
3025    ///
3026    /// This is another mechanism to get synchronous updates to session tokens,
3027    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3028    pub fn set_session_callbacks(
3029        &self,
3030        reload_session_callback: Box<ReloadSessionCallback>,
3031        save_session_callback: Box<SaveSessionCallback>,
3032    ) -> Result<()> {
3033        self.inner
3034            .auth_ctx
3035            .reload_session_callback
3036            .set(reload_session_callback)
3037            .map_err(|_| Error::MultipleSessionCallbacks)?;
3038
3039        self.inner
3040            .auth_ctx
3041            .save_session_callback
3042            .set(save_session_callback)
3043            .map_err(|_| Error::MultipleSessionCallbacks)?;
3044
3045        Ok(())
3046    }
3047
3048    /// Get the notification settings of the current owner of the client.
3049    pub async fn notification_settings(&self) -> NotificationSettings {
3050        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3051        NotificationSettings::new(self.clone(), ruleset)
3052    }
3053
3054    /// Create a new specialized `Client` that can process notifications.
3055    ///
3056    /// See [`CrossProcessLock::new`] to learn more about
3057    /// `cross_process_lock_config`.
3058    ///
3059    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3060    pub async fn notification_client(
3061        &self,
3062        cross_process_lock_config: CrossProcessLockConfig,
3063    ) -> Result<Client> {
3064        let client = Client {
3065            inner: ClientInner::new(
3066                self.inner.auth_ctx.clone(),
3067                self.server().cloned(),
3068                self.homeserver(),
3069                self.sliding_sync_version(),
3070                self.inner.http_client.clone(),
3071                self.inner
3072                    .base_client
3073                    .clone_with_in_memory_state_store(cross_process_lock_config.clone(), false)
3074                    .await?,
3075                self.inner.caches.supported_versions.read().await.clone(),
3076                self.inner.caches.well_known.read().await.clone(),
3077                self.inner.respect_login_well_known,
3078                self.inner.event_cache.clone(),
3079                self.inner.send_queue_data.clone(),
3080                self.inner.latest_events.clone(),
3081                #[cfg(feature = "e2e-encryption")]
3082                self.inner.e2ee.encryption_settings,
3083                #[cfg(feature = "e2e-encryption")]
3084                self.inner.enable_share_history_on_invite,
3085                cross_process_lock_config,
3086                #[cfg(feature = "experimental-search")]
3087                self.inner.search_index.clone(),
3088                self.inner.thread_subscription_catchup.clone(),
3089            )
3090            .await,
3091        };
3092
3093        Ok(client)
3094    }
3095
3096    /// The [`EventCache`] instance for this [`Client`].
3097    pub fn event_cache(&self) -> &EventCache {
3098        // SAFETY: always initialized in the `Client` ctor.
3099        self.inner.event_cache.get().unwrap()
3100    }
3101
3102    /// The [`LatestEvents`] instance for this [`Client`].
3103    pub async fn latest_events(&self) -> &LatestEvents {
3104        self.inner
3105            .latest_events
3106            .get_or_init(|| async {
3107                LatestEvents::new(
3108                    WeakClient::from_client(self),
3109                    self.event_cache().clone(),
3110                    SendQueue::new(self.clone()),
3111                    self.room_info_notable_update_receiver(),
3112                )
3113            })
3114            .await
3115    }
3116
3117    /// Waits until an at least partially synced room is received, and returns
3118    /// it.
3119    ///
3120    /// **Note: this function will loop endlessly until either it finds the room
3121    /// or an externally set timeout happens.**
3122    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3123        loop {
3124            if let Some(room) = self.get_room(room_id) {
3125                if room.is_state_partially_or_fully_synced() {
3126                    debug!("Found just created room!");
3127                    return room;
3128                }
3129                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3130            } else {
3131                debug!("Room wasn't found, waiting for sync beat to try again");
3132            }
3133            self.inner.sync_beat.listen().await;
3134        }
3135    }
3136
3137    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3138    /// join it.
3139    pub async fn knock(
3140        &self,
3141        room_id_or_alias: OwnedRoomOrAliasId,
3142        reason: Option<String>,
3143        server_names: Vec<OwnedServerName>,
3144    ) -> Result<Room> {
3145        let request =
3146            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3147        let response = self.send(request).await?;
3148        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3149        Ok(Room::new(self.clone(), base_room))
3150    }
3151
3152    /// Checks whether the provided `user_id` belongs to an ignored user.
3153    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3154        self.base_client().is_user_ignored(user_id).await
3155    }
3156
3157    /// Gets the `max_upload_size` value from the homeserver, getting either a
3158    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3159    /// missing.
3160    ///
3161    /// Check the spec for more info:
3162    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3163    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3164        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3165        if let Some(data) = max_upload_size_lock.get() {
3166            return Ok(data.to_owned());
3167        }
3168
3169        // Use the authenticated endpoint when the server supports it.
3170        let supported_versions = self.supported_versions().await?;
3171        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3172            .is_supported(&supported_versions);
3173
3174        let upload_size = if use_auth {
3175            self.send(authenticated_media::get_media_config::v1::Request::default())
3176                .await?
3177                .upload_size
3178        } else {
3179            #[allow(deprecated)]
3180            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3181        };
3182
3183        match max_upload_size_lock.set(upload_size) {
3184            Ok(_) => Ok(upload_size),
3185            Err(error) => {
3186                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3187            }
3188        }
3189    }
3190
3191    /// The settings to use for decrypting events.
3192    #[cfg(feature = "e2e-encryption")]
3193    pub fn decryption_settings(&self) -> &DecryptionSettings {
3194        &self.base_client().decryption_settings
3195    }
3196
3197    /// Returns the [`SearchIndex`] for this [`Client`].
3198    #[cfg(feature = "experimental-search")]
3199    pub fn search_index(&self) -> &SearchIndex {
3200        &self.inner.search_index
3201    }
3202
3203    /// Whether the client is configured to take thread subscriptions (MSC4306
3204    /// and MSC4308) into account, and the server enabled the experimental
3205    /// feature flag for it.
3206    ///
3207    /// This may cause filtering out of thread subscriptions, and loading the
3208    /// thread subscriptions via the sliding sync extension, when the room
3209    /// list service is being used.
3210    ///
3211    /// This is async and fallible as it may use the network to retrieve the
3212    /// server supported features, if they aren't cached already.
3213    pub async fn enabled_thread_subscriptions(&self) -> Result<bool> {
3214        // Check if the client is configured to support thread subscriptions first.
3215        match self.base_client().threading_support {
3216            ThreadingSupport::Enabled { with_subscriptions: false }
3217            | ThreadingSupport::Disabled => return Ok(false),
3218            ThreadingSupport::Enabled { with_subscriptions: true } => {}
3219        }
3220
3221        // Now, let's check that the server supports it.
3222        let server_enabled = self
3223            .supported_versions()
3224            .await?
3225            .features
3226            .contains(&FeatureFlag::from("org.matrix.msc4306"));
3227
3228        Ok(server_enabled)
3229    }
3230
3231    /// Fetch thread subscriptions changes between `from` and up to `to`.
3232    ///
3233    /// The `limit` optional parameter can be used to limit the number of
3234    /// entries in a response. It can also be overridden by the server, if
3235    /// it's deemed too large.
3236    pub async fn fetch_thread_subscriptions(
3237        &self,
3238        from: Option<String>,
3239        to: Option<String>,
3240        limit: Option<UInt>,
3241    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3242        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3243            from,
3244            to,
3245            limit,
3246        });
3247        Ok(self.send(request).await?)
3248    }
3249
3250    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3251        self.inner.thread_subscription_catchup.get().unwrap()
3252    }
3253
3254    /// Perform database optimizations if any are available, i.e. vacuuming in
3255    /// SQLite.
3256    ///
3257    /// **Warning:** this was added to check if SQLite fragmentation was the
3258    /// source of performance issues, **DO NOT use in production**.
3259    #[doc(hidden)]
3260    pub async fn optimize_stores(&self) -> Result<()> {
3261        trace!("Optimizing state store...");
3262        self.state_store().optimize().await?;
3263
3264        trace!("Optimizing event cache store...");
3265        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3266            clean_lock.optimize().await?;
3267        }
3268
3269        trace!("Optimizing media store...");
3270        self.media_store().lock().await?.optimize().await?;
3271
3272        Ok(())
3273    }
3274
3275    /// Returns the sizes of the existing stores, if known.
3276    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3277        #[cfg(feature = "e2e-encryption")]
3278        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3279            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3280        {
3281            Some(store_size)
3282        } else {
3283            None
3284        };
3285        #[cfg(not(feature = "e2e-encryption"))]
3286        let crypto_store_size = None;
3287
3288        let state_store_size = self.state_store().get_size().await.ok().flatten();
3289
3290        let event_cache_store_size = if let Some(clean_lock) =
3291            self.event_cache_store().lock().await?.as_clean()
3292            && let Ok(Some(store_size)) = clean_lock.get_size().await
3293        {
3294            Some(store_size)
3295        } else {
3296            None
3297        };
3298
3299        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3300
3301        Ok(StoreSizes {
3302            crypto_store: crypto_store_size,
3303            state_store: state_store_size,
3304            event_cache_store: event_cache_store_size,
3305            media_store: media_store_size,
3306        })
3307    }
3308
3309    /// Get a reference to the client's task monitor, for spawning background
3310    /// tasks.
3311    pub fn task_monitor(&self) -> &TaskMonitor {
3312        &self.inner.task_monitor
3313    }
3314
3315    /// Add a subscriber for duplicate key upload error notifications triggered
3316    /// by requests to /keys/upload.
3317    #[cfg(feature = "e2e-encryption")]
3318    pub fn subscribe_to_duplicate_key_upload_errors(
3319        &self,
3320    ) -> broadcast::Receiver<Option<DuplicateOneTimeKeyErrorMessage>> {
3321        self.inner.duplicate_key_upload_error_sender.subscribe()
3322    }
3323
3324    /// Check the record of whether we are waiting for an [MSC4268] key bundle
3325    /// for the given room.
3326    ///
3327    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
3328    #[cfg(feature = "e2e-encryption")]
3329    pub async fn get_pending_key_bundle_details_for_room(
3330        &self,
3331        room_id: &RoomId,
3332    ) -> Result<Option<RoomPendingKeyBundleDetails>> {
3333        Ok(self.base_client().get_pending_key_bundle_details_for_room(room_id).await?)
3334    }
3335}
3336
/// The disk sizes of the different stores of a client.
///
/// Each size is optional: it's `None` when it isn't known, which is the case
/// for in-memory stores.
#[derive(Clone, Debug)]
pub struct StoreSizes {
    /// Size of the CryptoStore, if known.
    pub crypto_store: Option<usize>,
    /// Size of the StateStore, if known.
    pub state_store: Option<usize>,
    /// Size of the EventCacheStore, if known.
    pub event_cache_store: Option<usize>,
    /// Size of the MediaStore, if known.
    pub media_store: Option<usize>,
}
3350
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper asking the crypto layer to track the given users.
    ///
    /// # Panics
    ///
    /// Panics if the client has no Olm machine, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let olm_machine_guard = self.olm_machine().await;
        let olm_machine = olm_machine_guard.as_ref().unwrap();

        olm_machine.update_tracked_users(user_ids).await.unwrap();
    }
}
3364
3365/// A weak reference to the inner client, useful when trying to get a handle
3366/// on the owning client.
3367#[derive(Clone, Debug)]
3368pub(crate) struct WeakClient {
3369    client: Weak<ClientInner>,
3370}
3371
3372impl WeakClient {
3373    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3374    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3375        Self { client: Arc::downgrade(client) }
3376    }
3377
3378    /// Construct a [`WeakClient`] from a [`Client`].
3379    pub fn from_client(client: &Client) -> Self {
3380        Self::from_inner(&client.inner)
3381    }
3382
3383    /// Attempts to get a [`Client`] from this [`WeakClient`].
3384    pub fn get(&self) -> Option<Client> {
3385        self.client.upgrade().map(|inner| Client { inner })
3386    }
3387
3388    /// Gets the number of strong (`Arc`) pointers still pointing to this
3389    /// client.
3390    #[allow(dead_code)]
3391    pub fn strong_count(&self) -> usize {
3392        self.client.strong_count()
3393    }
3394}
3395
3396/// Information about the state of a room before we joined it.
3397#[derive(Debug, Clone, Default)]
3398struct PreJoinRoomInfo {
3399    /// The user who invited us to the room, if any.
3400    pub inviter: Option<RoomMember>,
3401}
3402
3403// The http mocking library is not supported for wasm32
3404#[cfg(all(test, not(target_family = "wasm")))]
3405pub(crate) mod tests {
3406    use std::{sync::Arc, time::Duration};
3407
3408    use assert_matches::assert_matches;
3409    use assert_matches2::assert_let;
3410    use eyeball::SharedObservable;
3411    use futures_util::{FutureExt, StreamExt, pin_mut};
3412    use js_int::{UInt, uint};
3413    use matrix_sdk_base::{
3414        RoomState,
3415        store::{MemoryStore, StoreConfig},
3416    };
3417    use matrix_sdk_test::{
3418        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
3419        event_factory::EventFactory,
3420    };
3421    #[cfg(target_family = "wasm")]
3422    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3423
3424    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
3425    use ruma::{
3426        RoomId, ServerName, UserId,
3427        api::{
3428            FeatureFlag, MatrixVersion,
3429            client::{
3430                discovery::discover_homeserver::RtcFocusInfo,
3431                room::create_room::v3::Request as CreateRoomRequest,
3432            },
3433        },
3434        assign,
3435        events::{
3436            ignored_user_list::IgnoredUserListEventContent,
3437            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3438        },
3439        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3440    };
3441    use serde_json::json;
3442    use stream_assert::{assert_next_matches, assert_pending};
3443    use tokio::{
3444        spawn,
3445        time::{sleep, timeout},
3446    };
3447    use url::Url;
3448
3449    use super::Client;
3450    use crate::{
3451        Error, TransmissionProgress,
3452        client::{WeakClient, futures::SendMediaUploadRequest},
3453        config::{RequestConfig, SyncSettings},
3454        futures::SendRequest,
3455        media::MediaError,
3456        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3457    };
3458
3459    #[async_test]
3460    async fn test_account_data() {
3461        let server = MatrixMockServer::new().await;
3462        let client = server.client_builder().build().await;
3463
3464        let f = EventFactory::new();
3465        server
3466            .mock_sync()
3467            .ok_and_run(&client, |builder| {
3468                builder.add_global_account_data(
3469                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3470                );
3471            })
3472            .await;
3473
3474        let content = client
3475            .account()
3476            .account_data::<IgnoredUserListEventContent>()
3477            .await
3478            .unwrap()
3479            .unwrap()
3480            .deserialize()
3481            .unwrap();
3482
3483        assert_eq!(content.ignored_users.len(), 1);
3484    }
3485
3486    #[async_test]
3487    async fn test_successful_discovery() {
3488        // Imagine this is `matrix.org`.
3489        let server = MatrixMockServer::new().await;
3490        let server_url = server.uri();
3491
3492        // Imagine this is `matrix-client.matrix.org`.
3493        let homeserver = MatrixMockServer::new().await;
3494        let homeserver_url = homeserver.uri();
3495
3496        // Imagine Alice has the user ID `@alice:matrix.org`.
3497        let domain = server_url.strip_prefix("http://").unwrap();
3498        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3499
3500        // The `.well-known` is on the server (e.g. `matrix.org`).
3501        server
3502            .mock_well_known()
3503            .ok_with_homeserver_url(&homeserver_url)
3504            .mock_once()
3505            .named("well-known")
3506            .mount()
3507            .await;
3508
3509        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3510        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3511
3512        let client = Client::builder()
3513            .insecure_server_name_no_tls(alice.server_name())
3514            .build()
3515            .await
3516            .unwrap();
3517
3518        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3519        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3520        client.server_versions().await.unwrap();
3521    }
3522
3523    #[async_test]
3524    async fn test_discovery_broken_server() {
3525        let server = MatrixMockServer::new().await;
3526        let server_url = server.uri();
3527        let domain = server_url.strip_prefix("http://").unwrap();
3528        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3529
3530        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3531
3532        assert!(
3533            Client::builder()
3534                .insecure_server_name_no_tls(alice.server_name())
3535                .build()
3536                .await
3537                .is_err(),
3538            "Creating a client from a user ID should fail when the .well-known request fails."
3539        );
3540    }
3541
3542    #[async_test]
3543    async fn test_room_creation() {
3544        let server = MatrixMockServer::new().await;
3545        let client = server.client_builder().build().await;
3546
3547        let f = EventFactory::new().sender(user_id!("@example:localhost"));
3548        server
3549            .mock_sync()
3550            .ok_and_run(&client, |builder| {
3551                builder.add_joined_room(
3552                    JoinedRoomBuilder::default()
3553                        .add_state_event(
3554                            f.member(user_id!("@example:localhost")).display_name("example"),
3555                        )
3556                        .add_state_event(f.default_power_levels()),
3557                );
3558            })
3559            .await;
3560
3561        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3562        assert_eq!(room.state(), RoomState::Joined);
3563    }
3564
3565    #[async_test]
3566    async fn test_retry_limit_http_requests() {
3567        let server = MatrixMockServer::new().await;
3568        let client = server
3569            .client_builder()
3570            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(4)))
3571            .build()
3572            .await;
3573
3574        assert!(client.request_config().retry_limit.unwrap() == 4);
3575
3576        server.mock_who_am_i().error500().expect(4).mount().await;
3577
3578        client.whoami().await.unwrap_err();
3579    }
3580
3581    #[async_test]
3582    async fn test_retry_timeout_http_requests() {
3583        // Keep this timeout small so that the test doesn't take long
3584        let retry_timeout = Duration::from_secs(5);
3585        let server = MatrixMockServer::new().await;
3586        let client = server
3587            .client_builder()
3588            .on_builder(|builder| {
3589                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3590            })
3591            .build()
3592            .await;
3593
3594        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3595
3596        server.mock_login().error500().expect(2..).mount().await;
3597
3598        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3599    }
3600
3601    #[async_test]
3602    async fn test_short_retry_initial_http_requests() {
3603        let server = MatrixMockServer::new().await;
3604        let client = server
3605            .client_builder()
3606            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3607            .build()
3608            .await;
3609
3610        server.mock_login().error500().expect(3..).mount().await;
3611
3612        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3613    }
3614
3615    #[async_test]
3616    async fn test_no_retry_http_requests() {
3617        let server = MatrixMockServer::new().await;
3618        let client = server.client_builder().build().await;
3619
3620        server.mock_devices().error500().mock_once().mount().await;
3621
3622        client.devices().await.unwrap_err();
3623    }
3624
3625    #[async_test]
3626    async fn test_set_homeserver() {
3627        let client = MockClientBuilder::new(None).build().await;
3628        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3629
3630        let homeserver = Url::parse("http://example.com/").unwrap();
3631        client.set_homeserver(homeserver.clone());
3632        assert_eq!(client.homeserver(), homeserver);
3633    }
3634
3635    #[async_test]
3636    async fn test_search_user_request() {
3637        let server = MatrixMockServer::new().await;
3638        let client = server.client_builder().build().await;
3639
3640        server.mock_user_directory().ok().mock_once().mount().await;
3641
3642        let response = client.search_users("test", 50).await.unwrap();
3643        assert_eq!(response.results.len(), 1);
3644        let result = &response.results[0];
3645        assert_eq!(result.user_id.to_string(), "@test:example.me");
3646        assert_eq!(result.display_name.clone().unwrap(), "Test");
3647        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3648        assert!(!response.limited);
3649    }
3650
3651    #[async_test]
3652    async fn test_request_unstable_features() {
3653        let server = MatrixMockServer::new().await;
3654        let client = server.client_builder().no_server_versions().build().await;
3655
3656        server
3657            .mock_versions()
3658            .with_feature("org.matrix.e2e_cross_signing", true)
3659            .ok()
3660            .mock_once()
3661            .mount()
3662            .await;
3663
3664        let unstable_features = client.unstable_features().await.unwrap();
3665        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3666        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3667    }
3668
3669    #[async_test]
3670    async fn test_can_homeserver_push_encrypted_event_to_device() {
3671        let server = MatrixMockServer::new().await;
3672        let client = server.client_builder().no_server_versions().build().await;
3673
3674        server.mock_versions().with_push_encrypted_events().ok().mock_once().mount().await;
3675
3676        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3677        assert!(msc4028_enabled);
3678    }
3679
3680    #[async_test]
3681    async fn test_recently_visited_rooms() {
3682        // Tracking recently visited rooms requires authentication
3683        let client = MockClientBuilder::new(None).unlogged().build().await;
3684        assert_matches!(
3685            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3686            Err(Error::AuthenticationRequired)
3687        );
3688
3689        let client = MockClientBuilder::new(None).build().await;
3690        let account = client.account();
3691
3692        // We should start off with an empty list
3693        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3694
3695        // Tracking a valid room id should add it to the list
3696        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3697        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3698        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3699
3700        // And the existing list shouldn't be changed
3701        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3702        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3703
3704        // Tracking the same room again shouldn't change the list
3705        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3706        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3707        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3708
3709        // Tracking a second room should add it to the front of the list
3710        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3711        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3712        assert_eq!(
3713            account.get_recently_visited_rooms().await.unwrap(),
3714            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3715        );
3716
3717        // Tracking the first room yet again should move it to the front of the list
3718        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3719        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3720        assert_eq!(
3721            account.get_recently_visited_rooms().await.unwrap(),
3722            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3723        );
3724
3725        // Tracking should be capped at 20
3726        for n in 0..20 {
3727            account
3728                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3729                .await
3730                .unwrap();
3731        }
3732
3733        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3734
3735        // And the initial rooms should've been pushed out
3736        let rooms = account.get_recently_visited_rooms().await.unwrap();
3737        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3738        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3739
3740        // And the last tracked room should be the first
3741        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3742    }
3743
3744    #[async_test]
3745    async fn test_client_no_cycle_with_event_cache() {
3746        let client = MockClientBuilder::new(None).build().await;
3747
3748        // Wait for the init tasks to die.
3749        sleep(Duration::from_secs(1)).await;
3750
3751        let weak_client = WeakClient::from_client(&client);
3752        assert_eq!(weak_client.strong_count(), 1);
3753
3754        {
3755            let room_id = room_id!("!room:example.org");
3756
3757            // Have the client know the room.
3758            let response = SyncResponseBuilder::default()
3759                .add_joined_room(JoinedRoomBuilder::new(room_id))
3760                .build_sync_response();
3761            client.inner.base_client.receive_sync_response(response).await.unwrap();
3762
3763            client.event_cache().subscribe().unwrap();
3764
3765            let (_room_event_cache, _drop_handles) =
3766                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3767        }
3768
3769        drop(client);
3770
3771        // Give a bit of time for background tasks to die.
3772        sleep(Duration::from_secs(1)).await;
3773
3774        // The weak client must be the last reference to the client now.
3775        assert_eq!(weak_client.strong_count(), 0);
3776        let client = weak_client.get();
3777        assert!(
3778            client.is_none(),
3779            "too many strong references to the client: {}",
3780            Arc::strong_count(&client.unwrap().inner)
3781        );
3782    }
3783
3784    #[async_test]
3785    async fn test_supported_versions_caching() {
3786        let server = MatrixMockServer::new().await;
3787
3788        let versions_mock = server
3789            .mock_versions()
3790            .expect_default_access_token()
3791            .with_feature("org.matrix.e2e_cross_signing", true)
3792            .ok()
3793            .named("first versions mock")
3794            .expect(1)
3795            .mount_as_scoped()
3796            .await;
3797
3798        let memory_store = Arc::new(MemoryStore::new());
3799        let client = server
3800            .client_builder()
3801            .no_server_versions()
3802            .on_builder(|builder| {
3803                builder.store_config(
3804                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3805                        .state_store(memory_store.clone()),
3806                )
3807            })
3808            .build()
3809            .await;
3810
3811        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3812
3813        // The result was cached.
3814        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3815        // This subsequent call hits the in-memory cache.
3816        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3817
3818        drop(client);
3819
3820        let client = server
3821            .client_builder()
3822            .no_server_versions()
3823            .on_builder(|builder| {
3824                builder.store_config(
3825                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3826                        .state_store(memory_store.clone()),
3827                )
3828            })
3829            .build()
3830            .await;
3831
3832        // These calls to the new client hit the on-disk cache.
3833        assert!(
3834            client
3835                .unstable_features()
3836                .await
3837                .unwrap()
3838                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3839        );
3840        let supported = client.supported_versions().await.unwrap();
3841        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3842        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3843
3844        // Then this call hits the in-memory cache.
3845        let supported = client.supported_versions().await.unwrap();
3846        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3847        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3848
3849        drop(versions_mock);
3850
3851        // Now, reset the cache, and observe the endpoints being called again once.
3852        client.reset_supported_versions().await.unwrap();
3853
3854        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3855
3856        // Hits network again.
3857        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3858        // Hits in-memory cache again.
3859        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3860    }
3861
3862    #[async_test]
3863    async fn test_well_known_caching() {
3864        let server = MatrixMockServer::new().await;
3865        let server_url = server.uri();
3866        let domain = server_url.strip_prefix("http://").unwrap();
3867        let server_name = <&ServerName>::try_from(domain).unwrap();
3868        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3869
3870        let well_known_mock = server
3871            .mock_well_known()
3872            .ok()
3873            .named("well known mock")
3874            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3875            .mount_as_scoped()
3876            .await;
3877
3878        let memory_store = Arc::new(MemoryStore::new());
3879        let client = Client::builder()
3880            .insecure_server_name_no_tls(server_name)
3881            .store_config(
3882                StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3883                    .state_store(memory_store.clone()),
3884            )
3885            .build()
3886            .await
3887            .unwrap();
3888
3889        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3890
3891        // This subsequent call hits the in-memory cache.
3892        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3893
3894        drop(client);
3895
3896        let client = server
3897            .client_builder()
3898            .no_server_versions()
3899            .on_builder(|builder| {
3900                builder.store_config(
3901                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3902                        .state_store(memory_store.clone()),
3903                )
3904            })
3905            .build()
3906            .await;
3907
3908        // This call to the new client hits the on-disk cache.
3909        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3910
3911        // Then this call hits the in-memory cache.
3912        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3913
3914        drop(well_known_mock);
3915
3916        // Now, reset the cache, and observe the endpoints being called again once.
3917        client.reset_well_known().await.unwrap();
3918
3919        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3920
3921        // Hits network again.
3922        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3923        // Hits in-memory cache again.
3924        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3925    }
3926
3927    #[async_test]
3928    async fn test_missing_well_known_caching() {
3929        let server = MatrixMockServer::new().await;
3930        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3931
3932        let well_known_mock = server
3933            .mock_well_known()
3934            .error_unrecognized()
3935            .named("first well-known mock")
3936            .expect(1)
3937            .mount_as_scoped()
3938            .await;
3939
3940        let memory_store = Arc::new(MemoryStore::new());
3941        let client = server
3942            .client_builder()
3943            .on_builder(|builder| {
3944                builder.store_config(
3945                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3946                        .state_store(memory_store.clone()),
3947                )
3948            })
3949            .build()
3950            .await;
3951
3952        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3953
3954        // This subsequent call hits the in-memory cache.
3955        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3956
3957        drop(client);
3958
3959        let client = server
3960            .client_builder()
3961            .on_builder(|builder| {
3962                builder.store_config(
3963                    StoreConfig::new(CrossProcessLockConfig::SingleProcess)
3964                        .state_store(memory_store.clone()),
3965                )
3966            })
3967            .build()
3968            .await;
3969
3970        // This call to the new client hits the on-disk cache.
3971        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3972
3973        // Then this call hits the in-memory cache.
3974        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3975
3976        drop(well_known_mock);
3977
3978        // Now, reset the cache, and observe the endpoints being called again once.
3979        client.reset_well_known().await.unwrap();
3980
3981        server
3982            .mock_well_known()
3983            .error_unrecognized()
3984            .expect(1)
3985            .named("second well-known mock")
3986            .mount()
3987            .await;
3988
3989        // Hits network again.
3990        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3991        // Hits in-memory cache again.
3992        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3993    }
3994
3995    #[async_test]
3996    async fn test_no_network_doesnt_cause_infinite_retries() {
3997        // We want infinite retries for transient errors.
3998        let client = MockClientBuilder::new(None)
3999            .on_builder(|builder| builder.request_config(RequestConfig::new()))
4000            .build()
4001            .await;
4002
4003        // We don't define a mock server on purpose here, so that the error is really a
4004        // network error.
4005        client.whoami().await.unwrap_err();
4006    }
4007
4008    #[async_test]
4009    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
4010        let server = MatrixMockServer::new().await;
4011        let client = server.client_builder().build().await;
4012
4013        let room_id = room_id!("!room:example.org");
4014
4015        server
4016            .mock_sync()
4017            .ok_and_run(&client, |builder| {
4018                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4019            })
4020            .await;
4021
4022        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
4023        assert_eq!(room.room_id(), room_id);
4024    }
4025
4026    #[async_test]
4027    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
4028        let server = MatrixMockServer::new().await;
4029        let client = server.client_builder().build().await;
4030
4031        let room_id = room_id!("!room:example.org");
4032
4033        let client = Arc::new(client);
4034
4035        // Perform the /sync request with a delay so it starts after the
4036        // `await_room_remote_echo` call has happened
4037        spawn({
4038            let client = client.clone();
4039            async move {
4040                sleep(Duration::from_millis(100)).await;
4041
4042                server
4043                    .mock_sync()
4044                    .ok_and_run(&client, |builder| {
4045                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4046                    })
4047                    .await;
4048            }
4049        });
4050
4051        let room =
4052            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4053        assert_eq!(room.room_id(), room_id);
4054    }
4055
4056    #[async_test]
4057    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4058        let client = MockClientBuilder::new(None).build().await;
4059
4060        let room_id = room_id!("!room:example.org");
4061        // Room is not present so the client won't be able to find it. The call will
4062        // timeout.
4063        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4064    }
4065
4066    #[async_test]
4067    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4068        let server = MatrixMockServer::new().await;
4069        let client = server.client_builder().build().await;
4070
4071        server.mock_create_room().ok().mount().await;
4072
4073        // Create a room in the internal store
4074        let room = client
4075            .create_room(assign!(CreateRoomRequest::new(), {
4076                invite: vec![],
4077                is_direct: false,
4078            }))
4079            .await
4080            .unwrap();
4081
4082        // Room is locally present, but not synced, the call will timeout
4083        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4084            .await
4085            .unwrap_err();
4086    }
4087
4088    #[async_test]
4089    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4090        let server = MatrixMockServer::new().await;
4091        let client = server.client_builder().build().await;
4092
4093        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4094
4095        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4096        assert_matches!(ret, Ok(true));
4097    }
4098
4099    #[async_test]
4100    async fn test_is_room_alias_available_if_alias_is_resolved() {
4101        let server = MatrixMockServer::new().await;
4102        let client = server.client_builder().build().await;
4103
4104        server
4105            .mock_room_directory_resolve_alias()
4106            .ok("!some_room_id:matrix.org", Vec::new())
4107            .expect(1)
4108            .mount()
4109            .await;
4110
4111        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4112        assert_matches!(ret, Ok(false));
4113    }
4114
4115    #[async_test]
4116    async fn test_is_room_alias_available_if_error_found() {
4117        let server = MatrixMockServer::new().await;
4118        let client = server.client_builder().build().await;
4119
4120        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4121
4122        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4123        assert_matches!(ret, Err(_));
4124    }
4125
4126    #[async_test]
4127    async fn test_create_room_alias() {
4128        let server = MatrixMockServer::new().await;
4129        let client = server.client_builder().build().await;
4130
4131        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4132
4133        let ret = client
4134            .create_room_alias(
4135                room_alias_id!("#some_alias:matrix.org"),
4136                room_id!("!some_room:matrix.org"),
4137            )
4138            .await;
4139        assert_matches!(ret, Ok(()));
4140    }
4141
4142    #[async_test]
4143    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4144        let server = MatrixMockServer::new().await;
4145        let client = server.client_builder().build().await;
4146
4147        let room_id = room_id!("!a-room:matrix.org");
4148
4149        // Make sure the summary endpoint is called once
4150        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4151
4152        // We create a locally cached invited room
4153        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4154
4155        // And we get a preview, the server endpoint was reached
4156        let preview = client
4157            .get_room_preview(room_id.into(), Vec::new())
4158            .await
4159            .expect("Room preview should be retrieved");
4160
4161        assert_eq!(invited_room.room_id(), preview.room_id);
4162    }
4163
4164    #[async_test]
4165    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4166        let server = MatrixMockServer::new().await;
4167        let client = server.client_builder().build().await;
4168
4169        let room_id = room_id!("!a-room:matrix.org");
4170
4171        // Make sure the summary endpoint is called once
4172        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4173
4174        // We create a locally cached left room
4175        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4176
4177        // And we get a preview, the server endpoint was reached
4178        let preview = client
4179            .get_room_preview(room_id.into(), Vec::new())
4180            .await
4181            .expect("Room preview should be retrieved");
4182
4183        assert_eq!(left_room.room_id(), preview.room_id);
4184    }
4185
4186    #[async_test]
4187    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4188        let server = MatrixMockServer::new().await;
4189        let client = server.client_builder().build().await;
4190
4191        let room_id = room_id!("!a-room:matrix.org");
4192
4193        // Make sure the summary endpoint is called once
4194        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4195
4196        // We create a locally cached knocked room
4197        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4198
4199        // And we get a preview, the server endpoint was reached
4200        let preview = client
4201            .get_room_preview(room_id.into(), Vec::new())
4202            .await
4203            .expect("Room preview should be retrieved");
4204
4205        assert_eq!(knocked_room.room_id(), preview.room_id);
4206    }
4207
4208    #[async_test]
4209    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4210        let server = MatrixMockServer::new().await;
4211        let client = server.client_builder().build().await;
4212
4213        let room_id = room_id!("!a-room:matrix.org");
4214
4215        // Make sure the summary endpoint is not called
4216        server.mock_room_summary().ok(room_id).never().mount().await;
4217
4218        // We create a locally cached joined room
4219        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4220
4221        // And we get a preview, no server endpoint was reached
4222        let preview = client
4223            .get_room_preview(room_id.into(), Vec::new())
4224            .await
4225            .expect("Room preview should be retrieved");
4226
4227        assert_eq!(joined_room.room_id(), preview.room_id);
4228    }
4229
4230    #[async_test]
4231    async fn test_media_preview_config() {
4232        let server = MatrixMockServer::new().await;
4233        let client = server.client_builder().build().await;
4234
4235        server
4236            .mock_sync()
4237            .ok_and_run(&client, |builder| {
4238                builder.add_custom_global_account_data(json!({
4239                    "content": {
4240                        "media_previews": "private",
4241                        "invite_avatars": "off"
4242                    },
4243                    "type": "m.media_preview_config"
4244                }));
4245            })
4246            .await;
4247
4248        let (initial_value, stream) =
4249            client.account().observe_media_preview_config().await.unwrap();
4250
4251        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4252        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4253        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4254        pin_mut!(stream);
4255        assert_pending!(stream);
4256
4257        server
4258            .mock_sync()
4259            .ok_and_run(&client, |builder| {
4260                builder.add_custom_global_account_data(json!({
4261                    "content": {
4262                        "media_previews": "off",
4263                        "invite_avatars": "on"
4264                    },
4265                    "type": "m.media_preview_config"
4266                }));
4267            })
4268            .await;
4269
4270        assert_next_matches!(
4271            stream,
4272            MediaPreviewConfigEventContent {
4273                media_previews: Some(MediaPreviews::Off),
4274                invite_avatars: Some(InviteAvatars::On),
4275                ..
4276            }
4277        );
4278        assert_pending!(stream);
4279    }
4280
4281    #[async_test]
4282    async fn test_unstable_media_preview_config() {
4283        let server = MatrixMockServer::new().await;
4284        let client = server.client_builder().build().await;
4285
4286        server
4287            .mock_sync()
4288            .ok_and_run(&client, |builder| {
4289                builder.add_custom_global_account_data(json!({
4290                    "content": {
4291                        "media_previews": "private",
4292                        "invite_avatars": "off"
4293                    },
4294                    "type": "io.element.msc4278.media_preview_config"
4295                }));
4296            })
4297            .await;
4298
4299        let (initial_value, stream) =
4300            client.account().observe_media_preview_config().await.unwrap();
4301
4302        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4303        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4304        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4305        pin_mut!(stream);
4306        assert_pending!(stream);
4307
4308        server
4309            .mock_sync()
4310            .ok_and_run(&client, |builder| {
4311                builder.add_custom_global_account_data(json!({
4312                    "content": {
4313                        "media_previews": "off",
4314                        "invite_avatars": "on"
4315                    },
4316                    "type": "io.element.msc4278.media_preview_config"
4317                }));
4318            })
4319            .await;
4320
4321        assert_next_matches!(
4322            stream,
4323            MediaPreviewConfigEventContent {
4324                media_previews: Some(MediaPreviews::Off),
4325                invite_avatars: Some(InviteAvatars::On),
4326                ..
4327            }
4328        );
4329        assert_pending!(stream);
4330    }
4331
4332    #[async_test]
4333    async fn test_media_preview_config_not_found() {
4334        let server = MatrixMockServer::new().await;
4335        let client = server.client_builder().build().await;
4336
4337        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4338
4339        assert!(initial_value.is_none());
4340    }
4341
4342    #[async_test]
4343    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4344        // The default Matrix version we use is 1.11 or higher, so authenticated media
4345        // is supported.
4346        let server = MatrixMockServer::new().await;
4347        let client = server.client_builder().build().await;
4348
4349        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4350
4351        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4352        client.load_or_fetch_max_upload_size().await.unwrap();
4353
4354        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4355    }
4356
4357    #[async_test]
4358    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4359        // The server must advertise support for the stable feature for authenticated
4360        // media support, so we mock the `GET /versions` response.
4361        let server = MatrixMockServer::new().await;
4362        let client = server.client_builder().no_server_versions().build().await;
4363
4364        server
4365            .mock_versions()
4366            .with_versions(vec!["v1.7", "v1.8", "v1.9", "v1.10"])
4367            .with_feature("org.matrix.msc3916.stable", true)
4368            .ok()
4369            .named("versions")
4370            .expect(1)
4371            .mount()
4372            .await;
4373
4374        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4375
4376        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4377        client.load_or_fetch_max_upload_size().await.unwrap();
4378
4379        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4380    }
4381
4382    #[async_test]
4383    async fn test_load_or_fetch_max_upload_size_no_auth() {
4384        // The server must not support Matrix 1.11 or higher for unauthenticated
4385        // media requests, so we mock the `GET /versions` response.
4386        let server = MatrixMockServer::new().await;
4387        let client = server.client_builder().no_server_versions().build().await;
4388
4389        server
4390            .mock_versions()
4391            .with_versions(vec!["v1.1"])
4392            .ok()
4393            .named("versions")
4394            .expect(1)
4395            .mount()
4396            .await;
4397
4398        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4399
4400        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4401        client.load_or_fetch_max_upload_size().await.unwrap();
4402
4403        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4404    }
4405
4406    #[async_test]
4407    async fn test_uploading_a_too_large_media_file() {
4408        let server = MatrixMockServer::new().await;
4409        let client = server.client_builder().build().await;
4410
4411        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4412        client.load_or_fetch_max_upload_size().await.unwrap();
4413        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4414
4415        let data = vec![1, 2];
4416        let upload_request =
4417            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4418        let request = SendRequest {
4419            client: client.clone(),
4420            request: upload_request,
4421            config: None,
4422            send_progress: SharedObservable::new(TransmissionProgress::default()),
4423        };
4424        let media_request = SendMediaUploadRequest::new(request);
4425
4426        let error = media_request.await.err();
4427        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4428        assert_eq!(max, uint!(1));
4429        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4430    }
4431
4432    #[async_test]
4433    async fn test_dont_ignore_timeout_on_first_sync() {
4434        let server = MatrixMockServer::new().await;
4435        let client = server.client_builder().build().await;
4436
4437        server
4438            .mock_sync()
4439            .timeout(Some(Duration::from_secs(30)))
4440            .ok(|_| {})
4441            .mock_once()
4442            .named("sync_with_timeout")
4443            .mount()
4444            .await;
4445
4446        // Call the endpoint once to check the timeout.
4447        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4448
4449        timeout(Duration::from_secs(1), async {
4450            stream.next().await.unwrap().unwrap();
4451        })
4452        .await
4453        .unwrap();
4454    }
4455
4456    #[async_test]
4457    async fn test_ignore_timeout_on_first_sync() {
4458        let server = MatrixMockServer::new().await;
4459        let client = server.client_builder().build().await;
4460
4461        server
4462            .mock_sync()
4463            .timeout(None)
4464            .ok(|_| {})
4465            .mock_once()
4466            .named("sync_no_timeout")
4467            .mount()
4468            .await;
4469        server
4470            .mock_sync()
4471            .timeout(Some(Duration::from_secs(30)))
4472            .ok(|_| {})
4473            .mock_once()
4474            .named("sync_with_timeout")
4475            .mount()
4476            .await;
4477
4478        // Call each version of the endpoint once to check the timeouts.
4479        let mut stream = Box::pin(
4480            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4481        );
4482
4483        timeout(Duration::from_secs(1), async {
4484            stream.next().await.unwrap().unwrap();
4485            stream.next().await.unwrap().unwrap();
4486        })
4487        .await
4488        .unwrap();
4489    }
4490
4491    #[async_test]
4492    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4493        let server = MatrixMockServer::new().await;
4494        let client = server.client_builder().build().await;
4495        // This is the user ID that is inside MemberAdditional.
4496        // Note the confusing username, so we can share
4497        // GlobalAccountDataTestEvent::Direct with the invited test.
4498        let user_id = user_id!("@invited:localhost");
4499
4500        // When we receive a sync response saying "invited" is invited to a DM
4501        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4502        let response = SyncResponseBuilder::default()
4503            .add_joined_room(JoinedRoomBuilder::default().add_state_event(f.member(user_id)))
4504            .add_global_account_data(
4505                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4506            )
4507            .build_sync_response();
4508        client.base_client().receive_sync_response(response).await.unwrap();
4509
4510        // Then get_dm_room finds this room
4511        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4512        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4513    }
4514
4515    #[async_test]
4516    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4517        let server = MatrixMockServer::new().await;
4518        let client = server.client_builder().build().await;
4519        // This is the user ID that is inside MemberInvite
4520        let user_id = user_id!("@invited:localhost");
4521
4522        // When we receive a sync response saying "invited" is invited to a DM
4523        let f = EventFactory::new().sender(user_id!("@example:localhost"));
4524        let response = SyncResponseBuilder::default()
4525            .add_joined_room(
4526                JoinedRoomBuilder::default()
4527                    .add_state_event(f.member(user_id).invited(user_id).display_name("example")),
4528            )
4529            .add_global_account_data(
4530                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4531            )
4532            .build_sync_response();
4533        client.base_client().receive_sync_response(response).await.unwrap();
4534
4535        // Then get_dm_room finds this room
4536        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4537        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4538    }
4539
4540    #[async_test]
4541    async fn test_get_dm_room_still_finds_left_room() {
4542        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4543        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4544
4545        let server = MatrixMockServer::new().await;
4546        let client = server.client_builder().build().await;
4547        // This is the user ID that is inside MemberAdditional.
4548        // Note the confusing username, so we can share
4549        // GlobalAccountDataTestEvent::Direct with the invited test.
4550        let user_id = user_id!("@invited:localhost");
4551
4552        // When we receive a sync response saying "invited" has left a DM
4553        let f = EventFactory::new().sender(user_id);
4554        let response = SyncResponseBuilder::default()
4555            .add_joined_room(
4556                JoinedRoomBuilder::default().add_state_event(f.member(user_id).leave()),
4557            )
4558            .add_global_account_data(
4559                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4560            )
4561            .build_sync_response();
4562        client.base_client().receive_sync_response(response).await.unwrap();
4563
4564        // Then get_dm_room finds this room
4565        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4566        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4567    }
4568
4569    #[async_test]
4570    async fn test_device_exists() {
4571        let server = MatrixMockServer::new().await;
4572        let client = server.client_builder().build().await;
4573
4574        server.mock_get_device().ok().expect(1).mount().await;
4575
4576        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4577    }
4578
4579    #[async_test]
4580    async fn test_device_exists_404() {
4581        let server = MatrixMockServer::new().await;
4582        let client = server.client_builder().build().await;
4583
4584        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4585    }
4586
4587    #[async_test]
4588    async fn test_device_exists_500() {
4589        let server = MatrixMockServer::new().await;
4590        let client = server.client_builder().build().await;
4591
4592        server.mock_get_device().error500().expect(1).mount().await;
4593
4594        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4595    }
4596
4597    #[async_test]
4598    async fn test_fetching_well_known_with_homeserver_url() {
4599        let server = MatrixMockServer::new().await;
4600        let client = server.client_builder().build().await;
4601        server.mock_well_known().ok().mount().await;
4602
4603        assert_matches!(client.fetch_client_well_known().await, Some(_));
4604    }
4605
4606    #[async_test]
4607    async fn test_fetching_well_known_with_server_name() {
4608        let server = MatrixMockServer::new().await;
4609        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4610
4611        server.mock_well_known().ok().mount().await;
4612
4613        let client = MockClientBuilder::new(None)
4614            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4615            .build()
4616            .await;
4617
4618        assert_matches!(client.fetch_client_well_known().await, Some(_));
4619    }
4620
4621    #[async_test]
4622    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4623        let server = MatrixMockServer::new().await;
4624        server.mock_well_known().ok().mount().await;
4625
4626        let user_id =
4627            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4628        let client = MockClientBuilder::new(None)
4629            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4630            .build()
4631            .await;
4632
4633        assert_matches!(client.fetch_client_well_known().await, Some(_));
4634    }
4635}