matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use eyeball::{SharedObservable, Subscriber};
27use eyeball_im::{Vector, VectorDiff};
28use futures_core::Stream;
29use futures_util::StreamExt;
30#[cfg(feature = "e2e-encryption")]
31use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
32use matrix_sdk_base::{
33    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
34    StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm, ThreadingSupport,
35    event_cache::store::EventCacheStoreLock,
36    media::store::MediaStoreLock,
37    store::{
38        DynStateStore, RoomLoadSettings, SupportedVersionsResponse, TtlStoreValue,
39        WellKnownResponse,
40    },
41    sync::{Notification, RoomUpdates},
42};
43use matrix_sdk_common::ttl_cache::TtlCache;
44#[cfg(feature = "e2e-encryption")]
45use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
46use ruma::{
47    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
48    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
49    api::{
50        FeatureFlag, MatrixVersion, Metadata, OutgoingRequest, SupportedVersions,
51        auth_scheme::{AuthScheme, SendAccessToken},
52        client::{
53            account::whoami,
54            alias::{create_alias, delete_alias, get_alias},
55            authenticated_media,
56            device::{self, delete_devices, get_devices, update_device},
57            directory::{get_public_rooms, get_public_rooms_filtered},
58            discovery::{
59                discover_homeserver::{self, RtcFocusInfo},
60                get_capabilities::{self, v3::Capabilities},
61                get_supported_versions,
62            },
63            error::ErrorKind,
64            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
65            knock::knock_room,
66            media,
67            membership::{join_room_by_id, join_room_by_id_or_alias},
68            room::create_room,
69            session::login::v3::DiscoveryInfo,
70            sync::sync_events,
71            threads::get_thread_subscriptions_changes,
72            uiaa,
73            user_directory::search_users,
74        },
75        error::FromHttpResponseError,
76        path_builder::PathBuilder,
77    },
78    assign,
79    events::direct::DirectUserIdentifier,
80    push::Ruleset,
81    time::Instant,
82};
83use serde::de::DeserializeOwned;
84use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
85use tracing::{Instrument, Span, debug, error, info, instrument, trace, warn};
86use url::Url;
87
88use self::{
89    caches::{CachedValue, ClientCaches},
90    futures::SendRequest,
91};
92use crate::{
93    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
94    Room, SessionTokens, TransmissionProgress,
95    authentication::{
96        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
97        oauth::OAuth,
98    },
99    client::thread_subscriptions::ThreadSubscriptionCatchup,
100    config::{RequestConfig, SyncToken},
101    deduplicating_handler::DeduplicatingHandler,
102    error::HttpResult,
103    event_cache::EventCache,
104    event_handler::{
105        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
106        EventHandlerStore, ObservableEventHandler, SyncEvent,
107    },
108    http_client::{HttpClient, SupportedPathBuilder},
109    latest_events::LatestEvents,
110    media::MediaError,
111    notification_settings::NotificationSettings,
112    room::RoomMember,
113    room_preview::RoomPreview,
114    send_queue::{SendQueue, SendQueueData},
115    sliding_sync::Version as SlidingSyncVersion,
116    sync::{RoomUpdate, SyncResponse},
117};
118#[cfg(feature = "e2e-encryption")]
119use crate::{
120    cross_process_lock::CrossProcessLock,
121    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
122};
123
124mod builder;
125pub(crate) mod caches;
126pub(crate) mod futures;
127pub(crate) mod thread_subscriptions;
128
129pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
130#[cfg(feature = "experimental-search")]
131use crate::search_index::SearchIndex;
132
133#[cfg(not(target_family = "wasm"))]
134type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
135#[cfg(target_family = "wasm")]
136type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
137
138#[cfg(not(target_family = "wasm"))]
139type NotificationHandlerFn =
140    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
141#[cfg(target_family = "wasm")]
142type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
143
144/// Enum controlling if a loop running callbacks should continue or abort.
145///
146/// This is mainly used in the [`sync_with_callback`] method, the return value
147/// of the provided callback controls if the sync loop should be exited.
148///
149/// [`sync_with_callback`]: #method.sync_with_callback
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub enum LoopCtrl {
152    /// Continue running the loop.
153    Continue,
154    /// Break out of the loop.
155    Break,
156}
157
158/// Represents changes that can occur to a `Client`s `Session`.
159#[derive(Debug, Clone, PartialEq)]
160pub enum SessionChange {
161    /// The session's token is no longer valid.
162    UnknownToken {
163        /// Whether or not the session was soft logged out
164        soft_logout: bool,
165    },
166    /// The session's tokens have been refreshed.
167    TokensRefreshed,
168}
169
170/// Information about the server vendor obtained from the federation API.
171#[derive(Debug, Clone, PartialEq, Eq)]
172#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
173pub struct ServerVendorInfo {
174    /// The server name.
175    pub server_name: String,
176    /// The server version.
177    pub version: String,
178}
179
180/// An async/await enabled Matrix client.
181///
182/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
183#[derive(Clone)]
184pub struct Client {
185    pub(crate) inner: Arc<ClientInner>,
186}
187
188#[derive(Default)]
189pub(crate) struct ClientLocks {
190    /// Lock ensuring that only a single room may be marked as a DM at once.
191    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
192    /// explanation.
193    pub(crate) mark_as_dm_lock: Mutex<()>,
194
195    /// Lock ensuring that only a single secret store is getting opened at the
196    /// same time.
197    ///
198    /// This is important so we don't accidentally create multiple different new
199    /// default secret storage keys.
200    #[cfg(feature = "e2e-encryption")]
201    pub(crate) open_secret_store_lock: Mutex<()>,
202
203    /// Lock ensuring that we're only storing a single secret at a time.
204    ///
205    /// Take a look at the [`SecretStore::put_secret`] method for a more
206    /// detailed explanation.
207    ///
208    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
209    #[cfg(feature = "e2e-encryption")]
210    pub(crate) store_secret_lock: Mutex<()>,
211
212    /// Lock ensuring that only one method at a time might modify our backup.
213    #[cfg(feature = "e2e-encryption")]
214    pub(crate) backup_modify_lock: Mutex<()>,
215
216    /// Lock ensuring that we're going to attempt to upload backups for a single
217    /// requester.
218    #[cfg(feature = "e2e-encryption")]
219    pub(crate) backup_upload_lock: Mutex<()>,
220
221    /// Handler making sure we only have one group session sharing request in
222    /// flight per room.
223    #[cfg(feature = "e2e-encryption")]
224    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
225
226    /// Lock making sure we're only doing one key claim request at a time.
227    #[cfg(feature = "e2e-encryption")]
228    pub(crate) key_claim_lock: Mutex<()>,
229
230    /// Handler to ensure that only one members request is running at a time,
231    /// given a room.
232    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
233
234    /// Handler to ensure that only one encryption state request is running at a
235    /// time, given a room.
236    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
237
238    /// Deduplicating handler for sending read receipts. The string is an
239    /// internal implementation detail, see [`Self::send_single_receipt`].
240    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
241
242    #[cfg(feature = "e2e-encryption")]
243    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,
244
245    /// Latest "generation" of data known by the crypto store.
246    ///
247    /// This is a counter that only increments, set in the database (and can
248    /// wrap). It's incremented whenever some process acquires a lock for the
249    /// first time. *This assumes the crypto store lock is being held, to
250    /// avoid data races on writing to this value in the store*.
251    ///
252    /// The current process will maintain this value in local memory and in the
253    /// DB over time. Observing a different value than the one read in
254    /// memory, when reading from the store indicates that somebody else has
255    /// written into the database under our feet.
256    ///
257    /// TODO: this should live in the `OlmMachine`, since it's information
258    /// related to the lock. As of today (2023-07-28), we blow up the entire
259    /// olm machine when there's a generation mismatch. So storing the
260    /// generation in the olm machine would make the client think there's
261    /// *always* a mismatch, and that's why we need to store the generation
262    /// outside the `OlmMachine`.
263    #[cfg(feature = "e2e-encryption")]
264    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
265}
266
267pub(crate) struct ClientInner {
268    /// All the data related to authentication and authorization.
269    pub(crate) auth_ctx: Arc<AuthCtx>,
270
271    /// The URL of the server.
272    ///
273    /// Not to be confused with the `Self::homeserver`. `server` is usually
274    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
275    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
276    /// homeserver (at the time of writing — 2024-08-28).
277    ///
278    /// This value is optional depending on how the `Client` has been built.
279    /// If it's been built from a homeserver URL directly, we don't know the
280    /// server. However, if the `Client` has been built from a server URL or
281    /// name, then the homeserver has been discovered, and we know both.
282    server: Option<Url>,
283
284    /// The URL of the homeserver to connect to.
285    ///
286    /// This is the URL for the client-server Matrix API.
287    homeserver: StdRwLock<Url>,
288
289    /// The sliding sync version.
290    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
291
292    /// The underlying HTTP client.
293    pub(crate) http_client: HttpClient,
294
295    /// User session data.
296    pub(super) base_client: BaseClient,
297
298    /// Collection of in-memory caches for the [`Client`].
299    pub(crate) caches: ClientCaches,
300
301    /// Collection of locks individual client methods might want to use, either
302    /// to ensure that only a single call to a method happens at once or to
303    /// deduplicate multiple calls to a method.
304    pub(crate) locks: ClientLocks,
305
306    /// The cross-process store locks holder name.
307    ///
308    /// The SDK provides cross-process store locks (see
309    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
310    /// `holder_name` is the value used for all cross-process store locks
311    /// used by this `Client`.
312    ///
313    /// If multiple `Client`s are running in different processes, this
314    /// value MUST be different for each `Client`.
315    cross_process_store_locks_holder_name: String,
316
317    /// A mapping of the times at which the current user sent typing notices,
318    /// keyed by room.
319    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
320
321    /// Event handlers. See `add_event_handler`.
322    pub(crate) event_handlers: EventHandlerStore,
323
324    /// Notification handlers. See `register_notification_handler`.
325    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
326
327    /// The sender-side of channels used to receive room updates.
328    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
329
330    /// The sender-side of a channel used to observe all the room updates of a
331    /// sync response.
332    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
333
334    /// Whether the client should update its homeserver URL with the discovery
335    /// information present in the login response.
336    respect_login_well_known: bool,
337
338    /// An event that can be listened on to wait for a successful sync. The
339    /// event will only be fired if a sync loop is running. Can be used for
340    /// synchronization, e.g. if we send out a request to create a room, we can
341    /// wait for the sync to get the data to fetch a room object from the state
342    /// store.
343    pub(crate) sync_beat: event_listener::Event,
344
345    /// A central cache for events, inactive first.
346    ///
347    /// It becomes active when [`EventCache::subscribe`] is called.
348    pub(crate) event_cache: OnceCell<EventCache>,
349
350    /// End-to-end encryption related state.
351    #[cfg(feature = "e2e-encryption")]
352    pub(crate) e2ee: EncryptionData,
353
354    /// The verification state of our own device.
355    #[cfg(feature = "e2e-encryption")]
356    pub(crate) verification_state: SharedObservable<VerificationState>,
357
358    /// Whether to enable the experimental support for sending and receiving
359    /// encrypted room history on invite, per [MSC4268].
360    ///
361    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
362    #[cfg(feature = "e2e-encryption")]
363    pub(crate) enable_share_history_on_invite: bool,
364
365    /// Data related to the [`SendQueue`].
366    ///
367    /// [`SendQueue`]: crate::send_queue::SendQueue
368    pub(crate) send_queue_data: Arc<SendQueueData>,
369
370    /// The `max_upload_size` value of the homeserver, it contains the max
371    /// request size you can send.
372    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
373
374    /// The entry point to get the [`LatestEvent`] of rooms and threads.
375    ///
376    /// [`LatestEvent`]: crate::latest_event::LatestEvent
377    latest_events: OnceCell<LatestEvents>,
378
379    /// Service handling the catching up of thread subscriptions in the
380    /// background.
381    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
382
383    #[cfg(feature = "experimental-search")]
384    /// Handler for [`RoomIndex`]'s of each room
385    search_index: SearchIndex,
386}
387
388impl ClientInner {
389    /// Create a new `ClientInner`.
390    ///
391    /// All the fields passed as parameters here are those that must be cloned
392    /// upon instantiation of a sub-client, e.g. a client specialized for
393    /// notifications.
394    #[allow(clippy::too_many_arguments)]
395    async fn new(
396        auth_ctx: Arc<AuthCtx>,
397        server: Option<Url>,
398        homeserver: Url,
399        sliding_sync_version: SlidingSyncVersion,
400        http_client: HttpClient,
401        base_client: BaseClient,
402        supported_versions: CachedValue<SupportedVersions>,
403        well_known: CachedValue<Option<WellKnownResponse>>,
404        respect_login_well_known: bool,
405        event_cache: OnceCell<EventCache>,
406        send_queue: Arc<SendQueueData>,
407        latest_events: OnceCell<LatestEvents>,
408        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
409        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
410        cross_process_store_locks_holder_name: String,
411        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
412        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
413    ) -> Arc<Self> {
414        let caches = ClientCaches {
415            supported_versions: supported_versions.into(),
416            well_known: well_known.into(),
417            server_metadata: Mutex::new(TtlCache::new()),
418        };
419
420        let client = Self {
421            server,
422            homeserver: StdRwLock::new(homeserver),
423            auth_ctx,
424            sliding_sync_version: StdRwLock::new(sliding_sync_version),
425            http_client,
426            base_client,
427            caches,
428            locks: Default::default(),
429            cross_process_store_locks_holder_name,
430            typing_notice_times: Default::default(),
431            event_handlers: Default::default(),
432            notification_handlers: Default::default(),
433            room_update_channels: Default::default(),
434            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
435            // ballast for all observers to catch up.
436            room_updates_sender: broadcast::Sender::new(32),
437            respect_login_well_known,
438            sync_beat: event_listener::Event::new(),
439            event_cache,
440            send_queue_data: send_queue,
441            latest_events,
442            #[cfg(feature = "e2e-encryption")]
443            e2ee: EncryptionData::new(encryption_settings),
444            #[cfg(feature = "e2e-encryption")]
445            verification_state: SharedObservable::new(VerificationState::Unknown),
446            #[cfg(feature = "e2e-encryption")]
447            enable_share_history_on_invite,
448            server_max_upload_size: Mutex::new(OnceCell::new()),
449            #[cfg(feature = "experimental-search")]
450            search_index: search_index_handler,
451            thread_subscription_catchup,
452        };
453
454        #[allow(clippy::let_and_return)]
455        let client = Arc::new(client);
456
457        #[cfg(feature = "e2e-encryption")]
458        client.e2ee.initialize_tasks(&client);
459
460        let _ = client
461            .event_cache
462            .get_or_init(|| async {
463                EventCache::new(
464                    WeakClient::from_inner(&client),
465                    client.base_client.event_cache_store().clone(),
466                )
467            })
468            .await;
469
470        let _ = client
471            .thread_subscription_catchup
472            .get_or_init(|| async {
473                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
474            })
475            .await;
476
477        client
478    }
479}
480
481#[cfg(not(tarpaulin_include))]
482impl Debug for Client {
483    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
484        write!(fmt, "Client")
485    }
486}
487
488impl Client {
489    /// Create a new [`Client`] that will use the given homeserver.
490    ///
491    /// # Arguments
492    ///
493    /// * `homeserver_url` - The homeserver that the client should connect to.
494    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
495        Self::builder().homeserver_url(homeserver_url).build().await
496    }
497
498    /// Returns a subscriber that publishes an event every time the ignore user
499    /// list changes.
500    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
501        self.inner.base_client.subscribe_to_ignore_user_list_changes()
502    }
503
504    /// Create a new [`ClientBuilder`].
505    pub fn builder() -> ClientBuilder {
506        ClientBuilder::new()
507    }
508
509    pub(crate) fn base_client(&self) -> &BaseClient {
510        &self.inner.base_client
511    }
512
513    /// The underlying HTTP client.
514    pub fn http_client(&self) -> &reqwest::Client {
515        &self.inner.http_client.inner
516    }
517
518    pub(crate) fn locks(&self) -> &ClientLocks {
519        &self.inner.locks
520    }
521
522    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
523        &self.inner.auth_ctx
524    }
525
526    /// The cross-process store locks holder name.
527    ///
528    /// The SDK provides cross-process store locks (see
529    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
530    /// `holder_name` is the value used for all cross-process store locks
531    /// used by this `Client`.
532    pub fn cross_process_store_locks_holder_name(&self) -> &str {
533        &self.inner.cross_process_store_locks_holder_name
534    }
535
536    /// Change the homeserver URL used by this client.
537    ///
538    /// # Arguments
539    ///
540    /// * `homeserver_url` - The new URL to use.
541    fn set_homeserver(&self, homeserver_url: Url) {
542        *self.inner.homeserver.write().unwrap() = homeserver_url;
543    }
544
545    /// Change to a different homeserver and re-resolve well-known.
546    #[cfg(feature = "e2e-encryption")]
547    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
548        &self,
549        homeserver_url: Url,
550    ) -> Result<()> {
551        self.set_homeserver(homeserver_url);
552        self.reset_well_known().await?;
553        if let Some(well_known) = self.load_or_fetch_well_known().await? {
554            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
555        }
556        Ok(())
557    }
558
559    /// Get the capabilities of the homeserver.
560    ///
561    /// This method should be used to check what features are supported by the
562    /// homeserver.
563    ///
564    /// # Examples
565    ///
566    /// ```no_run
567    /// # use matrix_sdk::Client;
568    /// # use url::Url;
569    /// # async {
570    /// # let homeserver = Url::parse("http://example.com")?;
571    /// let client = Client::new(homeserver).await?;
572    ///
573    /// let capabilities = client.get_capabilities().await?;
574    ///
575    /// if capabilities.change_password.enabled {
576    ///     // Change password
577    /// }
578    /// # anyhow::Ok(()) };
579    /// ```
580    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
581        let res = self.send(get_capabilities::v3::Request::new()).await?;
582        Ok(res.capabilities)
583    }
584
585    /// Get the server vendor information from the federation API.
586    ///
587    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
588    /// both the server's software name and version.
589    ///
590    /// # Examples
591    ///
592    /// ```no_run
593    /// # use matrix_sdk::Client;
594    /// # use url::Url;
595    /// # async {
596    /// # let homeserver = Url::parse("http://example.com")?;
597    /// let client = Client::new(homeserver).await?;
598    ///
599    /// let server_info = client.server_vendor_info(None).await?;
600    /// println!(
601    ///     "Server: {}, Version: {}",
602    ///     server_info.server_name, server_info.version
603    /// );
604    /// # anyhow::Ok(()) };
605    /// ```
606    #[cfg(feature = "federation-api")]
607    pub async fn server_vendor_info(
608        &self,
609        request_config: Option<RequestConfig>,
610    ) -> HttpResult<ServerVendorInfo> {
611        use ruma::api::federation::discovery::get_server_version;
612
613        let res = self
614            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
615            .await?;
616
617        // Extract server info, using defaults if fields are missing.
618        let server = res.server.unwrap_or_default();
619        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
620        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
621
622        Ok(ServerVendorInfo { server_name: server_name_str, version })
623    }
624
625    /// Get a copy of the default request config.
626    ///
627    /// The default request config is what's used when sending requests if no
628    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
629    /// function with such a parameter.
630    ///
631    /// If the default request config was not customized through
632    /// [`ClientBuilder`] when creating this `Client`, the returned value will
633    /// be equivalent to [`RequestConfig::default()`].
634    pub fn request_config(&self) -> RequestConfig {
635        self.inner.http_client.request_config
636    }
637
638    /// Check whether the client has been activated.
639    ///
640    /// A client is considered active when:
641    ///
642    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
643    ///    is logged in,
644    /// 2. Has loaded cached data from storage,
645    /// 3. If encryption is enabled, it also initialized or restored its
646    ///    `OlmMachine`.
647    pub fn is_active(&self) -> bool {
648        self.inner.base_client.is_active()
649    }
650
651    /// The server used by the client.
652    ///
653    /// See `Self::server` to learn more.
654    pub fn server(&self) -> Option<&Url> {
655        self.inner.server.as_ref()
656    }
657
658    /// The homeserver of the client.
659    pub fn homeserver(&self) -> Url {
660        self.inner.homeserver.read().unwrap().clone()
661    }
662
663    /// Get the sliding sync version.
664    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
665        self.inner.sliding_sync_version.read().unwrap().clone()
666    }
667
668    /// Override the sliding sync version.
669    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
670        let mut lock = self.inner.sliding_sync_version.write().unwrap();
671        *lock = version;
672    }
673
674    /// Get the Matrix user session meta information.
675    ///
676    /// If the client is currently logged in, this will return a
677    /// [`SessionMeta`] object which contains the user ID and device ID.
678    /// Otherwise it returns `None`.
679    pub fn session_meta(&self) -> Option<&SessionMeta> {
680        self.base_client().session_meta()
681    }
682
683    /// Returns a receiver that gets events for each room info update. To watch
684    /// for new events, use `receiver.resubscribe()`.
685    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
686        self.base_client().room_info_notable_update_receiver()
687    }
688
689    /// Performs a search for users.
690    /// The search is performed case-insensitively on user IDs and display names
691    ///
692    /// # Arguments
693    ///
694    /// * `search_term` - The search term for the search
695    /// * `limit` - The maximum number of results to return. Defaults to 10.
696    ///
697    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
698    pub async fn search_users(
699        &self,
700        search_term: &str,
701        limit: u64,
702    ) -> HttpResult<search_users::v3::Response> {
703        let mut request = search_users::v3::Request::new(search_term.to_owned());
704
705        if let Some(limit) = UInt::new(limit) {
706            request.limit = limit;
707        }
708
709        self.send(request).await
710    }
711
712    /// Get the user id of the current owner of the client.
713    pub fn user_id(&self) -> Option<&UserId> {
714        self.session_meta().map(|s| s.user_id.as_ref())
715    }
716
717    /// Get the device ID that identifies the current session.
718    pub fn device_id(&self) -> Option<&DeviceId> {
719        self.session_meta().map(|s| s.device_id.as_ref())
720    }
721
722    /// Get the current access token for this session.
723    ///
724    /// Will be `None` if the client has not been logged in.
725    pub fn access_token(&self) -> Option<String> {
726        self.auth_ctx().access_token()
727    }
728
729    /// Get the current tokens for this session.
730    ///
731    /// To be notified of changes in the session tokens, use
732    /// [`Client::subscribe_to_session_changes()`] or
733    /// [`Client::set_session_callbacks()`].
734    ///
735    /// Returns `None` if the client has not been logged in.
736    pub fn session_tokens(&self) -> Option<SessionTokens> {
737        self.auth_ctx().session_tokens()
738    }
739
740    /// Access the authentication API used to log in this client.
741    ///
742    /// Will be `None` if the client has not been logged in.
743    pub fn auth_api(&self) -> Option<AuthApi> {
744        match self.auth_ctx().auth_data.get()? {
745            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
746            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
747        }
748    }
749
750    /// Get the whole session info of this client.
751    ///
752    /// Will be `None` if the client has not been logged in.
753    ///
754    /// Can be used with [`Client::restore_session`] to restore a previously
755    /// logged-in session.
756    pub fn session(&self) -> Option<AuthSession> {
757        match self.auth_api()? {
758            AuthApi::Matrix(api) => api.session().map(Into::into),
759            AuthApi::OAuth(api) => api.full_session().map(Into::into),
760        }
761    }
762
763    /// Get a reference to the state store.
764    pub fn state_store(&self) -> &DynStateStore {
765        self.base_client().state_store()
766    }
767
768    /// Get a reference to the event cache store.
769    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
770        self.base_client().event_cache_store()
771    }
772
773    /// Get a reference to the media store.
774    pub fn media_store(&self) -> &MediaStoreLock {
775        self.base_client().media_store()
776    }
777
778    /// Access the native Matrix authentication API with this client.
779    pub fn matrix_auth(&self) -> MatrixAuth {
780        MatrixAuth::new(self.clone())
781    }
782
783    /// Get the account of the current owner of the client.
784    pub fn account(&self) -> Account {
785        Account::new(self.clone())
786    }
787
788    /// Get the encryption manager of the client.
789    #[cfg(feature = "e2e-encryption")]
790    pub fn encryption(&self) -> Encryption {
791        Encryption::new(self.clone())
792    }
793
794    /// Get the media manager of the client.
795    pub fn media(&self) -> Media {
796        Media::new(self.clone())
797    }
798
799    /// Get the pusher manager of the client.
800    pub fn pusher(&self) -> Pusher {
801        Pusher::new(self.clone())
802    }
803
804    /// Access the OAuth 2.0 API of the client.
805    pub fn oauth(&self) -> OAuth {
806        OAuth::new(self.clone())
807    }
808
809    /// Register a handler for a specific event type.
810    ///
811    /// The handler is a function or closure with one or more arguments. The
812    /// first argument is the event itself. All additional arguments are
813    /// "context" arguments: They have to implement [`EventHandlerContext`].
814    /// This trait is named that way because most of the types implementing it
815    /// give additional context about an event: The room it was in, its raw form
816    /// and other similar things. As two exceptions to this,
817    /// [`Client`] and [`EventHandlerHandle`] also implement the
818    /// `EventHandlerContext` trait so you don't have to clone your client
819    /// into the event handler manually and a handler can decide to remove
820    /// itself.
821    ///
822    /// Some context arguments are not universally applicable. A context
823    /// argument that isn't available for the given event type will result in
824    /// the event handler being skipped and an error being logged. The following
825    /// context argument types are only available for a subset of event types:
826    ///
827    /// * [`Room`] is only available for room-specific events, i.e. not for
828    ///   events like global account data events or presence events.
829    ///
830    /// You can provide custom context via
831    /// [`add_event_handler_context`](Client::add_event_handler_context) and
832    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
833    /// into the event handler.
834    ///
835    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
836    ///
837    /// # Examples
838    ///
839    /// ```no_run
840    /// use matrix_sdk::{
841    ///     deserialized_responses::EncryptionInfo,
842    ///     event_handler::Ctx,
843    ///     ruma::{
844    ///         events::{
845    ///             macros::EventContent,
846    ///             push_rules::PushRulesEvent,
847    ///             room::{
848    ///                 message::SyncRoomMessageEvent,
849    ///                 topic::SyncRoomTopicEvent,
850    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
851    ///             },
852    ///         },
853    ///         push::Action,
854    ///         Int, MilliSecondsSinceUnixEpoch,
855    ///     },
856    ///     Client, Room,
857    /// };
858    /// use serde::{Deserialize, Serialize};
859    ///
860    /// # async fn example(client: Client) {
861    /// client.add_event_handler(
862    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
863    ///         // Common usage: Room event plus room and client.
864    ///     },
865    /// );
866    /// client.add_event_handler(
867    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
868    ///         async move {
869    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
870    ///             // unencrypted events and events that were decrypted by the SDK.
871    ///         }
872    ///     },
873    /// );
874    /// client.add_event_handler(
875    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
876    ///         async move {
877    ///             // A `Vec<Action>` parameter allows you to know which push actions
878    ///             // are applicable for an event. For example, an event with
879    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
880    ///             // in the timeline.
881    ///         }
882    ///     },
883    /// );
884    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
885    ///     // You can omit any or all arguments after the first.
886    /// });
887    ///
888    /// // Registering a temporary event handler:
889    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
890    ///     /* Event handler */
891    /// });
892    /// client.remove_event_handler(handle);
893    ///
894    /// // Registering custom event handler context:
895    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
896    /// struct MyContext {
897    ///     number: usize,
898    /// }
899    /// client.add_event_handler_context(MyContext { number: 5 });
900    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
901    ///     // Use the context
902    /// });
903    ///
904    /// // This will handle membership events in joined rooms. Invites are special, see below.
905    /// client.add_event_handler(
906    ///     |ev: SyncRoomMemberEvent| async move {},
907    /// );
908    ///
909    /// // To handle state events in invited rooms (including invite membership events),
910    /// // `StrippedRoomMemberEvent` should be used.
911    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
912    /// client.add_event_handler(
913    ///     |ev: StrippedRoomMemberEvent| async move {},
914    /// );
915    ///
916    /// // Custom events work exactly the same way, you just need to declare
917    /// // the content struct and use the EventContent derive macro on it.
918    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
919    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
920    /// struct TokenEventContent {
921    ///     token: String,
922    ///     #[serde(rename = "exp")]
923    ///     expires_at: MilliSecondsSinceUnixEpoch,
924    /// }
925    ///
926    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
927    ///     todo!("Display the token");
928    /// });
929    ///
930    /// // Event handler closures can also capture local variables.
931    /// // Make sure they are cheap to clone though, because they will be cloned
932    /// // every time the closure is called.
933    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
934    ///
935    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
936    ///     println!("Calling the handler with identifier {data}");
937    /// });
938    /// # }
939    /// ```
940    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
941    where
942        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
943        H: EventHandler<Ev, Ctx>,
944    {
945        self.add_event_handler_impl(handler, None)
946    }
947
948    /// Register a handler for a specific room, and event type.
949    ///
950    /// This method works the same way as
951    /// [`add_event_handler`][Self::add_event_handler], except that the handler
952    /// will only be called for events in the room with the specified ID. See
953    /// that method for more details on event handler functions.
954    ///
955    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
956    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
957    /// your use case.
958    pub fn add_room_event_handler<Ev, Ctx, H>(
959        &self,
960        room_id: &RoomId,
961        handler: H,
962    ) -> EventHandlerHandle
963    where
964        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
965        H: EventHandler<Ev, Ctx>,
966    {
967        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
968    }
969
970    /// Observe a specific event type.
971    ///
972    /// `Ev` represents the kind of event that will be observed. `Ctx`
973    /// represents the context that will come with the event. It relies on the
974    /// same mechanism as [`Client::add_event_handler`]. The main difference is
975    /// that it returns an [`ObservableEventHandler`] and doesn't require a
976    /// user-defined closure. It is possible to subscribe to the
977    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
978    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
979    /// Ctx)`.
980    ///
981    /// Be careful that only the most recent value can be observed. Subscribers
982    /// are notified when a new value is sent, but there is no guarantee
983    /// that they will see all values.
984    ///
985    /// # Example
986    ///
987    /// Let's see a classical usage:
988    ///
989    /// ```
990    /// use futures_util::StreamExt as _;
991    /// use matrix_sdk::{
992    ///     Client, Room,
993    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
994    /// };
995    ///
996    /// # async fn example(client: Client) -> Option<()> {
997    /// let observer =
998    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
999    ///
1000    /// let mut subscriber = observer.subscribe();
1001    ///
1002    /// let (event, (room, push_actions)) = subscriber.next().await?;
1003    /// # Some(())
1004    /// # }
1005    /// ```
1006    ///
1007    /// Now let's see how to get several contexts that can be useful for you:
1008    ///
1009    /// ```
1010    /// use matrix_sdk::{
1011    ///     Client, Room,
1012    ///     deserialized_responses::EncryptionInfo,
1013    ///     ruma::{
1014    ///         events::room::{
1015    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1016    ///         },
1017    ///         push::Action,
1018    ///     },
1019    /// };
1020    ///
1021    /// # async fn example(client: Client) {
1022    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1023    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1024    ///
1025    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1026    /// // to distinguish between unencrypted events and events that were decrypted
1027    /// // by the SDK.
1028    /// let _ = client
1029    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1030    ///     );
1031    ///
1032    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1033    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1034    /// // should be highlighted in the timeline.
1035    /// let _ =
1036    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1037    ///
1038    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1039    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1040    /// # }
1041    /// ```
1042    ///
1043    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1044    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1045    where
1046        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1047        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1048    {
1049        self.observe_room_events_impl(None)
1050    }
1051
1052    /// Observe a specific room, and event type.
1053    ///
1054    /// This method works the same way as [`Client::observe_events`], except
1055    /// that the observability will only be applied for events in the room with
1056    /// the specified ID. See that method for more details.
1057    ///
1058    /// Be careful that only the most recent value can be observed. Subscribers
1059    /// are notified when a new value is sent, but there is no guarantee
1060    /// that they will see all values.
1061    pub fn observe_room_events<Ev, Ctx>(
1062        &self,
1063        room_id: &RoomId,
1064    ) -> ObservableEventHandler<(Ev, Ctx)>
1065    where
1066        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1067        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1068    {
1069        self.observe_room_events_impl(Some(room_id.to_owned()))
1070    }
1071
1072    /// Shared implementation for `Client::observe_events` and
1073    /// `Client::observe_room_events`.
1074    fn observe_room_events_impl<Ev, Ctx>(
1075        &self,
1076        room_id: Option<OwnedRoomId>,
1077    ) -> ObservableEventHandler<(Ev, Ctx)>
1078    where
1079        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1080        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1081    {
1082        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1083        // new value.
1084        let shared_observable = SharedObservable::new(None);
1085
1086        ObservableEventHandler::new(
1087            shared_observable.clone(),
1088            self.event_handler_drop_guard(self.add_event_handler_impl(
1089                move |event: Ev, context: Ctx| {
1090                    shared_observable.set(Some((event, context)));
1091
1092                    ready(())
1093                },
1094                room_id,
1095            )),
1096        )
1097    }
1098
1099    /// Remove the event handler associated with the handle.
1100    ///
1101    /// Note that you **must not** call `remove_event_handler` from the
1102    /// non-async part of an event handler, that is:
1103    ///
1104    /// ```ignore
1105    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1106    ///     // ⚠ this will cause a deadlock ⚠
1107    ///     client.remove_event_handler(handle);
1108    ///
1109    ///     async move {
1110    ///         // removing the event handler here is fine
1111    ///         client.remove_event_handler(handle);
1112    ///     }
1113    /// })
1114    /// ```
1115    ///
1116    /// Note also that handlers that remove themselves will still execute with
1117    /// events received in the same sync cycle.
1118    ///
1119    /// # Arguments
1120    ///
1121    /// `handle` - The [`EventHandlerHandle`] that is returned when
1122    /// registering the event handler with [`Client::add_event_handler`].
1123    ///
1124    /// # Examples
1125    ///
1126    /// ```no_run
1127    /// # use url::Url;
1128    /// # use tokio::sync::mpsc;
1129    /// #
1130    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1131    /// #
1132    /// use matrix_sdk::{
1133    ///     Client, event_handler::EventHandlerHandle,
1134    ///     ruma::events::room::member::SyncRoomMemberEvent,
1135    /// };
1136    /// #
1137    /// # futures_executor::block_on(async {
1138    /// # let client = matrix_sdk::Client::builder()
1139    /// #     .homeserver_url(homeserver)
1140    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1141    /// #     .build()
1142    /// #     .await
1143    /// #     .unwrap();
1144    ///
1145    /// client.add_event_handler(
1146    ///     |ev: SyncRoomMemberEvent,
1147    ///      client: Client,
1148    ///      handle: EventHandlerHandle| async move {
1149    ///         // Common usage: Check arriving Event is the expected one
1150    ///         println!("Expected RoomMemberEvent received!");
1151    ///         client.remove_event_handler(handle);
1152    ///     },
1153    /// );
1154    /// # });
1155    /// ```
1156    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1157        self.inner.event_handlers.remove(handle);
1158    }
1159
1160    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1161    /// the given handle.
1162    ///
1163    /// When the returned value is dropped, the event handler will be removed.
1164    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1165        EventHandlerDropGuard::new(handle, self.clone())
1166    }
1167
1168    /// Add an arbitrary value for use as event handler context.
1169    ///
1170    /// The value can be obtained in an event handler by adding an argument of
1171    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1172    ///
1173    /// If a value of the same type has been added before, it will be
1174    /// overwritten.
1175    ///
1176    /// # Examples
1177    ///
1178    /// ```no_run
1179    /// use matrix_sdk::{
1180    ///     Room, event_handler::Ctx,
1181    ///     ruma::events::room::message::SyncRoomMessageEvent,
1182    /// };
1183    /// # #[derive(Clone)]
1184    /// # struct SomeType;
1185    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1186    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1187    /// # futures_executor::block_on(async {
1188    /// # let client = matrix_sdk::Client::builder()
1189    /// #     .homeserver_url(homeserver)
1190    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1191    /// #     .build()
1192    /// #     .await
1193    /// #     .unwrap();
1194    ///
1195    /// // Handle used to send messages to the UI part of the app
1196    /// let my_gui_handle: SomeType = obtain_gui_handle();
1197    ///
1198    /// client.add_event_handler_context(my_gui_handle.clone());
1199    /// client.add_event_handler(
1200    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1201    ///         async move {
1202    ///             // gui_handle.send(DisplayMessage { message: ev });
1203    ///         }
1204    ///     },
1205    /// );
1206    /// # });
1207    /// ```
1208    pub fn add_event_handler_context<T>(&self, ctx: T)
1209    where
1210        T: Clone + Send + Sync + 'static,
1211    {
1212        self.inner.event_handlers.add_context(ctx);
1213    }
1214
1215    /// Register a handler for a notification.
1216    ///
1217    /// Similar to [`Client::add_event_handler`], but only allows functions
1218    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1219    /// [`Client`] for now.
1220    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1221    where
1222        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1223        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1224    {
1225        self.inner.notification_handlers.write().await.push(Box::new(
1226            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1227        ));
1228
1229        self
1230    }
1231
1232    /// Subscribe to all updates for the room with the given ID.
1233    ///
1234    /// The returned receiver will receive a new message for each sync response
1235    /// that contains updates for that room.
1236    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1237        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1238            btree_map::Entry::Vacant(entry) => {
1239                let (tx, rx) = broadcast::channel(8);
1240                entry.insert(tx);
1241                rx
1242            }
1243            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1244        }
1245    }
1246
1247    /// Subscribe to all updates to all rooms, whenever any has been received in
1248    /// a sync response.
1249    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1250        self.inner.room_updates_sender.subscribe()
1251    }
1252
1253    pub(crate) async fn notification_handlers(
1254        &self,
1255    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1256        self.inner.notification_handlers.read().await
1257    }
1258
1259    /// Get all the rooms the client knows about.
1260    ///
1261    /// This will return the list of joined, invited, and left rooms.
1262    pub fn rooms(&self) -> Vec<Room> {
1263        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1264    }
1265
1266    /// Get all the rooms the client knows about, filtered by room state.
1267    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1268        self.base_client()
1269            .rooms_filtered(filter)
1270            .into_iter()
1271            .map(|room| Room::new(self.clone(), room))
1272            .collect()
1273    }
1274
1275    /// Get a stream of all the rooms, in addition to the existing rooms.
1276    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1277        let (rooms, stream) = self.base_client().rooms_stream();
1278
1279        let map_room = |room| Room::new(self.clone(), room);
1280
1281        (
1282            rooms.into_iter().map(map_room).collect(),
1283            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1284        )
1285    }
1286
1287    /// Returns the joined rooms this client knows about.
1288    pub fn joined_rooms(&self) -> Vec<Room> {
1289        self.rooms_filtered(RoomStateFilter::JOINED)
1290    }
1291
1292    /// Returns the invited rooms this client knows about.
1293    pub fn invited_rooms(&self) -> Vec<Room> {
1294        self.rooms_filtered(RoomStateFilter::INVITED)
1295    }
1296
1297    /// Returns the left rooms this client knows about.
1298    pub fn left_rooms(&self) -> Vec<Room> {
1299        self.rooms_filtered(RoomStateFilter::LEFT)
1300    }
1301
1302    /// Returns the joined space rooms this client knows about.
1303    pub fn joined_space_rooms(&self) -> Vec<Room> {
1304        self.base_client()
1305            .rooms_filtered(RoomStateFilter::JOINED)
1306            .into_iter()
1307            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1308            .collect()
1309    }
1310
1311    /// Get a room with the given room id.
1312    ///
1313    /// # Arguments
1314    ///
1315    /// `room_id` - The unique id of the room that should be fetched.
1316    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1317        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1318    }
1319
1320    /// Gets the preview of a room, whether the current user has joined it or
1321    /// not.
1322    pub async fn get_room_preview(
1323        &self,
1324        room_or_alias_id: &RoomOrAliasId,
1325        via: Vec<OwnedServerName>,
1326    ) -> Result<RoomPreview> {
1327        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1328            Ok(room_id) => room_id.to_owned(),
1329            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1330        };
1331
1332        if let Some(room) = self.get_room(&room_id) {
1333            // The cached data can only be trusted if the room state is joined or
1334            // banned: for invite and knock rooms, no updates will be received
1335            // for the rooms after the invite/knock action took place so we may
1336            // have very out to date data for important fields such as
1337            // `join_rule`. For left rooms, the homeserver should return the latest info.
1338            match room.state() {
1339                RoomState::Joined | RoomState::Banned => {
1340                    return Ok(RoomPreview::from_known_room(&room).await);
1341                }
1342                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1343            }
1344        }
1345
1346        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1347    }
1348
1349    /// Resolve a room alias to a room id and a list of servers which know
1350    /// about it.
1351    ///
1352    /// # Arguments
1353    ///
1354    /// `room_alias` - The room alias to be resolved.
1355    pub async fn resolve_room_alias(
1356        &self,
1357        room_alias: &RoomAliasId,
1358    ) -> HttpResult<get_alias::v3::Response> {
1359        let request = get_alias::v3::Request::new(room_alias.to_owned());
1360        self.send(request).await
1361    }
1362
1363    /// Checks if a room alias is not in use yet.
1364    ///
1365    /// Returns:
1366    /// - `Ok(true)` if the room alias is available.
1367    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1368    ///   status code).
1369    /// - An `Err` otherwise.
1370    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1371        match self.resolve_room_alias(alias).await {
1372            // The room alias was resolved, so it's already in use.
1373            Ok(_) => Ok(false),
1374            Err(error) => {
1375                match error.client_api_error_kind() {
1376                    // The room alias wasn't found, so it's available.
1377                    Some(ErrorKind::NotFound) => Ok(true),
1378                    _ => Err(error),
1379                }
1380            }
1381        }
1382    }
1383
1384    /// Adds a new room alias associated with a room to the room directory.
1385    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1386        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1387        self.send(request).await?;
1388        Ok(())
1389    }
1390
1391    /// Removes a room alias from the room directory.
1392    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1393        let request = delete_alias::v3::Request::new(alias.to_owned());
1394        self.send(request).await?;
1395        Ok(())
1396    }
1397
1398    /// Update the homeserver from the login response well-known if needed.
1399    ///
1400    /// # Arguments
1401    ///
1402    /// * `login_well_known` - The `well_known` field from a successful login
1403    ///   response.
1404    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1405        if self.inner.respect_login_well_known
1406            && let Some(well_known) = login_well_known
1407            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1408        {
1409            self.set_homeserver(homeserver);
1410        }
1411    }
1412
1413    /// Similar to [`Client::restore_session_with`], with
1414    /// [`RoomLoadSettings::default()`].
1415    ///
1416    /// # Panics
1417    ///
1418    /// Panics if a session was already restored or logged in.
1419    #[instrument(skip_all)]
1420    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1421        self.restore_session_with(session, RoomLoadSettings::default()).await
1422    }
1423
1424    /// Restore a session previously logged-in using one of the available
1425    /// authentication APIs. The number of rooms to restore is controlled by
1426    /// [`RoomLoadSettings`].
1427    ///
1428    /// See the documentation of the corresponding authentication API's
1429    /// `restore_session` method for more information.
1430    ///
1431    /// # Panics
1432    ///
1433    /// Panics if a session was already restored or logged in.
1434    #[instrument(skip_all)]
1435    pub async fn restore_session_with(
1436        &self,
1437        session: impl Into<AuthSession>,
1438        room_load_settings: RoomLoadSettings,
1439    ) -> Result<()> {
1440        let session = session.into();
1441        match session {
1442            AuthSession::Matrix(session) => {
1443                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1444            }
1445            AuthSession::OAuth(session) => {
1446                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1447            }
1448        }
1449    }
1450
1451    /// Refresh the access token using the authentication API used to log into
1452    /// this session.
1453    ///
1454    /// See the documentation of the authentication API's `refresh_access_token`
1455    /// method for more information.
1456    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1457        let Some(auth_api) = self.auth_api() else {
1458            return Err(RefreshTokenError::RefreshTokenRequired);
1459        };
1460
1461        match auth_api {
1462            AuthApi::Matrix(api) => {
1463                trace!("Token refresh: Using the homeserver.");
1464                Box::pin(api.refresh_access_token()).await?;
1465            }
1466            AuthApi::OAuth(api) => {
1467                trace!("Token refresh: Using OAuth 2.0.");
1468                Box::pin(api.refresh_access_token()).await?;
1469            }
1470        }
1471
1472        Ok(())
1473    }
1474
1475    /// Log out the current session using the proper authentication API.
1476    ///
1477    /// # Errors
1478    ///
1479    /// Returns an error if the session is not authenticated or if an error
1480    /// occurred while making the request to the server.
1481    pub async fn logout(&self) -> Result<(), Error> {
1482        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1483        match auth_api {
1484            AuthApi::Matrix(matrix_auth) => {
1485                matrix_auth.logout().await?;
1486                Ok(())
1487            }
1488            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1489        }
1490    }
1491
1492    /// Get or upload a sync filter.
1493    ///
1494    /// This method will either get a filter ID from the store or upload the
1495    /// filter definition to the homeserver and return the new filter ID.
1496    ///
1497    /// # Arguments
1498    ///
1499    /// * `filter_name` - The unique name of the filter, this name will be used
1500    /// locally to store and identify the filter ID returned by the server.
1501    ///
1502    /// * `definition` - The filter definition that should be uploaded to the
1503    /// server if no filter ID can be found in the store.
1504    ///
1505    /// # Examples
1506    ///
1507    /// ```no_run
1508    /// # use matrix_sdk::{
1509    /// #    Client, config::SyncSettings,
1510    /// #    ruma::api::client::{
1511    /// #        filter::{
1512    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1513    /// #        },
1514    /// #        sync::sync_events::v3::Filter,
1515    /// #    }
1516    /// # };
1517    /// # use url::Url;
1518    /// # async {
1519    /// # let homeserver = Url::parse("http://example.com").unwrap();
1520    /// # let client = Client::new(homeserver).await.unwrap();
1521    /// let mut filter = FilterDefinition::default();
1522    ///
1523    /// // Let's enable member lazy loading.
1524    /// filter.room.state.lazy_load_options =
1525    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1526    ///
1527    /// let filter_id = client
1528    ///     .get_or_upload_filter("sync", filter)
1529    ///     .await
1530    ///     .unwrap();
1531    ///
1532    /// let sync_settings = SyncSettings::new()
1533    ///     .filter(Filter::FilterId(filter_id));
1534    ///
1535    /// let response = client.sync_once(sync_settings).await.unwrap();
1536    /// # };
1537    #[instrument(skip(self, definition))]
1538    pub async fn get_or_upload_filter(
1539        &self,
1540        filter_name: &str,
1541        definition: FilterDefinition,
1542    ) -> Result<String> {
1543        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1544            debug!("Found filter locally");
1545            Ok(filter)
1546        } else {
1547            debug!("Didn't find filter locally");
1548            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1549            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1550            let response = self.send(request).await?;
1551
1552            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1553
1554            Ok(response.filter_id)
1555        }
1556    }
1557
1558    /// Prepare to join a room by ID, by getting the current details about it
1559    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1560        let room = self.get_room(room_id)?;
1561
1562        let inviter = match room.invite_details().await {
1563            Ok(details) => details.inviter,
1564            Err(Error::WrongRoomState(_)) => None,
1565            Err(e) => {
1566                warn!("Error fetching invite details for room: {e:?}");
1567                None
1568            }
1569        };
1570
1571        Some(PreJoinRoomInfo { inviter })
1572    }
1573
1574    /// Finish joining a room.
1575    ///
1576    /// If the room was an invite that should be marked as a DM, will include it
1577    /// in the DM event after creating the joined room.
1578    ///
1579    /// If encrypted history sharing is enabled, will check to see if we have a
1580    /// key bundle, and import it if so.
1581    ///
1582    /// # Arguments
1583    ///
1584    /// * `room_id` - The `RoomId` of the room that was joined.
1585    /// * `pre_join_room_info` - Information about the room before we joined.
1586    async fn finish_join_room(
1587        &self,
1588        room_id: &RoomId,
1589        pre_join_room_info: Option<PreJoinRoomInfo>,
1590    ) -> Result<Room> {
1591        info!(?room_id, ?pre_join_room_info, "Completing room join");
1592        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1593            room.state() == RoomState::Invited
1594                && room.is_direct().await.unwrap_or_else(|e| {
1595                    warn!(%room_id, "is_direct() failed: {e}");
1596                    false
1597                })
1598        } else {
1599            false
1600        };
1601
1602        let base_room = self
1603            .base_client()
1604            .room_joined(
1605                room_id,
1606                pre_join_room_info
1607                    .as_ref()
1608                    .and_then(|info| info.inviter.as_ref())
1609                    .map(|i| i.user_id().to_owned()),
1610            )
1611            .await?;
1612        let room = Room::new(self.clone(), base_room);
1613
1614        if mark_as_dm {
1615            room.set_is_direct(true).await?;
1616        }
1617
1618        #[cfg(feature = "e2e-encryption")]
1619        if self.inner.enable_share_history_on_invite
1620            && let Some(inviter) =
1621                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1622        {
1623            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1624                .await?;
1625        }
1626
1627        // Suppress "unused variable" and "unused field" lints
1628        #[cfg(not(feature = "e2e-encryption"))]
1629        let _ = pre_join_room_info.map(|i| i.inviter);
1630
1631        Ok(room)
1632    }
1633
1634    /// Join a room by `RoomId`.
1635    ///
1636    /// Returns the `Room` in the joined state.
1637    ///
1638    /// # Arguments
1639    ///
1640    /// * `room_id` - The `RoomId` of the room to be joined.
1641    #[instrument(skip(self))]
1642    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1643        // See who invited us to this room, if anyone. Note we have to do this before
1644        // making the `/join` request, otherwise we could race against the sync.
1645        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1646
1647        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1648        let response = self.send(request).await?;
1649        self.finish_join_room(&response.room_id, pre_join_info).await
1650    }
1651
1652    /// Join a room by `RoomOrAliasId`.
1653    ///
1654    /// Returns the `Room` in the joined state.
1655    ///
1656    /// # Arguments
1657    ///
1658    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1659    ///   alias looks like `#name:example.com`.
1660    /// * `server_names` - The server names to be used for resolving the alias,
1661    ///   if needs be.
1662    #[instrument(skip(self))]
1663    pub async fn join_room_by_id_or_alias(
1664        &self,
1665        alias: &RoomOrAliasId,
1666        server_names: &[OwnedServerName],
1667    ) -> Result<Room> {
1668        let pre_join_info = {
1669            match alias.try_into() {
1670                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1671                Err(_) => {
1672                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1673                    // responding to an invitation to the room, and therefore don't need to handle
1674                    // things that happen as a result of invites.
1675                    None
1676                }
1677            }
1678        };
1679        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1680            via: server_names.to_owned(),
1681        });
1682        let response = self.send(request).await?;
1683        self.finish_join_room(&response.room_id, pre_join_info).await
1684    }
1685
1686    /// Search the homeserver's directory of public rooms.
1687    ///
1688    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1689    /// a `get_public_rooms::Response`.
1690    ///
1691    /// # Arguments
1692    ///
1693    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1694    ///
1695    /// * `since` - Pagination token from a previous request.
1696    ///
1697    /// * `server` - The name of the server, if `None` the requested server is
1698    ///   used.
1699    ///
1700    /// # Examples
1701    /// ```no_run
1702    /// use matrix_sdk::Client;
1703    /// # use url::Url;
1704    /// # let homeserver = Url::parse("http://example.com").unwrap();
1705    /// # let limit = Some(10);
1706    /// # let since = Some("since token");
1707    /// # let server = Some("servername.com".try_into().unwrap());
1708    /// # async {
1709    /// let mut client = Client::new(homeserver).await.unwrap();
1710    ///
1711    /// client.public_rooms(limit, since, server).await;
1712    /// # };
1713    /// ```
1714    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1715    pub async fn public_rooms(
1716        &self,
1717        limit: Option<u32>,
1718        since: Option<&str>,
1719        server: Option<&ServerName>,
1720    ) -> HttpResult<get_public_rooms::v3::Response> {
1721        let limit = limit.map(UInt::from);
1722
1723        let request = assign!(get_public_rooms::v3::Request::new(), {
1724            limit,
1725            since: since.map(ToOwned::to_owned),
1726            server: server.map(ToOwned::to_owned),
1727        });
1728        self.send(request).await
1729    }
1730
1731    /// Create a room with the given parameters.
1732    ///
1733    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1734    /// created room.
1735    ///
1736    /// If you want to create a direct message with one specific user, you can
1737    /// use [`create_dm`][Self::create_dm], which is more convenient than
1738    /// assembling the [`create_room::v3::Request`] yourself.
1739    ///
1740    /// If the `is_direct` field of the request is set to `true` and at least
1741    /// one user is invited, the room will be automatically added to the direct
1742    /// rooms in the account data.
1743    ///
1744    /// # Examples
1745    ///
1746    /// ```no_run
1747    /// use matrix_sdk::{
1748    ///     Client,
1749    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1750    /// };
1751    /// # use url::Url;
1752    /// #
1753    /// # async {
1754    /// # let homeserver = Url::parse("http://example.com").unwrap();
1755    /// let request = CreateRoomRequest::new();
1756    /// let client = Client::new(homeserver).await.unwrap();
1757    /// assert!(client.create_room(request).await.is_ok());
1758    /// # };
1759    /// ```
1760    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1761        let invite = request.invite.clone();
1762        let is_direct_room = request.is_direct;
1763        let response = self.send(request).await?;
1764
1765        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1766
1767        let joined_room = Room::new(self.clone(), base_room);
1768
1769        if is_direct_room
1770            && !invite.is_empty()
1771            && let Err(error) =
1772                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1773        {
1774            // FIXME: Retry in the background
1775            error!("Failed to mark room as DM: {error}");
1776        }
1777
1778        Ok(joined_room)
1779    }
1780
1781    /// Create a DM room.
1782    ///
1783    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1784    /// given user being invited, the room marked `is_direct` and both the
1785    /// creator and invitee getting the default maximum power level.
1786    ///
1787    /// If the `e2e-encryption` feature is enabled, the room will also be
1788    /// encrypted.
1789    ///
1790    /// # Arguments
1791    ///
1792    /// * `user_id` - The ID of the user to create a DM for.
1793    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1794        #[cfg(feature = "e2e-encryption")]
1795        let initial_state = vec![
1796            InitialStateEvent::with_empty_state_key(
1797                RoomEncryptionEventContent::with_recommended_defaults(),
1798            )
1799            .to_raw_any(),
1800        ];
1801
1802        #[cfg(not(feature = "e2e-encryption"))]
1803        let initial_state = vec![];
1804
1805        let request = assign!(create_room::v3::Request::new(), {
1806            invite: vec![user_id.to_owned()],
1807            is_direct: true,
1808            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1809            initial_state,
1810        });
1811
1812        self.create_room(request).await
1813    }
1814
1815    /// Get the existing DM room with the given user, if any.
1816    pub fn get_dm_room(&self, user_id: &UserId) -> Option<Room> {
1817        let rooms = self.joined_rooms();
1818
1819        // Find the room we share with the `user_id` and only with `user_id`
1820        let room = rooms.into_iter().find(|r| {
1821            let targets = r.direct_targets();
1822            targets.len() == 1 && targets.contains(<&DirectUserIdentifier>::from(user_id))
1823        });
1824
1825        trace!(?user_id, ?room, "Found DM room with user");
1826        room
1827    }
1828
1829    /// Search the homeserver's directory for public rooms with a filter.
1830    ///
1831    /// # Arguments
1832    ///
1833    /// * `room_search` - The easiest way to create this request is using the
1834    ///   `get_public_rooms_filtered::Request` itself.
1835    ///
1836    /// # Examples
1837    ///
1838    /// ```no_run
1839    /// # use url::Url;
1840    /// # use matrix_sdk::Client;
1841    /// # async {
1842    /// # let homeserver = Url::parse("http://example.com")?;
1843    /// use matrix_sdk::ruma::{
1844    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1845    /// };
1846    /// # let mut client = Client::new(homeserver).await?;
1847    ///
1848    /// let mut filter = Filter::new();
1849    /// filter.generic_search_term = Some("rust".to_owned());
1850    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1851    /// request.filter = filter;
1852    ///
1853    /// let response = client.public_rooms_filtered(request).await?;
1854    ///
1855    /// for room in response.chunk {
1856    ///     println!("Found room {room:?}");
1857    /// }
1858    /// # anyhow::Ok(()) };
1859    /// ```
1860    pub async fn public_rooms_filtered(
1861        &self,
1862        request: get_public_rooms_filtered::v3::Request,
1863    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1864        self.send(request).await
1865    }
1866
1867    /// Send an arbitrary request to the server, without updating client state.
1868    ///
1869    /// **Warning:** Because this method *does not* update the client state, it
1870    /// is important to make sure that you account for this yourself, and
1871    /// use wrapper methods where available.  This method should *only* be
1872    /// used if a wrapper method for the endpoint you'd like to use is not
1873    /// available.
1874    ///
1875    /// # Arguments
1876    ///
1877    /// * `request` - A filled out and valid request for the endpoint to be hit
1878    ///
1879    /// * `timeout` - An optional request timeout setting, this overrides the
1880    ///   default request setting if one was set.
1881    ///
1882    /// # Examples
1883    ///
1884    /// ```no_run
1885    /// # use matrix_sdk::{Client, config::SyncSettings};
1886    /// # use url::Url;
1887    /// # async {
1888    /// # let homeserver = Url::parse("http://localhost:8080")?;
1889    /// # let mut client = Client::new(homeserver).await?;
1890    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1891    ///
1892    /// // First construct the request you want to make
1893    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1894    /// // for all available Endpoints
1895    /// let user_id = user_id!("@example:localhost").to_owned();
1896    /// let request = profile::get_profile::v3::Request::new(user_id);
1897    ///
1898    /// // Start the request using Client::send()
1899    /// let response = client.send(request).await?;
1900    ///
1901    /// // Check the corresponding Response struct to find out what types are
1902    /// // returned
1903    /// # anyhow::Ok(()) };
1904    /// ```
1905    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1906    where
1907        Request: OutgoingRequest + Clone + Debug,
1908        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1909        Request::PathBuilder: SupportedPathBuilder,
1910        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1911        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1912    {
1913        SendRequest {
1914            client: self.clone(),
1915            request,
1916            config: None,
1917            send_progress: Default::default(),
1918        }
1919    }
1920
1921    pub(crate) async fn send_inner<Request>(
1922        &self,
1923        request: Request,
1924        config: Option<RequestConfig>,
1925        send_progress: SharedObservable<TransmissionProgress>,
1926    ) -> HttpResult<Request::IncomingResponse>
1927    where
1928        Request: OutgoingRequest + Debug,
1929        for<'a> Request::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
1930        Request::PathBuilder: SupportedPathBuilder,
1931        for<'a> <Request::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
1932        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1933    {
1934        let homeserver = self.homeserver().to_string();
1935        let access_token = self.access_token();
1936        let skip_auth = config.map(|c| c.skip_auth).unwrap_or(self.request_config().skip_auth);
1937
1938        let path_builder_input =
1939            Request::PathBuilder::get_path_builder_input(self, skip_auth).await?;
1940
1941        let result = self
1942            .inner
1943            .http_client
1944            .send(
1945                request,
1946                config,
1947                homeserver,
1948                access_token.as_deref(),
1949                path_builder_input,
1950                send_progress,
1951            )
1952            .await;
1953
1954        if let Err(Some(ErrorKind::UnknownToken { .. })) =
1955            result.as_ref().map_err(HttpError::client_api_error_kind)
1956            && let Some(access_token) = &access_token
1957        {
1958            // Mark the access token as expired.
1959            self.auth_ctx().set_access_token_expired(access_token);
1960        }
1961
1962        result
1963    }
1964
1965    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1966        _ = self
1967            .inner
1968            .auth_ctx
1969            .session_change_sender
1970            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1971    }
1972
1973    /// Fetches server versions from network; no caching.
1974    pub async fn fetch_server_versions(
1975        &self,
1976        request_config: Option<RequestConfig>,
1977    ) -> HttpResult<get_supported_versions::Response> {
1978        // Since this was called by the user, try to refresh the access token if
1979        // necessary.
1980        self.fetch_server_versions_inner(false, request_config).await
1981    }
1982
1983    /// Fetches server versions from network; no caching.
1984    ///
1985    /// If the access token is expired and `failsafe` is `false`, this will
1986    /// attempt to refresh the access token, otherwise this will try to make an
1987    /// unauthenticated request instead.
1988    pub(crate) async fn fetch_server_versions_inner(
1989        &self,
1990        failsafe: bool,
1991        request_config: Option<RequestConfig>,
1992    ) -> HttpResult<get_supported_versions::Response> {
1993        if !failsafe {
1994            // `Client::send()` handles refreshing access tokens.
1995            return self
1996                .send(get_supported_versions::Request::new())
1997                .with_request_config(request_config)
1998                .await;
1999        }
2000
2001        let homeserver = self.homeserver().to_string();
2002
2003        // If we have a fresh access token, try with it first.
2004        if !request_config.as_ref().is_some_and(|config| config.skip_auth && !config.force_auth)
2005            && self.auth_ctx().has_valid_access_token()
2006            && let Some(access_token) = self.access_token()
2007        {
2008            let result = self
2009                .inner
2010                .http_client
2011                .send(
2012                    get_supported_versions::Request::new(),
2013                    request_config,
2014                    homeserver.clone(),
2015                    Some(&access_token),
2016                    (),
2017                    Default::default(),
2018                )
2019                .await;
2020
2021            if let Err(Some(ErrorKind::UnknownToken { .. })) =
2022                result.as_ref().map_err(HttpError::client_api_error_kind)
2023            {
2024                // If the access token is actually expired, mark it as expired and fallback to
2025                // the unauthenticated request below.
2026                self.auth_ctx().set_access_token_expired(&access_token);
2027            } else {
2028                // If the request succeeded or it's an other error, just stop now.
2029                return result;
2030            }
2031        }
2032
2033        // Try without authentication.
2034        self.inner
2035            .http_client
2036            .send(
2037                get_supported_versions::Request::new(),
2038                request_config,
2039                homeserver.clone(),
2040                None,
2041                (),
2042                Default::default(),
2043            )
2044            .await
2045    }
2046
2047    /// Fetches client well_known from network; no caching.
2048    ///
2049    /// 1. If the [`Client::server`] value is available, we use it to fetch the
2050    ///    well-known contents.
2051    /// 2. If it's not, we try extracting the server name from the
2052    ///    [`Client::user_id`] and building the server URL from it.
2053    /// 3. If we couldn't get the well-known contents with either the explicit
2054    ///    server name or the implicit extracted one, we try the homeserver URL
2055    ///    as a last resort.
2056    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
2057        let homeserver = self.homeserver();
2058        let scheme = homeserver.scheme();
2059
2060        // Use the server name, either an explicit one or an implicit one taken from
2061        // the user id: sometimes we'll have only the homeserver url available and no
2062        // server name, but the server name can be extracted from the current user id.
2063        let server_url = self
2064            .server()
2065            .map(|server| server.to_string())
2066            // If the server name wasn't available, extract it from the user id and build a URL:
2067            // Reuse the same scheme as the homeserver url does, assuming if it's `http` there it
2068            // will be the same for the public server url, lacking a better candidate.
2069            .or_else(|| self.user_id().map(|id| format!("{}://{}", scheme, id.server_name())));
2070
2071        // If the server name is available, first try using it
2072        let response = if let Some(server_url) = server_url {
2073            // First try using the server name
2074            self.fetch_client_well_known_with_url(server_url).await
2075        } else {
2076            None
2077        };
2078
2079        // If we didn't get a well-known value yet, try with the homeserver url instead:
2080        if response.is_none() {
2081            // Sometimes people configure their well-known directly on the homeserver so use
2082            // this as a fallback when the server name is unknown.
2083            warn!(
2084                "Fetching the well-known from the server name didn't work, using the homeserver url instead"
2085            );
2086            self.fetch_client_well_known_with_url(homeserver.to_string()).await
2087        } else {
2088            response
2089        }
2090    }
2091
2092    async fn fetch_client_well_known_with_url(
2093        &self,
2094        url: String,
2095    ) -> Option<discover_homeserver::Response> {
2096        let well_known = self
2097            .inner
2098            .http_client
2099            .send(
2100                discover_homeserver::Request::new(),
2101                Some(RequestConfig::short_retry()),
2102                url,
2103                None,
2104                (),
2105                Default::default(),
2106            )
2107            .await;
2108
2109        match well_known {
2110            Ok(well_known) => Some(well_known),
2111            Err(http_error) => {
2112                // It is perfectly valid to not have a well-known file.
2113                // Maybe we should check for a specific error code to be sure?
2114                warn!("Failed to fetch client well-known: {http_error}");
2115                None
2116            }
2117        }
2118    }
2119
2120    /// Load supported versions from storage, or fetch them from network and
2121    /// cache them.
2122    ///
2123    /// If `failsafe` is true, this will try to minimize side effects to avoid
2124    /// possible deadlocks.
2125    async fn load_or_fetch_supported_versions(
2126        &self,
2127        failsafe: bool,
2128    ) -> HttpResult<SupportedVersionsResponse> {
2129        match self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await {
2130            Ok(Some(stored)) => {
2131                if let Some(supported_versions) =
2132                    stored.into_supported_versions().and_then(|value| value.into_data())
2133                {
2134                    return Ok(supported_versions);
2135                }
2136            }
2137            Ok(None) => {
2138                // fallthrough: cache is empty
2139            }
2140            Err(err) => {
2141                warn!("error when loading cached supported versions: {err}");
2142                // fallthrough to network.
2143            }
2144        }
2145
2146        let server_versions = self.fetch_server_versions_inner(failsafe, None).await?;
2147        let supported_versions = SupportedVersionsResponse {
2148            versions: server_versions.versions,
2149            unstable_features: server_versions.unstable_features,
2150        };
2151
2152        // Only attempt to cache the result in storage if the request was authenticated.
2153        if self.auth_ctx().has_valid_access_token()
2154            && let Err(err) = self
2155                .state_store()
2156                .set_kv_data(
2157                    StateStoreDataKey::SupportedVersions,
2158                    StateStoreDataValue::SupportedVersions(TtlStoreValue::new(
2159                        supported_versions.clone(),
2160                    )),
2161                )
2162                .await
2163        {
2164            warn!("error when caching supported versions: {err}");
2165        }
2166
2167        Ok(supported_versions)
2168    }
2169
2170    pub(crate) async fn get_cached_supported_versions(&self) -> Option<SupportedVersions> {
2171        let supported_versions = &self.inner.caches.supported_versions;
2172
2173        if let CachedValue::Cached(val) = &*supported_versions.read().await {
2174            Some(val.clone())
2175        } else {
2176            None
2177        }
2178    }
2179
2180    /// Get the Matrix versions and features supported by the homeserver by
2181    /// fetching them from the server or the cache.
2182    ///
2183    /// This is equivalent to calling both [`Client::server_versions()`] and
2184    /// [`Client::unstable_features()`]. To always fetch the result from the
2185    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2186    /// and then `.as_supported_versions()` on the response.
2187    ///
2188    /// # Examples
2189    ///
2190    /// ```no_run
2191    /// use ruma::api::{FeatureFlag, MatrixVersion};
2192    /// # use matrix_sdk::{Client, config::SyncSettings};
2193    /// # use url::Url;
2194    /// # async {
2195    /// # let homeserver = Url::parse("http://localhost:8080")?;
2196    /// # let mut client = Client::new(homeserver).await?;
2197    ///
2198    /// let supported = client.supported_versions().await?;
2199    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2200    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2201    ///
2202    /// let msc_x_feature = FeatureFlag::from("msc_x");
2203    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2204    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2205    /// # anyhow::Ok(()) };
2206    /// ```
2207    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2208        self.supported_versions_inner(false).await
2209    }
2210
2211    /// Get the Matrix versions and features supported by the homeserver by
2212    /// fetching them from the server or the cache.
2213    ///
2214    /// If `failsafe` is true, this will try to minimize side effects to avoid
2215    /// possible deadlocks.
2216    pub(crate) async fn supported_versions_inner(
2217        &self,
2218        failsafe: bool,
2219    ) -> HttpResult<SupportedVersions> {
2220        let cached_supported_versions = &self.inner.caches.supported_versions;
2221        if let CachedValue::Cached(val) = &*cached_supported_versions.read().await {
2222            return Ok(val.clone());
2223        }
2224
2225        let mut guarded_supported_versions = cached_supported_versions.write().await;
2226        if let CachedValue::Cached(val) = &*guarded_supported_versions {
2227            return Ok(val.clone());
2228        }
2229
2230        let supported = self.load_or_fetch_supported_versions(failsafe).await?;
2231
2232        // Fill both unstable features and server versions at once.
2233        let mut supported_versions = supported.supported_versions();
2234        if supported_versions.versions.is_empty() {
2235            supported_versions.versions = [MatrixVersion::V1_0].into();
2236        }
2237
2238        // Only cache the result if the request was authenticated.
2239        if self.auth_ctx().has_valid_access_token() {
2240            *guarded_supported_versions = CachedValue::Cached(supported_versions.clone());
2241        }
2242
2243        Ok(supported_versions)
2244    }
2245
2246    /// Get the Matrix versions and features supported by the homeserver by
2247    /// fetching them from the cache.
2248    ///
2249    /// For a version of this function that fetches the supported versions and
2250    /// features from the homeserver if the [`SupportedVersions`] aren't
2251    /// found in the cache, take a look at the [`Client::server_versions()`]
2252    /// method.
2253    ///
2254    /// # Examples
2255    ///
2256    /// ```no_run
2257    /// use ruma::api::{FeatureFlag, MatrixVersion};
2258    /// # use matrix_sdk::{Client, config::SyncSettings};
2259    /// # use url::Url;
2260    /// # async {
2261    /// # let homeserver = Url::parse("http://localhost:8080")?;
2262    /// # let mut client = Client::new(homeserver).await?;
2263    ///
2264    /// let supported =
2265    ///     if let Some(supported) = client.supported_versions_cached().await? {
2266    ///         supported
2267    ///     } else {
2268    ///         client.fetch_server_versions(None).await?.as_supported_versions()
2269    ///     };
2270    ///
2271    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2272    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2273    ///
2274    /// let msc_x_feature = FeatureFlag::from("msc_x");
2275    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2276    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2277    /// # anyhow::Ok(()) };
2278    /// ```
2279    pub async fn supported_versions_cached(&self) -> Result<Option<SupportedVersions>, StoreError> {
2280        if let Some(cached) = self.get_cached_supported_versions().await {
2281            Ok(Some(cached))
2282        } else if let Some(stored) =
2283            self.state_store().get_kv_data(StateStoreDataKey::SupportedVersions).await?
2284        {
2285            if let Some(supported) =
2286                stored.into_supported_versions().and_then(|value| value.into_data())
2287            {
2288                Ok(Some(supported.supported_versions()))
2289            } else {
2290                Ok(None)
2291            }
2292        } else {
2293            Ok(None)
2294        }
2295    }
2296
2297    /// Get the Matrix versions supported by the homeserver by fetching them
2298    /// from the server or the cache.
2299    ///
2300    /// # Examples
2301    ///
2302    /// ```no_run
2303    /// use ruma::api::MatrixVersion;
2304    /// # use matrix_sdk::{Client, config::SyncSettings};
2305    /// # use url::Url;
2306    /// # async {
2307    /// # let homeserver = Url::parse("http://localhost:8080")?;
2308    /// # let mut client = Client::new(homeserver).await?;
2309    ///
2310    /// let server_versions = client.server_versions().await?;
2311    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2312    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2313    /// # anyhow::Ok(()) };
2314    /// ```
2315    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2316        Ok(self.supported_versions().await?.versions)
2317    }
2318
2319    /// Get the unstable features supported by the homeserver by fetching them
2320    /// from the server or the cache.
2321    ///
2322    /// # Examples
2323    ///
2324    /// ```no_run
2325    /// use matrix_sdk::ruma::api::FeatureFlag;
2326    /// # use matrix_sdk::{Client, config::SyncSettings};
2327    /// # use url::Url;
2328    /// # async {
2329    /// # let homeserver = Url::parse("http://localhost:8080")?;
2330    /// # let mut client = Client::new(homeserver).await?;
2331    ///
2332    /// let msc_x_feature = FeatureFlag::from("msc_x");
2333    /// let unstable_features = client.unstable_features().await?;
2334    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2335    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2336    /// # anyhow::Ok(()) };
2337    /// ```
2338    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2339        Ok(self.supported_versions().await?.features)
2340    }
2341
2342    /// Empty the supported versions and unstable features cache.
2343    ///
2344    /// Since the SDK caches the supported versions, it's possible to have a
2345    /// stale entry in the cache. This functions makes it possible to force
2346    /// reset it.
2347    pub async fn reset_supported_versions(&self) -> Result<()> {
2348        // Empty the in-memory caches.
2349        self.inner.caches.supported_versions.write().await.take();
2350
2351        // Empty the store cache.
2352        Ok(self.state_store().remove_kv_data(StateStoreDataKey::SupportedVersions).await?)
2353    }
2354
2355    /// Load well-known from storage, or fetch it from network and cache it.
2356    async fn load_or_fetch_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2357        match self.state_store().get_kv_data(StateStoreDataKey::WellKnown).await {
2358            Ok(Some(stored)) => {
2359                if let Some(well_known) =
2360                    stored.into_well_known().and_then(|value| value.into_data())
2361                {
2362                    return Ok(well_known);
2363                }
2364            }
2365            Ok(None) => {
2366                // fallthrough: cache is empty
2367            }
2368            Err(err) => {
2369                warn!("error when loading cached well-known: {err}");
2370                // fallthrough to network.
2371            }
2372        }
2373
2374        let well_known = self.fetch_client_well_known().await.map(Into::into);
2375
2376        // Attempt to cache the result in storage.
2377        if let Err(err) = self
2378            .state_store()
2379            .set_kv_data(
2380                StateStoreDataKey::WellKnown,
2381                StateStoreDataValue::WellKnown(TtlStoreValue::new(well_known.clone())),
2382            )
2383            .await
2384        {
2385            warn!("error when caching well-known: {err}");
2386        }
2387
2388        Ok(well_known)
2389    }
2390
2391    async fn get_or_load_and_cache_well_known(&self) -> HttpResult<Option<WellKnownResponse>> {
2392        let well_known = &self.inner.caches.well_known;
2393        if let CachedValue::Cached(val) = &*well_known.read().await {
2394            return Ok(val.clone());
2395        }
2396
2397        let mut guarded_well_known = well_known.write().await;
2398        if let CachedValue::Cached(val) = &*guarded_well_known {
2399            return Ok(val.clone());
2400        }
2401
2402        let well_known = self.load_or_fetch_well_known().await?;
2403
2404        *guarded_well_known = CachedValue::Cached(well_known.clone());
2405
2406        Ok(well_known)
2407    }
2408
2409    /// Get information about the homeserver's advertised RTC foci by fetching
2410    /// the well-known file from the server or the cache.
2411    ///
2412    /// # Examples
2413    /// ```no_run
2414    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2415    /// # use url::Url;
2416    /// # async {
2417    /// # let homeserver = Url::parse("http://localhost:8080")?;
2418    /// # let mut client = Client::new(homeserver).await?;
2419    /// let rtc_foci = client.rtc_foci().await?;
2420    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2421    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2422    ///     _ => None,
2423    /// });
2424    /// if let Some(info) = default_livekit_focus_info {
2425    ///     println!("Default LiveKit service URL: {}", info.service_url);
2426    /// }
2427    /// # anyhow::Ok(()) };
2428    /// ```
2429    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2430        let well_known = self.get_or_load_and_cache_well_known().await?;
2431
2432        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2433    }
2434
2435    /// Empty the well-known cache.
2436    ///
2437    /// Since the SDK caches the well-known, it's possible to have a stale entry
2438    /// in the cache. This functions makes it possible to force reset it.
2439    pub async fn reset_well_known(&self) -> Result<()> {
2440        // Empty the in-memory caches.
2441        self.inner.caches.well_known.write().await.take();
2442
2443        // Empty the store cache.
2444        Ok(self.state_store().remove_kv_data(StateStoreDataKey::WellKnown).await?)
2445    }
2446
2447    /// Check whether MSC 4028 is enabled on the homeserver.
2448    ///
2449    /// # Examples
2450    ///
2451    /// ```no_run
2452    /// # use matrix_sdk::{Client, config::SyncSettings};
2453    /// # use url::Url;
2454    /// # async {
2455    /// # let homeserver = Url::parse("http://localhost:8080")?;
2456    /// # let mut client = Client::new(homeserver).await?;
2457    /// let msc4028_enabled =
2458    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2459    /// # anyhow::Ok(()) };
2460    /// ```
2461    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2462        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2463    }
2464
2465    /// Get information of all our own devices.
2466    ///
2467    /// # Examples
2468    ///
2469    /// ```no_run
2470    /// # use matrix_sdk::{Client, config::SyncSettings};
2471    /// # use url::Url;
2472    /// # async {
2473    /// # let homeserver = Url::parse("http://localhost:8080")?;
2474    /// # let mut client = Client::new(homeserver).await?;
2475    /// let response = client.devices().await?;
2476    ///
2477    /// for device in response.devices {
2478    ///     println!(
2479    ///         "Device: {} {}",
2480    ///         device.device_id,
2481    ///         device.display_name.as_deref().unwrap_or("")
2482    ///     );
2483    /// }
2484    /// # anyhow::Ok(()) };
2485    /// ```
2486    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2487        let request = get_devices::v3::Request::new();
2488
2489        self.send(request).await
2490    }
2491
2492    /// Delete the given devices from the server.
2493    ///
2494    /// # Arguments
2495    ///
2496    /// * `devices` - The list of devices that should be deleted from the
2497    ///   server.
2498    ///
2499    /// * `auth_data` - This request requires user interactive auth, the first
2500    ///   request needs to set this to `None` and will always fail with an
2501    ///   `UiaaResponse`. The response will contain information for the
2502    ///   interactive auth and the same request needs to be made but this time
2503    ///   with some `auth_data` provided.
2504    ///
2505    /// ```no_run
2506    /// # use matrix_sdk::{
2507    /// #    ruma::{api::client::uiaa, device_id},
2508    /// #    Client, Error, config::SyncSettings,
2509    /// # };
2510    /// # use serde_json::json;
2511    /// # use url::Url;
2512    /// # use std::collections::BTreeMap;
2513    /// # async {
2514    /// # let homeserver = Url::parse("http://localhost:8080")?;
2515    /// # let mut client = Client::new(homeserver).await?;
2516    /// let devices = &[device_id!("DEVICEID").to_owned()];
2517    ///
2518    /// if let Err(e) = client.delete_devices(devices, None).await {
2519    ///     if let Some(info) = e.as_uiaa_response() {
2520    ///         let mut password = uiaa::Password::new(
2521    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2522    ///             "wordpass".to_owned(),
2523    ///         );
2524    ///         password.session = info.session.clone();
2525    ///
2526    ///         client
2527    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2528    ///             .await?;
2529    ///     }
2530    /// }
2531    /// # anyhow::Ok(()) };
2532    pub async fn delete_devices(
2533        &self,
2534        devices: &[OwnedDeviceId],
2535        auth_data: Option<uiaa::AuthData>,
2536    ) -> HttpResult<delete_devices::v3::Response> {
2537        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2538        request.auth = auth_data;
2539
2540        self.send(request).await
2541    }
2542
2543    /// Change the display name of a device owned by the current user.
2544    ///
2545    /// Returns a `update_device::Response` which specifies the result
2546    /// of the operation.
2547    ///
2548    /// # Arguments
2549    ///
2550    /// * `device_id` - The ID of the device to change the display name of.
2551    /// * `display_name` - The new display name to set.
2552    pub async fn rename_device(
2553        &self,
2554        device_id: &DeviceId,
2555        display_name: &str,
2556    ) -> HttpResult<update_device::v3::Response> {
2557        let mut request = update_device::v3::Request::new(device_id.to_owned());
2558        request.display_name = Some(display_name.to_owned());
2559
2560        self.send(request).await
2561    }
2562
2563    /// Check whether a device with a specific ID exists on the server.
2564    ///
2565    /// Returns Ok(true) if the device exists, Ok(false) if the server responded
2566    /// with 404 and the underlying error otherwise.
2567    ///
2568    /// # Arguments
2569    ///
2570    /// * `device_id` - The ID of the device to query.
2571    pub async fn device_exists(&self, device_id: OwnedDeviceId) -> Result<bool> {
2572        let request = device::get_device::v3::Request::new(device_id);
2573        match self.send(request).await {
2574            Ok(_) => Ok(true),
2575            Err(err) => {
2576                if let Some(error) = err.as_client_api_error()
2577                    && error.status_code == 404
2578                {
2579                    Ok(false)
2580                } else {
2581                    Err(err.into())
2582                }
2583            }
2584        }
2585    }
2586
2587    /// Synchronize the client's state with the latest state on the server.
2588    ///
2589    /// ## Syncing Events
2590    ///
2591    /// Messages or any other type of event need to be periodically fetched from
2592    /// the server, this is achieved by sending a `/sync` request to the server.
2593    ///
2594    /// The first sync is sent out without a [`token`]. The response of the
2595    /// first sync will contain a [`next_batch`] field which should then be
2596    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2597    /// don't receive the same events multiple times.
2598    ///
2599    /// ## Long Polling
2600    ///
2601    /// A sync should in the usual case always be in flight. The
2602    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2603    /// long the server will wait for new events before it will respond.
2604    /// The server will respond immediately if some new events arrive before the
2605    /// timeout has expired. If no changes arrive and the timeout expires an
2606    /// empty sync response will be sent to the client.
2607    ///
2608    /// This method of sending a request that may not receive a response
2609    /// immediately is called long polling.
2610    ///
2611    /// ## Filtering Events
2612    ///
2613    /// The number or type of messages and events that the client should receive
2614    /// from the server can be altered using a [`Filter`].
2615    ///
2616    /// Filters can be non-trivial and, since they will be sent with every sync
2617    /// request, they may take up a bunch of unnecessary bandwidth.
2618    ///
2619    /// Luckily filters can be uploaded to the server and reused using an unique
2620    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2621    /// method.
2622    ///
2623    /// # Arguments
2624    ///
2625    /// * `sync_settings` - Settings for the sync call, this allows us to set
2626    /// various options to configure the sync:
2627    ///     * [`filter`] - To configure which events we receive and which get
2628    ///       [filtered] by the server
2629    ///     * [`timeout`] - To configure our [long polling] setup.
2630    ///     * [`token`] - To tell the server which events we already received
2631    ///       and where we wish to continue syncing.
2632    ///     * [`full_state`] - To tell the server that we wish to receive all
2633    ///       state events, regardless of our configured [`token`].
2634    ///     * [`set_presence`] - To tell the server to set the presence and to
2635    ///       which state.
2636    ///
2637    /// # Examples
2638    ///
2639    /// ```no_run
2640    /// # use url::Url;
2641    /// # async {
2642    /// # let homeserver = Url::parse("http://localhost:8080")?;
2643    /// # let username = "";
2644    /// # let password = "";
2645    /// use matrix_sdk::{
2646    ///     Client, config::SyncSettings,
2647    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2648    /// };
2649    ///
2650    /// let client = Client::new(homeserver).await?;
2651    /// client.matrix_auth().login_username(username, password).send().await?;
2652    ///
2653    /// // Sync once so we receive the client state and old messages.
2654    /// client.sync_once(SyncSettings::default()).await?;
2655    ///
2656    /// // Register our handler so we start responding once we receive a new
2657    /// // event.
2658    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2659    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2660    /// });
2661    ///
2662    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2663    /// // from our `sync_once()` call automatically.
2664    /// client.sync(SyncSettings::default()).await;
2665    /// # anyhow::Ok(()) };
2666    /// ```
2667    ///
2668    /// [`sync`]: #method.sync
2669    /// [`SyncSettings`]: crate::config::SyncSettings
2670    /// [`token`]: crate::config::SyncSettings#method.token
2671    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2672    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2673    /// [`set_presence`]: ruma::presence::PresenceState
2674    /// [`filter`]: crate::config::SyncSettings#method.filter
2675    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2676    /// [`next_batch`]: SyncResponse#structfield.next_batch
2677    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2678    /// [long polling]: #long-polling
2679    /// [filtered]: #filtering-events
2680    #[instrument(skip(self))]
2681    pub async fn sync_once(
2682        &self,
2683        sync_settings: crate::config::SyncSettings,
2684    ) -> Result<SyncResponse> {
2685        // The sync might not return for quite a while due to the timeout.
2686        // We'll see if there's anything crypto related to send out before we
2687        // sync, i.e. if we closed our client after a sync but before the
2688        // crypto requests were sent out.
2689        //
2690        // This will mostly be a no-op.
2691        #[cfg(feature = "e2e-encryption")]
2692        if let Err(e) = self.send_outgoing_requests().await {
2693            error!(error = ?e, "Error while sending outgoing E2EE requests");
2694        }
2695
2696        let token = match sync_settings.token {
2697            SyncToken::Specific(token) => Some(token),
2698            SyncToken::NoToken => None,
2699            SyncToken::ReusePrevious => self.sync_token().await,
2700        };
2701
2702        let request = assign!(sync_events::v3::Request::new(), {
2703            filter: sync_settings.filter.map(|f| *f),
2704            since: token,
2705            full_state: sync_settings.full_state,
2706            set_presence: sync_settings.set_presence,
2707            timeout: sync_settings.timeout,
2708            use_state_after: true,
2709        });
2710        let mut request_config = self.request_config();
2711        if let Some(timeout) = sync_settings.timeout {
2712            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2713            request_config.timeout = Some(base_timeout + timeout);
2714        }
2715
2716        let response = self.send(request).with_request_config(request_config).await?;
2717        let next_batch = response.next_batch.clone();
2718        let response = self.process_sync(response).await?;
2719
2720        #[cfg(feature = "e2e-encryption")]
2721        if let Err(e) = self.send_outgoing_requests().await {
2722            error!(error = ?e, "Error while sending outgoing E2EE requests");
2723        }
2724
2725        self.inner.sync_beat.notify(usize::MAX);
2726
2727        Ok(SyncResponse::new(next_batch, response))
2728    }
2729
2730    /// Repeatedly synchronize the client state with the server.
2731    ///
2732    /// This method will only return on error, if cancellation is needed
2733    /// the method should be wrapped in a cancelable task or the
2734    /// [`Client::sync_with_callback`] method can be used or
2735    /// [`Client::sync_with_result_callback`] if you want to handle error
2736    /// cases in the loop, too.
2737    ///
2738    /// This method will internally call [`Client::sync_once`] in a loop.
2739    ///
2740    /// This method can be used with the [`Client::add_event_handler`]
2741    /// method to react to individual events. If you instead wish to handle
2742    /// events in a bulk manner the [`Client::sync_with_callback`],
2743    /// [`Client::sync_with_result_callback`] and
2744    /// [`Client::sync_stream`] methods can be used instead. Those methods
2745    /// repeatedly return the whole sync response.
2746    ///
2747    /// # Arguments
2748    ///
2749    /// * `sync_settings` - Settings for the sync call. *Note* that those
2750    ///   settings will be only used for the first sync call. See the argument
2751    ///   docs for [`Client::sync_once`] for more info.
2752    ///
2753    /// # Return
2754    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2755    /// up to the user of the API to check the error and decide whether the sync
2756    /// should continue or not.
2757    ///
2758    /// # Examples
2759    ///
2760    /// ```no_run
2761    /// # use url::Url;
2762    /// # async {
2763    /// # let homeserver = Url::parse("http://localhost:8080")?;
2764    /// # let username = "";
2765    /// # let password = "";
2766    /// use matrix_sdk::{
2767    ///     Client, config::SyncSettings,
2768    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2769    /// };
2770    ///
2771    /// let client = Client::new(homeserver).await?;
2772    /// client.matrix_auth().login_username(&username, &password).send().await?;
2773    ///
2774    /// // Register our handler so we start responding once we receive a new
2775    /// // event.
2776    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2777    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2778    /// });
2779    ///
2780    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2781    /// // automatically.
2782    /// client.sync(SyncSettings::default()).await?;
2783    /// # anyhow::Ok(()) };
2784    /// ```
2785    ///
2786    /// [argument docs]: #method.sync_once
2787    /// [`sync_with_callback`]: #method.sync_with_callback
2788    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2789        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2790    }
2791
2792    /// Repeatedly call sync to synchronize the client state with the server.
2793    ///
2794    /// # Arguments
2795    ///
2796    /// * `sync_settings` - Settings for the sync call. *Note* that those
2797    ///   settings will be only used for the first sync call. See the argument
2798    ///   docs for [`Client::sync_once`] for more info.
2799    ///
2800    /// * `callback` - A callback that will be called every time a successful
2801    ///   response has been fetched from the server. The callback must return a
2802    ///   boolean which signalizes if the method should stop syncing. If the
2803    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2804    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2805    ///
2806    /// # Return
2807    /// The sync runs until an error occurs or the
2808    /// callback indicates that the Loop should stop. If the callback asked for
2809    /// a regular stop, the result will be `Ok(())` otherwise the
2810    /// `Err(Error)` is returned.
2811    ///
2812    /// # Examples
2813    ///
2814    /// The following example demonstrates how to sync forever while sending all
2815    /// the interesting events through a mpsc channel to another thread e.g. a
2816    /// UI thread.
2817    ///
2818    /// ```no_run
2819    /// # use std::time::Duration;
2820    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2821    /// # use url::Url;
2822    /// # async {
2823    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2824    /// # let mut client = Client::new(homeserver).await.unwrap();
2825    ///
2826    /// use tokio::sync::mpsc::channel;
2827    ///
2828    /// let (tx, rx) = channel(100);
2829    ///
2830    /// let sync_channel = &tx;
2831    /// let sync_settings = SyncSettings::new()
2832    ///     .timeout(Duration::from_secs(30));
2833    ///
2834    /// client
2835    ///     .sync_with_callback(sync_settings, |response| async move {
2836    ///         let channel = sync_channel;
2837    ///         for (room_id, room) in response.rooms.joined {
2838    ///             for event in room.timeline.events {
2839    ///                 channel.send(event).await.unwrap();
2840    ///             }
2841    ///         }
2842    ///
2843    ///         LoopCtrl::Continue
2844    ///     })
2845    ///     .await;
2846    /// };
2847    /// ```
2848    #[instrument(skip_all)]
2849    pub async fn sync_with_callback<C>(
2850        &self,
2851        sync_settings: crate::config::SyncSettings,
2852        callback: impl Fn(SyncResponse) -> C,
2853    ) -> Result<(), Error>
2854    where
2855        C: Future<Output = LoopCtrl>,
2856    {
2857        self.sync_with_result_callback(sync_settings, |result| async {
2858            Ok(callback(result?).await)
2859        })
2860        .await
2861    }
2862
2863    /// Repeatedly call sync to synchronize the client state with the server.
2864    ///
2865    /// # Arguments
2866    ///
2867    /// * `sync_settings` - Settings for the sync call. *Note* that those
2868    ///   settings will be only used for the first sync call. See the argument
2869    ///   docs for [`Client::sync_once`] for more info.
2870    ///
2871    /// * `callback` - A callback that will be called every time after a
2872    ///   response has been received, failure or not. The callback returns a
2873    ///   `Result<LoopCtrl, Error>`, too. When returning
2874    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2875    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2876    ///   function returns `Ok(())`. In case the callback can't handle the
2877    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2878    ///   which results in the sync ending and the `Err(Error)` being returned.
2879    ///
2880    /// # Return
2881    /// The sync runs until an error occurs that the callback can't handle or
2882    /// the callback indicates that the Loop should stop. If the callback
2883    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2884    /// `Err(Error)` is returned.
2885    ///
2886    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2887    /// this, and are handled first without sending the result to the
2888    /// callback. Only after they have exceeded is the `Result` handed to
2889    /// the callback.
2890    ///
2891    /// # Examples
2892    ///
2893    /// The following example demonstrates how to sync forever while sending all
2894    /// the interesting events through a mpsc channel to another thread e.g. a
2895    /// UI thread.
2896    ///
2897    /// ```no_run
2898    /// # use std::time::Duration;
2899    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2900    /// # use url::Url;
2901    /// # async {
2902    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2903    /// # let mut client = Client::new(homeserver).await.unwrap();
2904    /// #
2905    /// use tokio::sync::mpsc::channel;
2906    ///
2907    /// let (tx, rx) = channel(100);
2908    ///
2909    /// let sync_channel = &tx;
2910    /// let sync_settings = SyncSettings::new()
2911    ///     .timeout(Duration::from_secs(30));
2912    ///
2913    /// client
2914    ///     .sync_with_result_callback(sync_settings, |response| async move {
2915    ///         let channel = sync_channel;
2916    ///         let sync_response = response?;
2917    ///         for (room_id, room) in sync_response.rooms.joined {
2918    ///              for event in room.timeline.events {
2919    ///                  channel.send(event).await.unwrap();
2920    ///               }
2921    ///         }
2922    ///
2923    ///         Ok(LoopCtrl::Continue)
2924    ///     })
2925    ///     .await;
2926    /// };
2927    /// ```
2928    #[instrument(skip(self, callback))]
2929    pub async fn sync_with_result_callback<C>(
2930        &self,
2931        sync_settings: crate::config::SyncSettings,
2932        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2933    ) -> Result<(), Error>
2934    where
2935        C: Future<Output = Result<LoopCtrl, Error>>,
2936    {
2937        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2938
2939        while let Some(result) = sync_stream.next().await {
2940            trace!("Running callback");
2941            if callback(result).await? == LoopCtrl::Break {
2942                trace!("Callback told us to stop");
2943                break;
2944            }
2945            trace!("Done running callback");
2946        }
2947
2948        Ok(())
2949    }
2950
2951    //// Repeatedly synchronize the client state with the server.
2952    ///
2953    /// This method will internally call [`Client::sync_once`] in a loop and is
2954    /// equivalent to the [`Client::sync`] method but the responses are provided
2955    /// as an async stream.
2956    ///
2957    /// # Arguments
2958    ///
2959    /// * `sync_settings` - Settings for the sync call. *Note* that those
2960    ///   settings will be only used for the first sync call. See the argument
2961    ///   docs for [`Client::sync_once`] for more info.
2962    ///
2963    /// # Examples
2964    ///
2965    /// ```no_run
2966    /// # use url::Url;
2967    /// # async {
2968    /// # let homeserver = Url::parse("http://localhost:8080")?;
2969    /// # let username = "";
2970    /// # let password = "";
2971    /// use futures_util::StreamExt;
2972    /// use matrix_sdk::{Client, config::SyncSettings};
2973    ///
2974    /// let client = Client::new(homeserver).await?;
2975    /// client.matrix_auth().login_username(&username, &password).send().await?;
2976    ///
2977    /// let mut sync_stream =
2978    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2979    ///
2980    /// while let Some(Ok(response)) = sync_stream.next().await {
2981    ///     for room in response.rooms.joined.values() {
2982    ///         for e in &room.timeline.events {
2983    ///             if let Ok(event) = e.raw().deserialize() {
2984    ///                 println!("Received event {:?}", event);
2985    ///             }
2986    ///         }
2987    ///     }
2988    /// }
2989    ///
2990    /// # anyhow::Ok(()) };
2991    /// ```
2992    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2993    #[instrument(skip(self))]
2994    pub async fn sync_stream(
2995        &self,
2996        mut sync_settings: crate::config::SyncSettings,
2997    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2998        let mut is_first_sync = true;
2999        let mut timeout = None;
3000        let mut last_sync_time: Option<Instant> = None;
3001
3002        let parent_span = Span::current();
3003
3004        async_stream::stream!({
3005            loop {
3006                trace!("Syncing");
3007
3008                if sync_settings.ignore_timeout_on_first_sync {
3009                    if is_first_sync {
3010                        timeout = sync_settings.timeout.take();
3011                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
3012                        sync_settings.timeout = timeout.take();
3013                    }
3014
3015                    is_first_sync = false;
3016                }
3017
3018                yield self
3019                    .sync_loop_helper(&mut sync_settings)
3020                    .instrument(parent_span.clone())
3021                    .await;
3022
3023                Client::delay_sync(&mut last_sync_time).await
3024            }
3025        })
3026    }
3027
3028    /// Get the current, if any, sync token of the client.
3029    /// This will be None if the client didn't sync at least once.
3030    pub(crate) async fn sync_token(&self) -> Option<String> {
3031        self.inner.base_client.sync_token().await
3032    }
3033
3034    /// Gets information about the owner of a given access token.
3035    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
3036        let request = whoami::v3::Request::new();
3037        self.send(request).await
3038    }
3039
3040    /// Subscribes a new receiver to client SessionChange broadcasts.
3041    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
3042        let broadcast = &self.auth_ctx().session_change_sender;
3043        broadcast.subscribe()
3044    }
3045
3046    /// Sets the save/restore session callbacks.
3047    ///
3048    /// This is another mechanism to get synchronous updates to session tokens,
3049    /// while [`Self::subscribe_to_session_changes`] provides an async update.
3050    pub fn set_session_callbacks(
3051        &self,
3052        reload_session_callback: Box<ReloadSessionCallback>,
3053        save_session_callback: Box<SaveSessionCallback>,
3054    ) -> Result<()> {
3055        self.inner
3056            .auth_ctx
3057            .reload_session_callback
3058            .set(reload_session_callback)
3059            .map_err(|_| Error::MultipleSessionCallbacks)?;
3060
3061        self.inner
3062            .auth_ctx
3063            .save_session_callback
3064            .set(save_session_callback)
3065            .map_err(|_| Error::MultipleSessionCallbacks)?;
3066
3067        Ok(())
3068    }
3069
3070    /// Get the notification settings of the current owner of the client.
3071    pub async fn notification_settings(&self) -> NotificationSettings {
3072        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
3073        NotificationSettings::new(self.clone(), ruleset)
3074    }
3075
3076    /// Create a new specialized `Client` that can process notifications.
3077    ///
3078    /// See [`CrossProcessLock::new`] to learn more about
3079    /// `cross_process_store_locks_holder_name`.
3080    ///
3081    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
3082    pub async fn notification_client(
3083        &self,
3084        cross_process_store_locks_holder_name: String,
3085    ) -> Result<Client> {
3086        let client = Client {
3087            inner: ClientInner::new(
3088                self.inner.auth_ctx.clone(),
3089                self.server().cloned(),
3090                self.homeserver(),
3091                self.sliding_sync_version(),
3092                self.inner.http_client.clone(),
3093                self.inner
3094                    .base_client
3095                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
3096                    .await?,
3097                self.inner.caches.supported_versions.read().await.clone(),
3098                self.inner.caches.well_known.read().await.clone(),
3099                self.inner.respect_login_well_known,
3100                self.inner.event_cache.clone(),
3101                self.inner.send_queue_data.clone(),
3102                self.inner.latest_events.clone(),
3103                #[cfg(feature = "e2e-encryption")]
3104                self.inner.e2ee.encryption_settings,
3105                #[cfg(feature = "e2e-encryption")]
3106                self.inner.enable_share_history_on_invite,
3107                cross_process_store_locks_holder_name,
3108                #[cfg(feature = "experimental-search")]
3109                self.inner.search_index.clone(),
3110                self.inner.thread_subscription_catchup.clone(),
3111            )
3112            .await,
3113        };
3114
3115        Ok(client)
3116    }
3117
3118    /// The [`EventCache`] instance for this [`Client`].
3119    pub fn event_cache(&self) -> &EventCache {
3120        // SAFETY: always initialized in the `Client` ctor.
3121        self.inner.event_cache.get().unwrap()
3122    }
3123
3124    /// The [`LatestEvents`] instance for this [`Client`].
3125    pub async fn latest_events(&self) -> &LatestEvents {
3126        self.inner
3127            .latest_events
3128            .get_or_init(|| async {
3129                LatestEvents::new(
3130                    WeakClient::from_client(self),
3131                    self.event_cache().clone(),
3132                    SendQueue::new(self.clone()),
3133                )
3134            })
3135            .await
3136    }
3137
3138    /// Waits until an at least partially synced room is received, and returns
3139    /// it.
3140    ///
3141    /// **Note: this function will loop endlessly until either it finds the room
3142    /// or an externally set timeout happens.**
3143    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
3144        loop {
3145            if let Some(room) = self.get_room(room_id) {
3146                if room.is_state_partially_or_fully_synced() {
3147                    debug!("Found just created room!");
3148                    return room;
3149                }
3150                debug!("Room wasn't partially synced, waiting for sync beat to try again");
3151            } else {
3152                debug!("Room wasn't found, waiting for sync beat to try again");
3153            }
3154            self.inner.sync_beat.listen().await;
3155        }
3156    }
3157
3158    /// Knock on a room given its `room_id_or_alias` to ask for permission to
3159    /// join it.
3160    pub async fn knock(
3161        &self,
3162        room_id_or_alias: OwnedRoomOrAliasId,
3163        reason: Option<String>,
3164        server_names: Vec<OwnedServerName>,
3165    ) -> Result<Room> {
3166        let request =
3167            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
3168        let response = self.send(request).await?;
3169        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
3170        Ok(Room::new(self.clone(), base_room))
3171    }
3172
3173    /// Checks whether the provided `user_id` belongs to an ignored user.
3174    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
3175        self.base_client().is_user_ignored(user_id).await
3176    }
3177
3178    /// Gets the `max_upload_size` value from the homeserver, getting either a
3179    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
3180    /// missing.
3181    ///
3182    /// Check the spec for more info:
3183    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
3184    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
3185        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
3186        if let Some(data) = max_upload_size_lock.get() {
3187            return Ok(data.to_owned());
3188        }
3189
3190        // Use the authenticated endpoint when the server supports it.
3191        let supported_versions = self.supported_versions().await?;
3192        let use_auth = authenticated_media::get_media_config::v1::Request::PATH_BUILDER
3193            .is_supported(&supported_versions);
3194
3195        let upload_size = if use_auth {
3196            self.send(authenticated_media::get_media_config::v1::Request::default())
3197                .await?
3198                .upload_size
3199        } else {
3200            #[allow(deprecated)]
3201            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
3202        };
3203
3204        match max_upload_size_lock.set(upload_size) {
3205            Ok(_) => Ok(upload_size),
3206            Err(error) => {
3207                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
3208            }
3209        }
3210    }
3211
3212    /// The settings to use for decrypting events.
3213    #[cfg(feature = "e2e-encryption")]
3214    pub fn decryption_settings(&self) -> &DecryptionSettings {
3215        &self.base_client().decryption_settings
3216    }
3217
3218    /// Returns the [`SearchIndex`] for this [`Client`].
3219    #[cfg(feature = "experimental-search")]
3220    pub fn search_index(&self) -> &SearchIndex {
3221        &self.inner.search_index
3222    }
3223
3224    /// Whether the client is configured to take thread subscriptions (MSC4306
3225    /// and MSC4308) into account.
3226    ///
3227    /// This may cause filtering out of thread subscriptions, and loading the
3228    /// thread subscriptions via the sliding sync extension, when the room
3229    /// list service is being used.
3230    pub fn enabled_thread_subscriptions(&self) -> bool {
3231        match self.base_client().threading_support {
3232            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
3233            ThreadingSupport::Disabled => false,
3234        }
3235    }
3236
3237    /// Fetch thread subscriptions changes between `from` and up to `to`.
3238    ///
3239    /// The `limit` optional parameter can be used to limit the number of
3240    /// entries in a response. It can also be overridden by the server, if
3241    /// it's deemed too large.
3242    pub async fn fetch_thread_subscriptions(
3243        &self,
3244        from: Option<String>,
3245        to: Option<String>,
3246        limit: Option<UInt>,
3247    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
3248        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
3249            from,
3250            to,
3251            limit,
3252        });
3253        Ok(self.send(request).await?)
3254    }
3255
3256    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
3257        self.inner.thread_subscription_catchup.get().unwrap()
3258    }
3259
3260    /// Perform database optimizations if any are available, i.e. vacuuming in
3261    /// SQLite.
3262    ///
3263    /// **Warning:** this was added to check if SQLite fragmentation was the
3264    /// source of performance issues, **DO NOT use in production**.
3265    #[doc(hidden)]
3266    pub async fn optimize_stores(&self) -> Result<()> {
3267        trace!("Optimizing state store...");
3268        self.state_store().optimize().await?;
3269
3270        trace!("Optimizing event cache store...");
3271        if let Some(clean_lock) = self.event_cache_store().lock().await?.as_clean() {
3272            clean_lock.optimize().await?;
3273        }
3274
3275        trace!("Optimizing media store...");
3276        self.media_store().lock().await?.optimize().await?;
3277
3278        Ok(())
3279    }
3280
3281    /// Returns the sizes of the existing stores, if known.
3282    pub async fn get_store_sizes(&self) -> Result<StoreSizes> {
3283        #[cfg(feature = "e2e-encryption")]
3284        let crypto_store_size = if let Some(olm_machine) = self.olm_machine().await.as_ref()
3285            && let Ok(Some(store_size)) = olm_machine.store().get_size().await
3286        {
3287            Some(store_size)
3288        } else {
3289            None
3290        };
3291        #[cfg(not(feature = "e2e-encryption"))]
3292        let crypto_store_size = None;
3293
3294        let state_store_size = self.state_store().get_size().await.ok().flatten();
3295
3296        let event_cache_store_size = if let Some(clean_lock) =
3297            self.event_cache_store().lock().await?.as_clean()
3298            && let Ok(Some(store_size)) = clean_lock.get_size().await
3299        {
3300            Some(store_size)
3301        } else {
3302            None
3303        };
3304
3305        let media_store_size = self.media_store().lock().await?.get_size().await.ok().flatten();
3306
3307        Ok(StoreSizes {
3308            crypto_store: crypto_store_size,
3309            state_store: state_store_size,
3310            event_cache_store: event_cache_store_size,
3311            media_store: media_store_size,
3312        })
3313    }
3314}
3315
3316/// Contains the disk size of the different stores, if known. It won't be
3317/// available for in-memory stores.
3318#[derive(Debug, Clone)]
3319pub struct StoreSizes {
3320    /// The size of the CryptoStore.
3321    pub crypto_store: Option<usize>,
3322    /// The size of the StateStore.
3323    pub state_store: Option<usize>,
3324    /// The size of the EventCacheStore.
3325    pub event_cache_store: Option<usize>,
3326    /// The size of the MediaStore.
3327    pub media_store: Option<usize>,
3328}
3329
3330#[cfg(any(feature = "testing", test))]
3331impl Client {
3332    /// Test helper to mark users as tracked by the crypto layer.
3333    #[cfg(feature = "e2e-encryption")]
3334    pub async fn update_tracked_users_for_testing(
3335        &self,
3336        user_ids: impl IntoIterator<Item = &UserId>,
3337    ) {
3338        let olm = self.olm_machine().await;
3339        let olm = olm.as_ref().unwrap();
3340        olm.update_tracked_users(user_ids).await.unwrap();
3341    }
3342}
3343
3344/// A weak reference to the inner client, useful when trying to get a handle
3345/// on the owning client.
3346#[derive(Clone, Debug)]
3347pub(crate) struct WeakClient {
3348    client: Weak<ClientInner>,
3349}
3350
3351impl WeakClient {
3352    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3353    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3354        Self { client: Arc::downgrade(client) }
3355    }
3356
3357    /// Construct a [`WeakClient`] from a [`Client`].
3358    pub fn from_client(client: &Client) -> Self {
3359        Self::from_inner(&client.inner)
3360    }
3361
3362    /// Attempts to get a [`Client`] from this [`WeakClient`].
3363    pub fn get(&self) -> Option<Client> {
3364        self.client.upgrade().map(|inner| Client { inner })
3365    }
3366
3367    /// Gets the number of strong (`Arc`) pointers still pointing to this
3368    /// client.
3369    #[allow(dead_code)]
3370    pub fn strong_count(&self) -> usize {
3371        self.client.strong_count()
3372    }
3373}
3374
3375/// Information about the state of a room before we joined it.
3376#[derive(Debug, Clone, Default)]
3377struct PreJoinRoomInfo {
3378    /// The user who invited us to the room, if any.
3379    pub inviter: Option<RoomMember>,
3380}
3381
3382// The http mocking library is not supported for wasm32
3383#[cfg(all(test, not(target_family = "wasm")))]
3384pub(crate) mod tests {
3385    use std::{sync::Arc, time::Duration};
3386
3387    use assert_matches::assert_matches;
3388    use assert_matches2::assert_let;
3389    use eyeball::SharedObservable;
3390    use futures_util::{FutureExt, StreamExt, pin_mut};
3391    use js_int::{UInt, uint};
3392    use matrix_sdk_base::{
3393        RoomState,
3394        store::{MemoryStore, StoreConfig},
3395    };
3396    use matrix_sdk_test::{
3397        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3398        event_factory::EventFactory,
3399    };
3400    #[cfg(target_family = "wasm")]
3401    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3402
3403    use ruma::{
3404        RoomId, ServerName, UserId,
3405        api::{
3406            FeatureFlag, MatrixVersion,
3407            client::{
3408                discovery::discover_homeserver::RtcFocusInfo,
3409                room::create_room::v3::Request as CreateRoomRequest,
3410            },
3411        },
3412        assign,
3413        events::{
3414            ignored_user_list::IgnoredUserListEventContent,
3415            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3416        },
3417        owned_device_id, owned_room_id, owned_user_id, room_alias_id, room_id, user_id,
3418    };
3419    use serde_json::json;
3420    use stream_assert::{assert_next_matches, assert_pending};
3421    use tokio::{
3422        spawn,
3423        time::{sleep, timeout},
3424    };
3425    use url::Url;
3426
3427    use super::Client;
3428    use crate::{
3429        Error, TransmissionProgress,
3430        client::{WeakClient, futures::SendMediaUploadRequest},
3431        config::{RequestConfig, SyncSettings},
3432        futures::SendRequest,
3433        media::MediaError,
3434        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3435    };
3436
3437    #[async_test]
3438    async fn test_account_data() {
3439        let server = MatrixMockServer::new().await;
3440        let client = server.client_builder().build().await;
3441
3442        let f = EventFactory::new();
3443        server
3444            .mock_sync()
3445            .ok_and_run(&client, |builder| {
3446                builder.add_global_account_data(
3447                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3448                );
3449            })
3450            .await;
3451
3452        let content = client
3453            .account()
3454            .account_data::<IgnoredUserListEventContent>()
3455            .await
3456            .unwrap()
3457            .unwrap()
3458            .deserialize()
3459            .unwrap();
3460
3461        assert_eq!(content.ignored_users.len(), 1);
3462    }
3463
3464    #[async_test]
3465    async fn test_successful_discovery() {
3466        // Imagine this is `matrix.org`.
3467        let server = MatrixMockServer::new().await;
3468        let server_url = server.uri();
3469
3470        // Imagine this is `matrix-client.matrix.org`.
3471        let homeserver = MatrixMockServer::new().await;
3472        let homeserver_url = homeserver.uri();
3473
3474        // Imagine Alice has the user ID `@alice:matrix.org`.
3475        let domain = server_url.strip_prefix("http://").unwrap();
3476        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3477
3478        // The `.well-known` is on the server (e.g. `matrix.org`).
3479        server
3480            .mock_well_known()
3481            .ok_with_homeserver_url(&homeserver_url)
3482            .mock_once()
3483            .named("well-known")
3484            .mount()
3485            .await;
3486
3487        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3488        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3489
3490        let client = Client::builder()
3491            .insecure_server_name_no_tls(alice.server_name())
3492            .build()
3493            .await
3494            .unwrap();
3495
3496        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3497        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3498        client.server_versions().await.unwrap();
3499    }
3500
3501    #[async_test]
3502    async fn test_discovery_broken_server() {
3503        let server = MatrixMockServer::new().await;
3504        let server_url = server.uri();
3505        let domain = server_url.strip_prefix("http://").unwrap();
3506        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3507
3508        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3509
3510        assert!(
3511            Client::builder()
3512                .insecure_server_name_no_tls(alice.server_name())
3513                .build()
3514                .await
3515                .is_err(),
3516            "Creating a client from a user ID should fail when the .well-known request fails."
3517        );
3518    }
3519
3520    #[async_test]
3521    async fn test_room_creation() {
3522        let server = MatrixMockServer::new().await;
3523        let client = server.client_builder().build().await;
3524
3525        server
3526            .mock_sync()
3527            .ok_and_run(&client, |builder| {
3528                builder.add_joined_room(
3529                    JoinedRoomBuilder::default()
3530                        .add_state_event(StateTestEvent::Member)
3531                        .add_state_event(StateTestEvent::PowerLevels),
3532                );
3533            })
3534            .await;
3535
3536        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3537        assert_eq!(room.state(), RoomState::Joined);
3538    }
3539
3540    #[async_test]
3541    async fn test_retry_limit_http_requests() {
3542        let server = MatrixMockServer::new().await;
3543        let client = server
3544            .client_builder()
3545            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3546            .build()
3547            .await;
3548
3549        assert!(client.request_config().retry_limit.unwrap() == 3);
3550
3551        server.mock_login().error500().expect(3).mount().await;
3552
3553        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3554    }
3555
3556    #[async_test]
3557    async fn test_retry_timeout_http_requests() {
3558        // Keep this timeout small so that the test doesn't take long
3559        let retry_timeout = Duration::from_secs(5);
3560        let server = MatrixMockServer::new().await;
3561        let client = server
3562            .client_builder()
3563            .on_builder(|builder| {
3564                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3565            })
3566            .build()
3567            .await;
3568
3569        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3570
3571        server.mock_login().error500().expect(2..).mount().await;
3572
3573        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3574    }
3575
3576    #[async_test]
3577    async fn test_short_retry_initial_http_requests() {
3578        let server = MatrixMockServer::new().await;
3579        let client = server
3580            .client_builder()
3581            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3582            .build()
3583            .await;
3584
3585        server.mock_login().error500().expect(3..).mount().await;
3586
3587        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3588    }
3589
3590    #[async_test]
3591    async fn test_no_retry_http_requests() {
3592        let server = MatrixMockServer::new().await;
3593        let client = server.client_builder().build().await;
3594
3595        server.mock_devices().error500().mock_once().mount().await;
3596
3597        client.devices().await.unwrap_err();
3598    }
3599
3600    #[async_test]
3601    async fn test_set_homeserver() {
3602        let client = MockClientBuilder::new(None).build().await;
3603        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3604
3605        let homeserver = Url::parse("http://example.com/").unwrap();
3606        client.set_homeserver(homeserver.clone());
3607        assert_eq!(client.homeserver(), homeserver);
3608    }
3609
3610    #[async_test]
3611    async fn test_search_user_request() {
3612        let server = MatrixMockServer::new().await;
3613        let client = server.client_builder().build().await;
3614
3615        server.mock_user_directory().ok().mock_once().mount().await;
3616
3617        let response = client.search_users("test", 50).await.unwrap();
3618        assert_eq!(response.results.len(), 1);
3619        let result = &response.results[0];
3620        assert_eq!(result.user_id.to_string(), "@test:example.me");
3621        assert_eq!(result.display_name.clone().unwrap(), "Test");
3622        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3623        assert!(!response.limited);
3624    }
3625
3626    #[async_test]
3627    async fn test_request_unstable_features() {
3628        let server = MatrixMockServer::new().await;
3629        let client = server.client_builder().no_server_versions().build().await;
3630
3631        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3632
3633        let unstable_features = client.unstable_features().await.unwrap();
3634        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3635        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3636    }
3637
3638    #[async_test]
3639    async fn test_can_homeserver_push_encrypted_event_to_device() {
3640        let server = MatrixMockServer::new().await;
3641        let client = server.client_builder().no_server_versions().build().await;
3642
3643        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3644
3645        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3646        assert!(msc4028_enabled);
3647    }
3648
3649    #[async_test]
3650    async fn test_recently_visited_rooms() {
3651        // Tracking recently visited rooms requires authentication
3652        let client = MockClientBuilder::new(None).unlogged().build().await;
3653        assert_matches!(
3654            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3655            Err(Error::AuthenticationRequired)
3656        );
3657
3658        let client = MockClientBuilder::new(None).build().await;
3659        let account = client.account();
3660
3661        // We should start off with an empty list
3662        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3663
3664        // Tracking a valid room id should add it to the list
3665        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3666        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3667        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3668
3669        // And the existing list shouldn't be changed
3670        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3671        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3672
3673        // Tracking the same room again shouldn't change the list
3674        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3675        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3676        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3677
3678        // Tracking a second room should add it to the front of the list
3679        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3680        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3681        assert_eq!(
3682            account.get_recently_visited_rooms().await.unwrap(),
3683            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3684        );
3685
3686        // Tracking the first room yet again should move it to the front of the list
3687        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3688        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3689        assert_eq!(
3690            account.get_recently_visited_rooms().await.unwrap(),
3691            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3692        );
3693
3694        // Tracking should be capped at 20
3695        for n in 0..20 {
3696            account
3697                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3698                .await
3699                .unwrap();
3700        }
3701
3702        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3703
3704        // And the initial rooms should've been pushed out
3705        let rooms = account.get_recently_visited_rooms().await.unwrap();
3706        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3707        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3708
3709        // And the last tracked room should be the first
3710        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3711    }
3712
3713    #[async_test]
3714    async fn test_client_no_cycle_with_event_cache() {
3715        let client = MockClientBuilder::new(None).build().await;
3716
3717        // Wait for the init tasks to die.
3718        sleep(Duration::from_secs(1)).await;
3719
3720        let weak_client = WeakClient::from_client(&client);
3721        assert_eq!(weak_client.strong_count(), 1);
3722
3723        {
3724            let room_id = room_id!("!room:example.org");
3725
3726            // Have the client know the room.
3727            let response = SyncResponseBuilder::default()
3728                .add_joined_room(JoinedRoomBuilder::new(room_id))
3729                .build_sync_response();
3730            client.inner.base_client.receive_sync_response(response).await.unwrap();
3731
3732            client.event_cache().subscribe().unwrap();
3733
3734            let (_room_event_cache, _drop_handles) =
3735                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3736        }
3737
3738        drop(client);
3739
3740        // Give a bit of time for background tasks to die.
3741        sleep(Duration::from_secs(1)).await;
3742
3743        // The weak client must be the last reference to the client now.
3744        assert_eq!(weak_client.strong_count(), 0);
3745        let client = weak_client.get();
3746        assert!(
3747            client.is_none(),
3748            "too many strong references to the client: {}",
3749            Arc::strong_count(&client.unwrap().inner)
3750        );
3751    }
3752
3753    #[async_test]
3754    async fn test_supported_versions_caching() {
3755        let server = MatrixMockServer::new().await;
3756
3757        let versions_mock = server
3758            .mock_versions()
3759            .expect_default_access_token()
3760            .ok_with_unstable_features()
3761            .named("first versions mock")
3762            .expect(1)
3763            .mount_as_scoped()
3764            .await;
3765
3766        let memory_store = Arc::new(MemoryStore::new());
3767        let client = server
3768            .client_builder()
3769            .no_server_versions()
3770            .on_builder(|builder| {
3771                builder.store_config(
3772                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3773                        .state_store(memory_store.clone()),
3774                )
3775            })
3776            .build()
3777            .await;
3778
3779        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3780
3781        // The result was cached.
3782        assert_matches!(client.supported_versions_cached().await, Ok(Some(_)));
3783        // This subsequent call hits the in-memory cache.
3784        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3785
3786        drop(client);
3787
3788        let client = server
3789            .client_builder()
3790            .no_server_versions()
3791            .on_builder(|builder| {
3792                builder.store_config(
3793                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3794                        .state_store(memory_store.clone()),
3795                )
3796            })
3797            .build()
3798            .await;
3799
3800        // These calls to the new client hit the on-disk cache.
3801        assert!(
3802            client
3803                .unstable_features()
3804                .await
3805                .unwrap()
3806                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3807        );
3808        let supported = client.supported_versions().await.unwrap();
3809        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3810        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3811
3812        // Then this call hits the in-memory cache.
3813        let supported = client.supported_versions().await.unwrap();
3814        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3815        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3816
3817        drop(versions_mock);
3818
3819        // Now, reset the cache, and observe the endpoints being called again once.
3820        client.reset_supported_versions().await.unwrap();
3821
3822        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3823
3824        // Hits network again.
3825        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3826        // Hits in-memory cache again.
3827        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3828    }
3829
3830    #[async_test]
3831    async fn test_well_known_caching() {
3832        let server = MatrixMockServer::new().await;
3833        let server_url = server.uri();
3834        let domain = server_url.strip_prefix("http://").unwrap();
3835        let server_name = <&ServerName>::try_from(domain).unwrap();
3836        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3837
3838        let well_known_mock = server
3839            .mock_well_known()
3840            .ok()
3841            .named("well known mock")
3842            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3843            .mount_as_scoped()
3844            .await;
3845
3846        let memory_store = Arc::new(MemoryStore::new());
3847        let client = Client::builder()
3848            .insecure_server_name_no_tls(server_name)
3849            .store_config(
3850                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3851                    .state_store(memory_store.clone()),
3852            )
3853            .build()
3854            .await
3855            .unwrap();
3856
3857        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3858
3859        // This subsequent call hits the in-memory cache.
3860        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3861
3862        drop(client);
3863
3864        let client = server
3865            .client_builder()
3866            .no_server_versions()
3867            .on_builder(|builder| {
3868                builder.store_config(
3869                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3870                        .state_store(memory_store.clone()),
3871                )
3872            })
3873            .build()
3874            .await;
3875
3876        // This call to the new client hits the on-disk cache.
3877        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3878
3879        // Then this call hits the in-memory cache.
3880        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3881
3882        drop(well_known_mock);
3883
3884        // Now, reset the cache, and observe the endpoints being called again once.
3885        client.reset_well_known().await.unwrap();
3886
3887        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3888
3889        // Hits network again.
3890        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3891        // Hits in-memory cache again.
3892        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3893    }
3894
3895    #[async_test]
3896    async fn test_missing_well_known_caching() {
3897        let server = MatrixMockServer::new().await;
3898        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3899
3900        let well_known_mock = server
3901            .mock_well_known()
3902            .error_unrecognized()
3903            .named("first well-known mock")
3904            .expect(1)
3905            .mount_as_scoped()
3906            .await;
3907
3908        let memory_store = Arc::new(MemoryStore::new());
3909        let client = server
3910            .client_builder()
3911            .on_builder(|builder| {
3912                builder.store_config(
3913                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3914                        .state_store(memory_store.clone()),
3915                )
3916            })
3917            .build()
3918            .await;
3919
3920        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3921
3922        // This subsequent call hits the in-memory cache.
3923        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3924
3925        drop(client);
3926
3927        let client = server
3928            .client_builder()
3929            .on_builder(|builder| {
3930                builder.store_config(
3931                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3932                        .state_store(memory_store.clone()),
3933                )
3934            })
3935            .build()
3936            .await;
3937
3938        // This call to the new client hits the on-disk cache.
3939        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3940
3941        // Then this call hits the in-memory cache.
3942        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3943
3944        drop(well_known_mock);
3945
3946        // Now, reset the cache, and observe the endpoints being called again once.
3947        client.reset_well_known().await.unwrap();
3948
3949        server
3950            .mock_well_known()
3951            .error_unrecognized()
3952            .expect(1)
3953            .named("second well-known mock")
3954            .mount()
3955            .await;
3956
3957        // Hits network again.
3958        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3959        // Hits in-memory cache again.
3960        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3961    }
3962
3963    #[async_test]
3964    async fn test_no_network_doesnt_cause_infinite_retries() {
3965        // We want infinite retries for transient errors.
3966        let client = MockClientBuilder::new(None)
3967            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3968            .build()
3969            .await;
3970
3971        // We don't define a mock server on purpose here, so that the error is really a
3972        // network error.
3973        client.whoami().await.unwrap_err();
3974    }
3975
3976    #[async_test]
3977    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3978        let server = MatrixMockServer::new().await;
3979        let client = server.client_builder().build().await;
3980
3981        let room_id = room_id!("!room:example.org");
3982
3983        server
3984            .mock_sync()
3985            .ok_and_run(&client, |builder| {
3986                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3987            })
3988            .await;
3989
3990        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3991        assert_eq!(room.room_id(), room_id);
3992    }
3993
3994    #[async_test]
3995    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3996        let server = MatrixMockServer::new().await;
3997        let client = server.client_builder().build().await;
3998
3999        let room_id = room_id!("!room:example.org");
4000
4001        let client = Arc::new(client);
4002
4003        // Perform the /sync request with a delay so it starts after the
4004        // `await_room_remote_echo` call has happened
4005        spawn({
4006            let client = client.clone();
4007            async move {
4008                sleep(Duration::from_millis(100)).await;
4009
4010                server
4011                    .mock_sync()
4012                    .ok_and_run(&client, |builder| {
4013                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
4014                    })
4015                    .await;
4016            }
4017        });
4018
4019        let room =
4020            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
4021        assert_eq!(room.room_id(), room_id);
4022    }
4023
4024    #[async_test]
4025    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
4026        let client = MockClientBuilder::new(None).build().await;
4027
4028        let room_id = room_id!("!room:example.org");
4029        // Room is not present so the client won't be able to find it. The call will
4030        // timeout.
4031        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
4032    }
4033
4034    #[async_test]
4035    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
4036        let server = MatrixMockServer::new().await;
4037        let client = server.client_builder().build().await;
4038
4039        server.mock_create_room().ok().mount().await;
4040
4041        // Create a room in the internal store
4042        let room = client
4043            .create_room(assign!(CreateRoomRequest::new(), {
4044                invite: vec![],
4045                is_direct: false,
4046            }))
4047            .await
4048            .unwrap();
4049
4050        // Room is locally present, but not synced, the call will timeout
4051        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
4052            .await
4053            .unwrap_err();
4054    }
4055
4056    #[async_test]
4057    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
4058        let server = MatrixMockServer::new().await;
4059        let client = server.client_builder().build().await;
4060
4061        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
4062
4063        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4064        assert_matches!(ret, Ok(true));
4065    }
4066
4067    #[async_test]
4068    async fn test_is_room_alias_available_if_alias_is_resolved() {
4069        let server = MatrixMockServer::new().await;
4070        let client = server.client_builder().build().await;
4071
4072        server
4073            .mock_room_directory_resolve_alias()
4074            .ok("!some_room_id:matrix.org", Vec::new())
4075            .expect(1)
4076            .mount()
4077            .await;
4078
4079        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4080        assert_matches!(ret, Ok(false));
4081    }
4082
4083    #[async_test]
4084    async fn test_is_room_alias_available_if_error_found() {
4085        let server = MatrixMockServer::new().await;
4086        let client = server.client_builder().build().await;
4087
4088        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
4089
4090        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
4091        assert_matches!(ret, Err(_));
4092    }
4093
4094    #[async_test]
4095    async fn test_create_room_alias() {
4096        let server = MatrixMockServer::new().await;
4097        let client = server.client_builder().build().await;
4098
4099        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
4100
4101        let ret = client
4102            .create_room_alias(
4103                room_alias_id!("#some_alias:matrix.org"),
4104                room_id!("!some_room:matrix.org"),
4105            )
4106            .await;
4107        assert_matches!(ret, Ok(()));
4108    }
4109
4110    #[async_test]
4111    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
4112        let server = MatrixMockServer::new().await;
4113        let client = server.client_builder().build().await;
4114
4115        let room_id = room_id!("!a-room:matrix.org");
4116
4117        // Make sure the summary endpoint is called once
4118        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4119
4120        // We create a locally cached invited room
4121        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
4122
4123        // And we get a preview, the server endpoint was reached
4124        let preview = client
4125            .get_room_preview(room_id.into(), Vec::new())
4126            .await
4127            .expect("Room preview should be retrieved");
4128
4129        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
4130    }
4131
4132    #[async_test]
4133    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
4134        let server = MatrixMockServer::new().await;
4135        let client = server.client_builder().build().await;
4136
4137        let room_id = room_id!("!a-room:matrix.org");
4138
4139        // Make sure the summary endpoint is called once
4140        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4141
4142        // We create a locally cached left room
4143        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
4144
4145        // And we get a preview, the server endpoint was reached
4146        let preview = client
4147            .get_room_preview(room_id.into(), Vec::new())
4148            .await
4149            .expect("Room preview should be retrieved");
4150
4151        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
4152    }
4153
4154    #[async_test]
4155    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
4156        let server = MatrixMockServer::new().await;
4157        let client = server.client_builder().build().await;
4158
4159        let room_id = room_id!("!a-room:matrix.org");
4160
4161        // Make sure the summary endpoint is called once
4162        server.mock_room_summary().ok(room_id).mock_once().mount().await;
4163
4164        // We create a locally cached knocked room
4165        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
4166
4167        // And we get a preview, the server endpoint was reached
4168        let preview = client
4169            .get_room_preview(room_id.into(), Vec::new())
4170            .await
4171            .expect("Room preview should be retrieved");
4172
4173        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
4174    }
4175
4176    #[async_test]
4177    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
4178        let server = MatrixMockServer::new().await;
4179        let client = server.client_builder().build().await;
4180
4181        let room_id = room_id!("!a-room:matrix.org");
4182
4183        // Make sure the summary endpoint is not called
4184        server.mock_room_summary().ok(room_id).never().mount().await;
4185
4186        // We create a locally cached joined room
4187        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
4188
4189        // And we get a preview, no server endpoint was reached
4190        let preview = client
4191            .get_room_preview(room_id.into(), Vec::new())
4192            .await
4193            .expect("Room preview should be retrieved");
4194
4195        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
4196    }
4197
4198    #[async_test]
4199    async fn test_media_preview_config() {
4200        let server = MatrixMockServer::new().await;
4201        let client = server.client_builder().build().await;
4202
4203        server
4204            .mock_sync()
4205            .ok_and_run(&client, |builder| {
4206                builder.add_custom_global_account_data(json!({
4207                    "content": {
4208                        "media_previews": "private",
4209                        "invite_avatars": "off"
4210                    },
4211                    "type": "m.media_preview_config"
4212                }));
4213            })
4214            .await;
4215
4216        let (initial_value, stream) =
4217            client.account().observe_media_preview_config().await.unwrap();
4218
4219        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4220        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4221        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4222        pin_mut!(stream);
4223        assert_pending!(stream);
4224
4225        server
4226            .mock_sync()
4227            .ok_and_run(&client, |builder| {
4228                builder.add_custom_global_account_data(json!({
4229                    "content": {
4230                        "media_previews": "off",
4231                        "invite_avatars": "on"
4232                    },
4233                    "type": "m.media_preview_config"
4234                }));
4235            })
4236            .await;
4237
4238        assert_next_matches!(
4239            stream,
4240            MediaPreviewConfigEventContent {
4241                media_previews: Some(MediaPreviews::Off),
4242                invite_avatars: Some(InviteAvatars::On),
4243                ..
4244            }
4245        );
4246        assert_pending!(stream);
4247    }
4248
4249    #[async_test]
4250    async fn test_unstable_media_preview_config() {
4251        let server = MatrixMockServer::new().await;
4252        let client = server.client_builder().build().await;
4253
4254        server
4255            .mock_sync()
4256            .ok_and_run(&client, |builder| {
4257                builder.add_custom_global_account_data(json!({
4258                    "content": {
4259                        "media_previews": "private",
4260                        "invite_avatars": "off"
4261                    },
4262                    "type": "io.element.msc4278.media_preview_config"
4263                }));
4264            })
4265            .await;
4266
4267        let (initial_value, stream) =
4268            client.account().observe_media_preview_config().await.unwrap();
4269
4270        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
4271        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
4272        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
4273        pin_mut!(stream);
4274        assert_pending!(stream);
4275
4276        server
4277            .mock_sync()
4278            .ok_and_run(&client, |builder| {
4279                builder.add_custom_global_account_data(json!({
4280                    "content": {
4281                        "media_previews": "off",
4282                        "invite_avatars": "on"
4283                    },
4284                    "type": "io.element.msc4278.media_preview_config"
4285                }));
4286            })
4287            .await;
4288
4289        assert_next_matches!(
4290            stream,
4291            MediaPreviewConfigEventContent {
4292                media_previews: Some(MediaPreviews::Off),
4293                invite_avatars: Some(InviteAvatars::On),
4294                ..
4295            }
4296        );
4297        assert_pending!(stream);
4298    }
4299
4300    #[async_test]
4301    async fn test_media_preview_config_not_found() {
4302        let server = MatrixMockServer::new().await;
4303        let client = server.client_builder().build().await;
4304
4305        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
4306
4307        assert!(initial_value.is_none());
4308    }
4309
4310    #[async_test]
4311    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
4312        // The default Matrix version we use is 1.11 or higher, so authenticated media
4313        // is supported.
4314        let server = MatrixMockServer::new().await;
4315        let client = server.client_builder().build().await;
4316
4317        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4318
4319        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4320        client.load_or_fetch_max_upload_size().await.unwrap();
4321
4322        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4323    }
4324
4325    #[async_test]
4326    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
4327        // The server must advertise support for the stable feature for authenticated
4328        // media support, so we mock the `GET /versions` response.
4329        let server = MatrixMockServer::new().await;
4330        let client = server.client_builder().no_server_versions().build().await;
4331
4332        server
4333            .mock_versions()
4334            .ok_custom(
4335                &["v1.7", "v1.8", "v1.9", "v1.10"],
4336                &[("org.matrix.msc3916.stable", true)].into(),
4337            )
4338            .named("versions")
4339            .expect(1)
4340            .mount()
4341            .await;
4342
4343        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4344
4345        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4346        client.load_or_fetch_max_upload_size().await.unwrap();
4347
4348        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4349    }
4350
4351    #[async_test]
4352    async fn test_load_or_fetch_max_upload_size_no_auth() {
4353        // The server must not support Matrix 1.11 or higher for unauthenticated
4354        // media requests, so we mock the `GET /versions` response.
4355        let server = MatrixMockServer::new().await;
4356        let client = server.client_builder().no_server_versions().build().await;
4357
4358        server
4359            .mock_versions()
4360            .ok_custom(&["v1.1"], &Default::default())
4361            .named("versions")
4362            .expect(1)
4363            .mount()
4364            .await;
4365
4366        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4367
4368        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4369        client.load_or_fetch_max_upload_size().await.unwrap();
4370
4371        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4372    }
4373
4374    #[async_test]
4375    async fn test_uploading_a_too_large_media_file() {
4376        let server = MatrixMockServer::new().await;
4377        let client = server.client_builder().build().await;
4378
4379        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4380        client.load_or_fetch_max_upload_size().await.unwrap();
4381        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4382
4383        let data = vec![1, 2];
4384        let upload_request =
4385            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4386        let request = SendRequest {
4387            client: client.clone(),
4388            request: upload_request,
4389            config: None,
4390            send_progress: SharedObservable::new(TransmissionProgress::default()),
4391        };
4392        let media_request = SendMediaUploadRequest::new(request);
4393
4394        let error = media_request.await.err();
4395        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4396        assert_eq!(max, uint!(1));
4397        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4398    }
4399
4400    #[async_test]
4401    async fn test_dont_ignore_timeout_on_first_sync() {
4402        let server = MatrixMockServer::new().await;
4403        let client = server.client_builder().build().await;
4404
4405        server
4406            .mock_sync()
4407            .timeout(Some(Duration::from_secs(30)))
4408            .ok(|_| {})
4409            .mock_once()
4410            .named("sync_with_timeout")
4411            .mount()
4412            .await;
4413
4414        // Call the endpoint once to check the timeout.
4415        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4416
4417        timeout(Duration::from_secs(1), async {
4418            stream.next().await.unwrap().unwrap();
4419        })
4420        .await
4421        .unwrap();
4422    }
4423
4424    #[async_test]
4425    async fn test_ignore_timeout_on_first_sync() {
4426        let server = MatrixMockServer::new().await;
4427        let client = server.client_builder().build().await;
4428
4429        server
4430            .mock_sync()
4431            .timeout(None)
4432            .ok(|_| {})
4433            .mock_once()
4434            .named("sync_no_timeout")
4435            .mount()
4436            .await;
4437        server
4438            .mock_sync()
4439            .timeout(Some(Duration::from_secs(30)))
4440            .ok(|_| {})
4441            .mock_once()
4442            .named("sync_with_timeout")
4443            .mount()
4444            .await;
4445
4446        // Call each version of the endpoint once to check the timeouts.
4447        let mut stream = Box::pin(
4448            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4449        );
4450
4451        timeout(Duration::from_secs(1), async {
4452            stream.next().await.unwrap().unwrap();
4453            stream.next().await.unwrap().unwrap();
4454        })
4455        .await
4456        .unwrap();
4457    }
4458
4459    #[async_test]
4460    async fn test_get_dm_room_returns_the_room_we_have_with_this_user() {
4461        let server = MatrixMockServer::new().await;
4462        let client = server.client_builder().build().await;
4463        // This is the user ID that is inside MemberAdditional.
4464        // Note the confusing username, so we can share
4465        // GlobalAccountDataTestEvent::Direct with the invited test.
4466        let user_id = user_id!("@invited:localhost");
4467
4468        // When we receive a sync response saying "invited" is invited to a DM
4469        let f = EventFactory::new();
4470        let response = SyncResponseBuilder::default()
4471            .add_joined_room(
4472                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberAdditional),
4473            )
4474            .add_global_account_data(
4475                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4476            )
4477            .build_sync_response();
4478        client.base_client().receive_sync_response(response).await.unwrap();
4479
4480        // Then get_dm_room finds this room
4481        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4482        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4483    }
4484
4485    #[async_test]
4486    async fn test_get_dm_room_still_finds_room_where_participant_is_only_invited() {
4487        let server = MatrixMockServer::new().await;
4488        let client = server.client_builder().build().await;
4489        // This is the user ID that is inside MemberInvite
4490        let user_id = user_id!("@invited:localhost");
4491
4492        // When we receive a sync response saying "invited" is invited to a DM
4493        let f = EventFactory::new();
4494        let response = SyncResponseBuilder::default()
4495            .add_joined_room(
4496                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberInvite),
4497            )
4498            .add_global_account_data(
4499                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4500            )
4501            .build_sync_response();
4502        client.base_client().receive_sync_response(response).await.unwrap();
4503
4504        // Then get_dm_room finds this room
4505        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4506        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4507    }
4508
4509    #[async_test]
4510    async fn test_get_dm_room_still_finds_left_room() {
4511        // See the discussion in https://github.com/matrix-org/matrix-rust-sdk/issues/2017
4512        // and the high-level issue at https://github.com/vector-im/element-x-ios/issues/1077
4513
4514        let server = MatrixMockServer::new().await;
4515        let client = server.client_builder().build().await;
4516        // This is the user ID that is inside MemberAdditional.
4517        // Note the confusing username, so we can share
4518        // GlobalAccountDataTestEvent::Direct with the invited test.
4519        let user_id = user_id!("@invited:localhost");
4520
4521        // When we receive a sync response saying "invited" is invited to a DM
4522        let f = EventFactory::new();
4523        let response = SyncResponseBuilder::default()
4524            .add_joined_room(
4525                JoinedRoomBuilder::default().add_state_event(StateTestEvent::MemberLeave),
4526            )
4527            .add_global_account_data(
4528                f.direct().add_user(user_id.to_owned().into(), *DEFAULT_TEST_ROOM_ID),
4529            )
4530            .build_sync_response();
4531        client.base_client().receive_sync_response(response).await.unwrap();
4532
4533        // Then get_dm_room finds this room
4534        let found_room = client.get_dm_room(user_id).expect("DM not found!");
4535        assert!(found_room.get_member_no_sync(user_id).await.unwrap().is_some());
4536    }
4537
4538    #[async_test]
4539    async fn test_device_exists() {
4540        let server = MatrixMockServer::new().await;
4541        let client = server.client_builder().build().await;
4542
4543        server.mock_get_device().ok().expect(1).mount().await;
4544
4545        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(true));
4546    }
4547
4548    #[async_test]
4549    async fn test_device_exists_404() {
4550        let server = MatrixMockServer::new().await;
4551        let client = server.client_builder().build().await;
4552
4553        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Ok(false));
4554    }
4555
4556    #[async_test]
4557    async fn test_device_exists_500() {
4558        let server = MatrixMockServer::new().await;
4559        let client = server.client_builder().build().await;
4560
4561        server.mock_get_device().error500().expect(1).mount().await;
4562
4563        assert_matches!(client.device_exists(owned_device_id!("ABCDEF")).await, Err(_));
4564    }
4565
4566    #[async_test]
4567    async fn test_fetching_well_known_with_homeserver_url() {
4568        let server = MatrixMockServer::new().await;
4569        let client = server.client_builder().build().await;
4570        server.mock_well_known().ok().mount().await;
4571
4572        assert_matches!(client.fetch_client_well_known().await, Some(_));
4573    }
4574
4575    #[async_test]
4576    async fn test_fetching_well_known_with_server_name() {
4577        let server = MatrixMockServer::new().await;
4578        let server_name = ServerName::parse(server.server().address().to_string()).unwrap();
4579
4580        server.mock_well_known().ok().mount().await;
4581
4582        let client = MockClientBuilder::new(None)
4583            .on_builder(|builder| builder.insecure_server_name_no_tls(&server_name))
4584            .build()
4585            .await;
4586
4587        assert_matches!(client.fetch_client_well_known().await, Some(_));
4588    }
4589
4590    #[async_test]
4591    async fn test_fetching_well_known_with_domain_part_of_user_id() {
4592        let server = MatrixMockServer::new().await;
4593        server.mock_well_known().ok().mount().await;
4594
4595        let user_id =
4596            UserId::parse(format!("@user:{}", server.server().address())).expect("Invalid user id");
4597        let client = MockClientBuilder::new(None)
4598            .logged_in_with_token("A_TOKEN".to_owned(), user_id, owned_device_id!("ABCDEF"))
4599            .build()
4600            .await;
4601
4602        assert_matches!(client.fetch_client_well_known().await, Some(_));
4603    }
4604}