matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{btree_map, BTreeMap, BTreeSet},
19    fmt::{self, Debug},
20    future::{ready, Future},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{store::LockableCryptoStore, DecryptionSettings};
33use matrix_sdk_base::{
34    event_cache::store::EventCacheStoreLock,
35    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
36    sync::{Notification, RoomUpdates},
37    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
38    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
39};
40use matrix_sdk_common::ttl_cache::TtlCache;
41#[cfg(feature = "e2e-encryption")]
42use ruma::events::{room::encryption::RoomEncryptionEventContent, InitialStateEvent};
43use ruma::{
44    api::{
45        client::{
46            account::whoami,
47            alias::{create_alias, delete_alias, get_alias},
48            authenticated_media,
49            device::{delete_devices, get_devices, update_device},
50            directory::{get_public_rooms, get_public_rooms_filtered},
51            discovery::{
52                discover_homeserver::{self, RtcFocusInfo},
53                get_capabilities::{self, v3::Capabilities},
54                get_supported_versions,
55            },
56            error::ErrorKind,
57            filter::{create_filter::v3::Request as FilterUploadRequest, FilterDefinition},
58            knock::knock_room,
59            media,
60            membership::{join_room_by_id, join_room_by_id_or_alias},
61            room::create_room,
62            session::login::v3::DiscoveryInfo,
63            sync::sync_events,
64            threads::get_thread_subscriptions_changes,
65            uiaa,
66            user_directory::search_users,
67        },
68        error::FromHttpResponseError,
69        federation::discovery::get_server_version,
70        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
71    },
72    assign,
73    push::Ruleset,
74    time::Instant,
75    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
76    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
77};
78use serde::de::DeserializeOwned;
79use tokio::sync::{broadcast, Mutex, OnceCell, RwLock, RwLockReadGuard};
80use tracing::{debug, error, instrument, trace, warn, Instrument, Span};
81use url::Url;
82
83use self::futures::SendRequest;
84use crate::{
85    authentication::{
86        matrix::MatrixAuth, oauth::OAuth, AuthCtx, AuthData, ReloadSessionCallback,
87        SaveSessionCallback,
88    },
89    config::{RequestConfig, SyncToken},
90    deduplicating_handler::DeduplicatingHandler,
91    error::HttpResult,
92    event_cache::EventCache,
93    event_handler::{
94        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
95        EventHandlerStore, ObservableEventHandler, SyncEvent,
96    },
97    http_client::HttpClient,
98    latest_events::LatestEvents,
99    media::MediaError,
100    notification_settings::NotificationSettings,
101    room::RoomMember,
102    room_preview::RoomPreview,
103    send_queue::{SendQueue, SendQueueData},
104    sliding_sync::Version as SlidingSyncVersion,
105    sync::{RoomUpdate, SyncResponse},
106    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
107    Room, SessionTokens, TransmissionProgress,
108};
109#[cfg(feature = "e2e-encryption")]
110use crate::{
111    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
112    store_locks::CrossProcessStoreLock,
113};
114
115mod builder;
116pub(crate) mod caches;
117pub(crate) mod futures;
118#[cfg(feature = "experimental-search")]
119pub(crate) mod search;
120
121pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
122#[cfg(feature = "experimental-search")]
123use crate::client::search::SearchIndex;
124
125#[cfg(not(target_family = "wasm"))]
126type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
127#[cfg(target_family = "wasm")]
128type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
129
130#[cfg(not(target_family = "wasm"))]
131type NotificationHandlerFn =
132    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
133#[cfg(target_family = "wasm")]
134type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
135
136/// Enum controlling if a loop running callbacks should continue or abort.
137///
138/// This is mainly used in the [`sync_with_callback`] method, the return value
139/// of the provided callback controls if the sync loop should be exited.
140///
141/// [`sync_with_callback`]: #method.sync_with_callback
142#[derive(Debug, Clone, Copy, PartialEq, Eq)]
143pub enum LoopCtrl {
144    /// Continue running the loop.
145    Continue,
146    /// Break out of the loop.
147    Break,
148}
149
150/// Represents changes that can occur to a `Client`s `Session`.
151#[derive(Debug, Clone, PartialEq)]
152pub enum SessionChange {
153    /// The session's token is no longer valid.
154    UnknownToken {
155        /// Whether or not the session was soft logged out
156        soft_logout: bool,
157    },
158    /// The session's tokens have been refreshed.
159    TokensRefreshed,
160}
161
162/// Information about the server vendor obtained from the federation API.
163#[derive(Debug, Clone, PartialEq, Eq)]
164#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
165pub struct ServerVendorInfo {
166    /// The server name.
167    pub server_name: String,
168    /// The server version.
169    pub version: String,
170}
171
172/// An async/await enabled Matrix client.
173///
174/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
175#[derive(Clone)]
176pub struct Client {
177    pub(crate) inner: Arc<ClientInner>,
178}
179
180#[derive(Default)]
181pub(crate) struct ClientLocks {
182    /// Lock ensuring that only a single room may be marked as a DM at once.
183    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
184    /// explanation.
185    pub(crate) mark_as_dm_lock: Mutex<()>,
186
187    /// Lock ensuring that only a single secret store is getting opened at the
188    /// same time.
189    ///
190    /// This is important so we don't accidentally create multiple different new
191    /// default secret storage keys.
192    #[cfg(feature = "e2e-encryption")]
193    pub(crate) open_secret_store_lock: Mutex<()>,
194
195    /// Lock ensuring that we're only storing a single secret at a time.
196    ///
197    /// Take a look at the [`SecretStore::put_secret`] method for a more
198    /// detailed explanation.
199    ///
200    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
201    #[cfg(feature = "e2e-encryption")]
202    pub(crate) store_secret_lock: Mutex<()>,
203
204    /// Lock ensuring that only one method at a time might modify our backup.
205    #[cfg(feature = "e2e-encryption")]
206    pub(crate) backup_modify_lock: Mutex<()>,
207
208    /// Lock ensuring that we're going to attempt to upload backups for a single
209    /// requester.
210    #[cfg(feature = "e2e-encryption")]
211    pub(crate) backup_upload_lock: Mutex<()>,
212
213    /// Handler making sure we only have one group session sharing request in
214    /// flight per room.
215    #[cfg(feature = "e2e-encryption")]
216    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
217
218    /// Lock making sure we're only doing one key claim request at a time.
219    #[cfg(feature = "e2e-encryption")]
220    pub(crate) key_claim_lock: Mutex<()>,
221
222    /// Handler to ensure that only one members request is running at a time,
223    /// given a room.
224    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
225
226    /// Handler to ensure that only one encryption state request is running at a
227    /// time, given a room.
228    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,
229
230    /// Deduplicating handler for sending read receipts. The string is an
231    /// internal implementation detail, see [`Self::send_single_receipt`].
232    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,
233
234    #[cfg(feature = "e2e-encryption")]
235    pub(crate) cross_process_crypto_store_lock:
236        OnceCell<CrossProcessStoreLock<LockableCryptoStore>>,
237
238    /// Latest "generation" of data known by the crypto store.
239    ///
240    /// This is a counter that only increments, set in the database (and can
241    /// wrap). It's incremented whenever some process acquires a lock for the
242    /// first time. *This assumes the crypto store lock is being held, to
243    /// avoid data races on writing to this value in the store*.
244    ///
245    /// The current process will maintain this value in local memory and in the
246    /// DB over time. Observing a different value than the one read in
247    /// memory, when reading from the store indicates that somebody else has
248    /// written into the database under our feet.
249    ///
250    /// TODO: this should live in the `OlmMachine`, since it's information
251    /// related to the lock. As of today (2023-07-28), we blow up the entire
252    /// olm machine when there's a generation mismatch. So storing the
253    /// generation in the olm machine would make the client think there's
254    /// *always* a mismatch, and that's why we need to store the generation
255    /// outside the `OlmMachine`.
256    #[cfg(feature = "e2e-encryption")]
257    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
258}
259
260pub(crate) struct ClientInner {
261    /// All the data related to authentication and authorization.
262    pub(crate) auth_ctx: Arc<AuthCtx>,
263
264    /// The URL of the server.
265    ///
266    /// Not to be confused with the `Self::homeserver`. `server` is usually
267    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
268    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
269    /// homeserver (at the time of writing — 2024-08-28).
270    ///
271    /// This value is optional depending on how the `Client` has been built.
272    /// If it's been built from a homeserver URL directly, we don't know the
273    /// server. However, if the `Client` has been built from a server URL or
274    /// name, then the homeserver has been discovered, and we know both.
275    server: Option<Url>,
276
277    /// The URL of the homeserver to connect to.
278    ///
279    /// This is the URL for the client-server Matrix API.
280    homeserver: StdRwLock<Url>,
281
282    /// The sliding sync version.
283    sliding_sync_version: StdRwLock<SlidingSyncVersion>,
284
285    /// The underlying HTTP client.
286    pub(crate) http_client: HttpClient,
287
288    /// User session data.
289    pub(super) base_client: BaseClient,
290
291    /// Collection of in-memory caches for the [`Client`].
292    pub(crate) caches: ClientCaches,
293
294    /// Collection of locks individual client methods might want to use, either
295    /// to ensure that only a single call to a method happens at once or to
296    /// deduplicate multiple calls to a method.
297    pub(crate) locks: ClientLocks,
298
299    /// The cross-process store locks holder name.
300    ///
301    /// The SDK provides cross-process store locks (see
302    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
303    /// `holder_name` is the value used for all cross-process store locks
304    /// used by this `Client`.
305    ///
306    /// If multiple `Client`s are running in different processes, this
307    /// value MUST be different for each `Client`.
308    cross_process_store_locks_holder_name: String,
309
310    /// A mapping of the times at which the current user sent typing notices,
311    /// keyed by room.
312    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,
313
314    /// Event handlers. See `add_event_handler`.
315    pub(crate) event_handlers: EventHandlerStore,
316
317    /// Notification handlers. See `register_notification_handler`.
318    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
319
320    /// The sender-side of channels used to receive room updates.
321    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,
322
323    /// The sender-side of a channel used to observe all the room updates of a
324    /// sync response.
325    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,
326
327    /// Whether the client should update its homeserver URL with the discovery
328    /// information present in the login response.
329    respect_login_well_known: bool,
330
331    /// An event that can be listened on to wait for a successful sync. The
332    /// event will only be fired if a sync loop is running. Can be used for
333    /// synchronization, e.g. if we send out a request to create a room, we can
334    /// wait for the sync to get the data to fetch a room object from the state
335    /// store.
336    pub(crate) sync_beat: event_listener::Event,
337
338    /// A central cache for events, inactive first.
339    ///
340    /// It becomes active when [`EventCache::subscribe`] is called.
341    pub(crate) event_cache: OnceCell<EventCache>,
342
343    /// End-to-end encryption related state.
344    #[cfg(feature = "e2e-encryption")]
345    pub(crate) e2ee: EncryptionData,
346
347    /// The verification state of our own device.
348    #[cfg(feature = "e2e-encryption")]
349    pub(crate) verification_state: SharedObservable<VerificationState>,
350
351    /// Whether to enable the experimental support for sending and receiving
352    /// encrypted room history on invite, per [MSC4268].
353    ///
354    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
355    #[cfg(feature = "e2e-encryption")]
356    pub(crate) enable_share_history_on_invite: bool,
357
358    /// Data related to the [`SendQueue`].
359    ///
360    /// [`SendQueue`]: crate::send_queue::SendQueue
361    pub(crate) send_queue_data: Arc<SendQueueData>,
362
363    /// The `max_upload_size` value of the homeserver, it contains the max
364    /// request size you can send.
365    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,
366
367    /// The entry point to get the [`LatestEvent`] of rooms and threads.
368    ///
369    /// [`LatestEvent`]: crate::latest_event::LatestEvent
370    latest_events: OnceCell<LatestEvents>,
371
372    #[cfg(feature = "experimental-search")]
373    /// Handler for [`RoomIndex`]'s of each room
374    search_index: SearchIndex,
375}
376
377impl ClientInner {
378    /// Create a new `ClientInner`.
379    ///
380    /// All the fields passed as parameters here are those that must be cloned
381    /// upon instantiation of a sub-client, e.g. a client specialized for
382    /// notifications.
383    #[allow(clippy::too_many_arguments)]
384    async fn new(
385        auth_ctx: Arc<AuthCtx>,
386        server: Option<Url>,
387        homeserver: Url,
388        sliding_sync_version: SlidingSyncVersion,
389        http_client: HttpClient,
390        base_client: BaseClient,
391        server_info: ClientServerInfo,
392        respect_login_well_known: bool,
393        event_cache: OnceCell<EventCache>,
394        send_queue: Arc<SendQueueData>,
395        latest_events: OnceCell<LatestEvents>,
396        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
397        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
398        cross_process_store_locks_holder_name: String,
399        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
400    ) -> Arc<Self> {
401        let caches = ClientCaches {
402            server_info: server_info.into(),
403            server_metadata: Mutex::new(TtlCache::new()),
404        };
405
406        let client = Self {
407            server,
408            homeserver: StdRwLock::new(homeserver),
409            auth_ctx,
410            sliding_sync_version: StdRwLock::new(sliding_sync_version),
411            http_client,
412            base_client,
413            caches,
414            locks: Default::default(),
415            cross_process_store_locks_holder_name,
416            typing_notice_times: Default::default(),
417            event_handlers: Default::default(),
418            notification_handlers: Default::default(),
419            room_update_channels: Default::default(),
420            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
421            // ballast for all observers to catch up.
422            room_updates_sender: broadcast::Sender::new(32),
423            respect_login_well_known,
424            sync_beat: event_listener::Event::new(),
425            event_cache,
426            send_queue_data: send_queue,
427            latest_events,
428            #[cfg(feature = "e2e-encryption")]
429            e2ee: EncryptionData::new(encryption_settings),
430            #[cfg(feature = "e2e-encryption")]
431            verification_state: SharedObservable::new(VerificationState::Unknown),
432            #[cfg(feature = "e2e-encryption")]
433            enable_share_history_on_invite,
434            server_max_upload_size: Mutex::new(OnceCell::new()),
435            #[cfg(feature = "experimental-search")]
436            search_index: search_index_handler,
437        };
438
439        #[allow(clippy::let_and_return)]
440        let client = Arc::new(client);
441
442        #[cfg(feature = "e2e-encryption")]
443        client.e2ee.initialize_tasks(&client);
444
445        let _ = client
446            .event_cache
447            .get_or_init(|| async {
448                EventCache::new(
449                    WeakClient::from_inner(&client),
450                    client.base_client.event_cache_store().clone(),
451                )
452            })
453            .await;
454
455        client
456    }
457}
458
459#[cfg(not(tarpaulin_include))]
460impl Debug for Client {
461    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
462        write!(fmt, "Client")
463    }
464}
465
466impl Client {
467    /// Create a new [`Client`] that will use the given homeserver.
468    ///
469    /// # Arguments
470    ///
471    /// * `homeserver_url` - The homeserver that the client should connect to.
472    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
473        Self::builder().homeserver_url(homeserver_url).build().await
474    }
475
476    /// Returns a subscriber that publishes an event every time the ignore user
477    /// list changes.
478    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
479        self.inner.base_client.subscribe_to_ignore_user_list_changes()
480    }
481
482    /// Create a new [`ClientBuilder`].
483    pub fn builder() -> ClientBuilder {
484        ClientBuilder::new()
485    }
486
487    pub(crate) fn base_client(&self) -> &BaseClient {
488        &self.inner.base_client
489    }
490
491    /// The underlying HTTP client.
492    pub fn http_client(&self) -> &reqwest::Client {
493        &self.inner.http_client.inner
494    }
495
496    pub(crate) fn locks(&self) -> &ClientLocks {
497        &self.inner.locks
498    }
499
500    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
501        &self.inner.auth_ctx
502    }
503
504    /// The cross-process store locks holder name.
505    ///
506    /// The SDK provides cross-process store locks (see
507    /// [`matrix_sdk_common::store_locks::CrossProcessStoreLock`]). The
508    /// `holder_name` is the value used for all cross-process store locks
509    /// used by this `Client`.
510    pub fn cross_process_store_locks_holder_name(&self) -> &str {
511        &self.inner.cross_process_store_locks_holder_name
512    }
513
514    /// Change the homeserver URL used by this client.
515    ///
516    /// # Arguments
517    ///
518    /// * `homeserver_url` - The new URL to use.
519    fn set_homeserver(&self, homeserver_url: Url) {
520        *self.inner.homeserver.write().unwrap() = homeserver_url;
521    }
522
523    /// Get the capabilities of the homeserver.
524    ///
525    /// This method should be used to check what features are supported by the
526    /// homeserver.
527    ///
528    /// # Examples
529    ///
530    /// ```no_run
531    /// # use matrix_sdk::Client;
532    /// # use url::Url;
533    /// # async {
534    /// # let homeserver = Url::parse("http://example.com")?;
535    /// let client = Client::new(homeserver).await?;
536    ///
537    /// let capabilities = client.get_capabilities().await?;
538    ///
539    /// if capabilities.change_password.enabled {
540    ///     // Change password
541    /// }
542    /// # anyhow::Ok(()) };
543    /// ```
544    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
545        let res = self.send(get_capabilities::v3::Request::new()).await?;
546        Ok(res.capabilities)
547    }
548
549    /// Get the server vendor information from the federation API.
550    ///
551    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
552    /// both the server's software name and version.
553    ///
554    /// # Examples
555    ///
556    /// ```no_run
557    /// # use matrix_sdk::Client;
558    /// # use url::Url;
559    /// # async {
560    /// # let homeserver = Url::parse("http://example.com")?;
561    /// let client = Client::new(homeserver).await?;
562    ///
563    /// let server_info = client.server_vendor_info().await?;
564    /// println!(
565    ///     "Server: {}, Version: {}",
566    ///     server_info.server_name, server_info.version
567    /// );
568    /// # anyhow::Ok(()) };
569    /// ```
570    pub async fn server_vendor_info(&self) -> HttpResult<ServerVendorInfo> {
571        let res = self.send(get_server_version::v1::Request::new()).await?;
572
573        // Extract server info, using defaults if fields are missing.
574        let server = res.server.unwrap_or_default();
575        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
576        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
577
578        Ok(ServerVendorInfo { server_name: server_name_str, version })
579    }
580
581    /// Get a copy of the default request config.
582    ///
583    /// The default request config is what's used when sending requests if no
584    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
585    /// function with such a parameter.
586    ///
587    /// If the default request config was not customized through
588    /// [`ClientBuilder`] when creating this `Client`, the returned value will
589    /// be equivalent to [`RequestConfig::default()`].
590    pub fn request_config(&self) -> RequestConfig {
591        self.inner.http_client.request_config
592    }
593
594    /// Check whether the client has been activated.
595    ///
596    /// A client is considered active when:
597    ///
598    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
599    ///    is logged in,
600    /// 2. Has loaded cached data from storage,
601    /// 3. If encryption is enabled, it also initialized or restored its
602    ///    `OlmMachine`.
603    pub fn is_active(&self) -> bool {
604        self.inner.base_client.is_active()
605    }
606
607    /// The server used by the client.
608    ///
609    /// See `Self::server` to learn more.
610    pub fn server(&self) -> Option<&Url> {
611        self.inner.server.as_ref()
612    }
613
614    /// The homeserver of the client.
615    pub fn homeserver(&self) -> Url {
616        self.inner.homeserver.read().unwrap().clone()
617    }
618
619    /// Get the sliding sync version.
620    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
621        self.inner.sliding_sync_version.read().unwrap().clone()
622    }
623
624    /// Override the sliding sync version.
625    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
626        let mut lock = self.inner.sliding_sync_version.write().unwrap();
627        *lock = version;
628    }
629
630    /// Get the Matrix user session meta information.
631    ///
632    /// If the client is currently logged in, this will return a
633    /// [`SessionMeta`] object which contains the user ID and device ID.
634    /// Otherwise it returns `None`.
635    pub fn session_meta(&self) -> Option<&SessionMeta> {
636        self.base_client().session_meta()
637    }
638
639    /// Returns a receiver that gets events for each room info update. To watch
640    /// for new events, use `receiver.resubscribe()`.
641    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
642        self.base_client().room_info_notable_update_receiver()
643    }
644
645    /// Performs a search for users.
646    /// The search is performed case-insensitively on user IDs and display names
647    ///
648    /// # Arguments
649    ///
650    /// * `search_term` - The search term for the search
651    /// * `limit` - The maximum number of results to return. Defaults to 10.
652    ///
653    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
654    pub async fn search_users(
655        &self,
656        search_term: &str,
657        limit: u64,
658    ) -> HttpResult<search_users::v3::Response> {
659        let mut request = search_users::v3::Request::new(search_term.to_owned());
660
661        if let Some(limit) = UInt::new(limit) {
662            request.limit = limit;
663        }
664
665        self.send(request).await
666    }
667
668    /// Get the user id of the current owner of the client.
669    pub fn user_id(&self) -> Option<&UserId> {
670        self.session_meta().map(|s| s.user_id.as_ref())
671    }
672
673    /// Get the device ID that identifies the current session.
674    pub fn device_id(&self) -> Option<&DeviceId> {
675        self.session_meta().map(|s| s.device_id.as_ref())
676    }
677
678    /// Get the current access token for this session.
679    ///
680    /// Will be `None` if the client has not been logged in.
681    pub fn access_token(&self) -> Option<String> {
682        self.auth_ctx().access_token()
683    }
684
685    /// Get the current tokens for this session.
686    ///
687    /// To be notified of changes in the session tokens, use
688    /// [`Client::subscribe_to_session_changes()`] or
689    /// [`Client::set_session_callbacks()`].
690    ///
691    /// Returns `None` if the client has not been logged in.
692    pub fn session_tokens(&self) -> Option<SessionTokens> {
693        self.auth_ctx().session_tokens()
694    }
695
696    /// Access the authentication API used to log in this client.
697    ///
698    /// Will be `None` if the client has not been logged in.
699    pub fn auth_api(&self) -> Option<AuthApi> {
700        match self.auth_ctx().auth_data.get()? {
701            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
702            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
703        }
704    }
705
706    /// Get the whole session info of this client.
707    ///
708    /// Will be `None` if the client has not been logged in.
709    ///
710    /// Can be used with [`Client::restore_session`] to restore a previously
711    /// logged-in session.
712    pub fn session(&self) -> Option<AuthSession> {
713        match self.auth_api()? {
714            AuthApi::Matrix(api) => api.session().map(Into::into),
715            AuthApi::OAuth(api) => api.full_session().map(Into::into),
716        }
717    }
718
719    /// Get a reference to the state store.
720    pub fn state_store(&self) -> &DynStateStore {
721        self.base_client().state_store()
722    }
723
724    /// Get a reference to the event cache store.
725    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
726        self.base_client().event_cache_store()
727    }
728
729    /// Access the native Matrix authentication API with this client.
730    pub fn matrix_auth(&self) -> MatrixAuth {
731        MatrixAuth::new(self.clone())
732    }
733
734    /// Get the account of the current owner of the client.
735    pub fn account(&self) -> Account {
736        Account::new(self.clone())
737    }
738
739    /// Get the encryption manager of the client.
740    #[cfg(feature = "e2e-encryption")]
741    pub fn encryption(&self) -> Encryption {
742        Encryption::new(self.clone())
743    }
744
745    /// Get the media manager of the client.
746    pub fn media(&self) -> Media {
747        Media::new(self.clone())
748    }
749
750    /// Get the pusher manager of the client.
751    pub fn pusher(&self) -> Pusher {
752        Pusher::new(self.clone())
753    }
754
755    /// Access the OAuth 2.0 API of the client.
756    pub fn oauth(&self) -> OAuth {
757        OAuth::new(self.clone())
758    }
759
760    /// Register a handler for a specific event type.
761    ///
762    /// The handler is a function or closure with one or more arguments. The
763    /// first argument is the event itself. All additional arguments are
764    /// "context" arguments: They have to implement [`EventHandlerContext`].
765    /// This trait is named that way because most of the types implementing it
766    /// give additional context about an event: The room it was in, its raw form
767    /// and other similar things. As two exceptions to this,
768    /// [`Client`] and [`EventHandlerHandle`] also implement the
769    /// `EventHandlerContext` trait so you don't have to clone your client
770    /// into the event handler manually and a handler can decide to remove
771    /// itself.
772    ///
773    /// Some context arguments are not universally applicable. A context
774    /// argument that isn't available for the given event type will result in
775    /// the event handler being skipped and an error being logged. The following
776    /// context argument types are only available for a subset of event types:
777    ///
778    /// * [`Room`] is only available for room-specific events, i.e. not for
779    ///   events like global account data events or presence events.
780    ///
781    /// You can provide custom context via
782    /// [`add_event_handler_context`](Client::add_event_handler_context) and
783    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
784    /// into the event handler.
785    ///
786    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
787    ///
788    /// # Examples
789    ///
790    /// ```no_run
791    /// use matrix_sdk::{
792    ///     deserialized_responses::EncryptionInfo,
793    ///     event_handler::Ctx,
794    ///     ruma::{
795    ///         events::{
796    ///             macros::EventContent,
797    ///             push_rules::PushRulesEvent,
798    ///             room::{message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent},
799    ///         },
800    ///         push::Action,
801    ///         Int, MilliSecondsSinceUnixEpoch,
802    ///     },
803    ///     Client, Room,
804    /// };
805    /// use serde::{Deserialize, Serialize};
806    ///
807    /// # async fn example(client: Client) {
808    /// client.add_event_handler(
809    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
810    ///         // Common usage: Room event plus room and client.
811    ///     },
812    /// );
813    /// client.add_event_handler(
814    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
815    ///         async move {
816    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
817    ///             // unencrypted events and events that were decrypted by the SDK.
818    ///         }
819    ///     },
820    /// );
821    /// client.add_event_handler(
822    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
823    ///         async move {
824    ///             // A `Vec<Action>` parameter allows you to know which push actions
825    ///             // are applicable for an event. For example, an event with
826    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
827    ///             // in the timeline.
828    ///         }
829    ///     },
830    /// );
831    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
832    ///     // You can omit any or all arguments after the first.
833    /// });
834    ///
835    /// // Registering a temporary event handler:
836    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
837    ///     /* Event handler */
838    /// });
839    /// client.remove_event_handler(handle);
840    ///
841    /// // Registering custom event handler context:
842    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
843    /// struct MyContext {
844    ///     number: usize,
845    /// }
846    /// client.add_event_handler_context(MyContext { number: 5 });
847    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
848    ///     // Use the context
849    /// });
850    ///
851    /// // Custom events work exactly the same way, you just need to declare
852    /// // the content struct and use the EventContent derive macro on it.
853    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
854    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
855    /// struct TokenEventContent {
856    ///     token: String,
857    ///     #[serde(rename = "exp")]
858    ///     expires_at: MilliSecondsSinceUnixEpoch,
859    /// }
860    ///
861    /// client.add_event_handler(|ev: SyncTokenEvent, room: Room| async move {
862    ///     todo!("Display the token");
863    /// });
864    ///
865    /// // Event handler closures can also capture local variables.
866    /// // Make sure they are cheap to clone though, because they will be cloned
867    /// // every time the closure is called.
868    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
869    ///
870    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
871    ///     println!("Calling the handler with identifier {data}");
872    /// });
873    /// # }
874    /// ```
875    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
876    where
877        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
878        H: EventHandler<Ev, Ctx>,
879    {
880        self.add_event_handler_impl(handler, None)
881    }
882
883    /// Register a handler for a specific room, and event type.
884    ///
885    /// This method works the same way as
886    /// [`add_event_handler`][Self::add_event_handler], except that the handler
887    /// will only be called for events in the room with the specified ID. See
888    /// that method for more details on event handler functions.
889    ///
890    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
891    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
892    /// your use case.
893    pub fn add_room_event_handler<Ev, Ctx, H>(
894        &self,
895        room_id: &RoomId,
896        handler: H,
897    ) -> EventHandlerHandle
898    where
899        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
900        H: EventHandler<Ev, Ctx>,
901    {
902        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
903    }
904
905    /// Observe a specific event type.
906    ///
907    /// `Ev` represents the kind of event that will be observed. `Ctx`
908    /// represents the context that will come with the event. It relies on the
909    /// same mechanism as [`Client::add_event_handler`]. The main difference is
910    /// that it returns an [`ObservableEventHandler`] and doesn't require a
911    /// user-defined closure. It is possible to subscribe to the
912    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
913    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
914    /// Ctx)`.
915    ///
916    /// Be careful that only the most recent value can be observed. Subscribers
917    /// are notified when a new value is sent, but there is no guarantee
918    /// that they will see all values.
919    ///
920    /// # Example
921    ///
922    /// Let's see a classical usage:
923    ///
924    /// ```
925    /// use futures_util::StreamExt as _;
926    /// use matrix_sdk::{
927    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
928    ///     Client, Room,
929    /// };
930    ///
931    /// # async fn example(client: Client) -> Option<()> {
932    /// let observer =
933    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
934    ///
935    /// let mut subscriber = observer.subscribe();
936    ///
937    /// let (event, (room, push_actions)) = subscriber.next().await?;
938    /// # Some(())
939    /// # }
940    /// ```
941    ///
942    /// Now let's see how to get several contexts that can be useful for you:
943    ///
944    /// ```
945    /// use matrix_sdk::{
946    ///     deserialized_responses::EncryptionInfo,
947    ///     ruma::{
948    ///         events::room::{
949    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
950    ///         },
951    ///         push::Action,
952    ///     },
953    ///     Client, Room,
954    /// };
955    ///
956    /// # async fn example(client: Client) {
957    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
958    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
959    ///
960    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
961    /// // to distinguish between unencrypted events and events that were decrypted
962    /// // by the SDK.
963    /// let _ = client
964    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
965    ///     );
966    ///
967    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
968    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
969    /// // should be highlighted in the timeline.
970    /// let _ =
971    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
972    ///
973    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
974    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
975    /// # }
976    /// ```
977    ///
978    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
979    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
980    where
981        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
982        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
983    {
984        self.observe_room_events_impl(None)
985    }
986
987    /// Observe a specific room, and event type.
988    ///
989    /// This method works the same way as [`Client::observe_events`], except
990    /// that the observability will only be applied for events in the room with
991    /// the specified ID. See that method for more details.
992    ///
993    /// Be careful that only the most recent value can be observed. Subscribers
994    /// are notified when a new value is sent, but there is no guarantee
995    /// that they will see all values.
996    pub fn observe_room_events<Ev, Ctx>(
997        &self,
998        room_id: &RoomId,
999    ) -> ObservableEventHandler<(Ev, Ctx)>
1000    where
1001        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1002        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1003    {
1004        self.observe_room_events_impl(Some(room_id.to_owned()))
1005    }
1006
1007    /// Shared implementation for `Client::observe_events` and
1008    /// `Client::observe_room_events`.
1009    fn observe_room_events_impl<Ev, Ctx>(
1010        &self,
1011        room_id: Option<OwnedRoomId>,
1012    ) -> ObservableEventHandler<(Ev, Ctx)>
1013    where
1014        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1015        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1016    {
1017        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1018        // new value.
1019        let shared_observable = SharedObservable::new(None);
1020
1021        ObservableEventHandler::new(
1022            shared_observable.clone(),
1023            self.event_handler_drop_guard(self.add_event_handler_impl(
1024                move |event: Ev, context: Ctx| {
1025                    shared_observable.set(Some((event, context)));
1026
1027                    ready(())
1028                },
1029                room_id,
1030            )),
1031        )
1032    }
1033
1034    /// Remove the event handler associated with the handle.
1035    ///
1036    /// Note that you **must not** call `remove_event_handler` from the
1037    /// non-async part of an event handler, that is:
1038    ///
1039    /// ```ignore
1040    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1041    ///     // ⚠ this will cause a deadlock ⚠
1042    ///     client.remove_event_handler(handle);
1043    ///
1044    ///     async move {
1045    ///         // removing the event handler here is fine
1046    ///         client.remove_event_handler(handle);
1047    ///     }
1048    /// })
1049    /// ```
1050    ///
1051    /// Note also that handlers that remove themselves will still execute with
1052    /// events received in the same sync cycle.
1053    ///
1054    /// # Arguments
1055    ///
1056    /// `handle` - The [`EventHandlerHandle`] that is returned when
1057    /// registering the event handler with [`Client::add_event_handler`].
1058    ///
1059    /// # Examples
1060    ///
1061    /// ```no_run
1062    /// # use url::Url;
1063    /// # use tokio::sync::mpsc;
1064    /// #
1065    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1066    /// #
1067    /// use matrix_sdk::{
1068    ///     event_handler::EventHandlerHandle,
1069    ///     ruma::events::room::member::SyncRoomMemberEvent, Client,
1070    /// };
1071    /// #
1072    /// # futures_executor::block_on(async {
1073    /// # let client = matrix_sdk::Client::builder()
1074    /// #     .homeserver_url(homeserver)
1075    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1076    /// #     .build()
1077    /// #     .await
1078    /// #     .unwrap();
1079    ///
1080    /// client.add_event_handler(
1081    ///     |ev: SyncRoomMemberEvent,
1082    ///      client: Client,
1083    ///      handle: EventHandlerHandle| async move {
1084    ///         // Common usage: Check arriving Event is the expected one
1085    ///         println!("Expected RoomMemberEvent received!");
1086    ///         client.remove_event_handler(handle);
1087    ///     },
1088    /// );
1089    /// # });
1090    /// ```
1091    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1092        self.inner.event_handlers.remove(handle);
1093    }
1094
1095    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1096    /// the given handle.
1097    ///
1098    /// When the returned value is dropped, the event handler will be removed.
1099    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1100        EventHandlerDropGuard::new(handle, self.clone())
1101    }
1102
1103    /// Add an arbitrary value for use as event handler context.
1104    ///
1105    /// The value can be obtained in an event handler by adding an argument of
1106    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1107    ///
1108    /// If a value of the same type has been added before, it will be
1109    /// overwritten.
1110    ///
1111    /// # Examples
1112    ///
1113    /// ```no_run
1114    /// use matrix_sdk::{
1115    ///     event_handler::Ctx, ruma::events::room::message::SyncRoomMessageEvent,
1116    ///     Room,
1117    /// };
1118    /// # #[derive(Clone)]
1119    /// # struct SomeType;
1120    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1121    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1122    /// # futures_executor::block_on(async {
1123    /// # let client = matrix_sdk::Client::builder()
1124    /// #     .homeserver_url(homeserver)
1125    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1126    /// #     .build()
1127    /// #     .await
1128    /// #     .unwrap();
1129    ///
1130    /// // Handle used to send messages to the UI part of the app
1131    /// let my_gui_handle: SomeType = obtain_gui_handle();
1132    ///
1133    /// client.add_event_handler_context(my_gui_handle.clone());
1134    /// client.add_event_handler(
1135    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1136    ///         async move {
1137    ///             // gui_handle.send(DisplayMessage { message: ev });
1138    ///         }
1139    ///     },
1140    /// );
1141    /// # });
1142    /// ```
1143    pub fn add_event_handler_context<T>(&self, ctx: T)
1144    where
1145        T: Clone + Send + Sync + 'static,
1146    {
1147        self.inner.event_handlers.add_context(ctx);
1148    }
1149
1150    /// Register a handler for a notification.
1151    ///
1152    /// Similar to [`Client::add_event_handler`], but only allows functions
1153    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1154    /// [`Client`] for now.
1155    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1156    where
1157        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1158        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1159    {
1160        self.inner.notification_handlers.write().await.push(Box::new(
1161            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1162        ));
1163
1164        self
1165    }
1166
1167    /// Subscribe to all updates for the room with the given ID.
1168    ///
1169    /// The returned receiver will receive a new message for each sync response
1170    /// that contains updates for that room.
1171    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1172        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1173            btree_map::Entry::Vacant(entry) => {
1174                let (tx, rx) = broadcast::channel(8);
1175                entry.insert(tx);
1176                rx
1177            }
1178            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1179        }
1180    }
1181
1182    /// Subscribe to all updates to all rooms, whenever any has been received in
1183    /// a sync response.
1184    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1185        self.inner.room_updates_sender.subscribe()
1186    }
1187
1188    pub(crate) async fn notification_handlers(
1189        &self,
1190    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1191        self.inner.notification_handlers.read().await
1192    }
1193
1194    /// Get all the rooms the client knows about.
1195    ///
1196    /// This will return the list of joined, invited, and left rooms.
1197    pub fn rooms(&self) -> Vec<Room> {
1198        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1199    }
1200
1201    /// Get all the rooms the client knows about, filtered by room state.
1202    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1203        self.base_client()
1204            .rooms_filtered(filter)
1205            .into_iter()
1206            .map(|room| Room::new(self.clone(), room))
1207            .collect()
1208    }
1209
1210    /// Get a stream of all the rooms, in addition to the existing rooms.
1211    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1212        let (rooms, stream) = self.base_client().rooms_stream();
1213
1214        let map_room = |room| Room::new(self.clone(), room);
1215
1216        (
1217            rooms.into_iter().map(map_room).collect(),
1218            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1219        )
1220    }
1221
1222    /// Returns the joined rooms this client knows about.
1223    pub fn joined_rooms(&self) -> Vec<Room> {
1224        self.rooms_filtered(RoomStateFilter::JOINED)
1225    }
1226
1227    /// Returns the invited rooms this client knows about.
1228    pub fn invited_rooms(&self) -> Vec<Room> {
1229        self.rooms_filtered(RoomStateFilter::INVITED)
1230    }
1231
1232    /// Returns the left rooms this client knows about.
1233    pub fn left_rooms(&self) -> Vec<Room> {
1234        self.rooms_filtered(RoomStateFilter::LEFT)
1235    }
1236
1237    /// Returns the joined space rooms this client knows about.
1238    pub fn joined_space_rooms(&self) -> Vec<Room> {
1239        self.base_client()
1240            .rooms_filtered(RoomStateFilter::JOINED)
1241            .into_iter()
1242            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1243            .collect()
1244    }
1245
1246    /// Get a room with the given room id.
1247    ///
1248    /// # Arguments
1249    ///
1250    /// `room_id` - The unique id of the room that should be fetched.
1251    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1252        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1253    }
1254
1255    /// Gets the preview of a room, whether the current user has joined it or
1256    /// not.
1257    pub async fn get_room_preview(
1258        &self,
1259        room_or_alias_id: &RoomOrAliasId,
1260        via: Vec<OwnedServerName>,
1261    ) -> Result<RoomPreview> {
1262        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1263            Ok(room_id) => room_id.to_owned(),
1264            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1265        };
1266
1267        if let Some(room) = self.get_room(&room_id) {
1268            // The cached data can only be trusted if the room state is joined or
1269            // banned: for invite and knock rooms, no updates will be received
1270            // for the rooms after the invite/knock action took place so we may
1271            // have very out to date data for important fields such as
1272            // `join_rule`. For left rooms, the homeserver should return the latest info.
1273            match room.state() {
1274                RoomState::Joined | RoomState::Banned => {
1275                    return Ok(RoomPreview::from_known_room(&room).await);
1276                }
1277                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1278            }
1279        }
1280
1281        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1282    }
1283
1284    /// Resolve a room alias to a room id and a list of servers which know
1285    /// about it.
1286    ///
1287    /// # Arguments
1288    ///
1289    /// `room_alias` - The room alias to be resolved.
1290    pub async fn resolve_room_alias(
1291        &self,
1292        room_alias: &RoomAliasId,
1293    ) -> HttpResult<get_alias::v3::Response> {
1294        let request = get_alias::v3::Request::new(room_alias.to_owned());
1295        self.send(request).await
1296    }
1297
1298    /// Checks if a room alias is not in use yet.
1299    ///
1300    /// Returns:
1301    /// - `Ok(true)` if the room alias is available.
1302    /// - `Ok(false)` if it's not (the resolve alias request returned a `404`
1303    ///   status code).
1304    /// - An `Err` otherwise.
1305    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1306        match self.resolve_room_alias(alias).await {
1307            // The room alias was resolved, so it's already in use.
1308            Ok(_) => Ok(false),
1309            Err(error) => {
1310                match error.client_api_error_kind() {
1311                    // The room alias wasn't found, so it's available.
1312                    Some(ErrorKind::NotFound) => Ok(true),
1313                    _ => Err(error),
1314                }
1315            }
1316        }
1317    }
1318
1319    /// Adds a new room alias associated with a room to the room directory.
1320    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1321        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1322        self.send(request).await?;
1323        Ok(())
1324    }
1325
1326    /// Removes a room alias from the room directory.
1327    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1328        let request = delete_alias::v3::Request::new(alias.to_owned());
1329        self.send(request).await?;
1330        Ok(())
1331    }
1332
1333    /// Update the homeserver from the login response well-known if needed.
1334    ///
1335    /// # Arguments
1336    ///
1337    /// * `login_well_known` - The `well_known` field from a successful login
1338    ///   response.
1339    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1340        if self.inner.respect_login_well_known {
1341            if let Some(well_known) = login_well_known {
1342                if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
1343                    self.set_homeserver(homeserver);
1344                }
1345            }
1346        }
1347    }
1348
1349    /// Similar to [`Client::restore_session_with`], with
1350    /// [`RoomLoadSettings::default()`].
1351    ///
1352    /// # Panics
1353    ///
1354    /// Panics if a session was already restored or logged in.
1355    #[instrument(skip_all)]
1356    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1357        self.restore_session_with(session, RoomLoadSettings::default()).await
1358    }
1359
1360    /// Restore a session previously logged-in using one of the available
1361    /// authentication APIs. The number of rooms to restore is controlled by
1362    /// [`RoomLoadSettings`].
1363    ///
1364    /// See the documentation of the corresponding authentication API's
1365    /// `restore_session` method for more information.
1366    ///
1367    /// # Panics
1368    ///
1369    /// Panics if a session was already restored or logged in.
1370    #[instrument(skip_all)]
1371    pub async fn restore_session_with(
1372        &self,
1373        session: impl Into<AuthSession>,
1374        room_load_settings: RoomLoadSettings,
1375    ) -> Result<()> {
1376        let session = session.into();
1377        match session {
1378            AuthSession::Matrix(session) => {
1379                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1380            }
1381            AuthSession::OAuth(session) => {
1382                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1383            }
1384        }
1385    }
1386
1387    /// Refresh the access token using the authentication API used to log into
1388    /// this session.
1389    ///
1390    /// See the documentation of the authentication API's `refresh_access_token`
1391    /// method for more information.
1392    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1393        let Some(auth_api) = self.auth_api() else {
1394            return Err(RefreshTokenError::RefreshTokenRequired);
1395        };
1396
1397        match auth_api {
1398            AuthApi::Matrix(api) => {
1399                trace!("Token refresh: Using the homeserver.");
1400                Box::pin(api.refresh_access_token()).await?;
1401            }
1402            AuthApi::OAuth(api) => {
1403                trace!("Token refresh: Using OAuth 2.0.");
1404                Box::pin(api.refresh_access_token()).await?;
1405            }
1406        }
1407
1408        Ok(())
1409    }
1410
1411    /// Log out the current session using the proper authentication API.
1412    ///
1413    /// # Errors
1414    ///
1415    /// Returns an error if the session is not authenticated or if an error
1416    /// occurred while making the request to the server.
1417    pub async fn logout(&self) -> Result<(), Error> {
1418        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1419        match auth_api {
1420            AuthApi::Matrix(matrix_auth) => {
1421                matrix_auth.logout().await?;
1422                Ok(())
1423            }
1424            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1425        }
1426    }
1427
1428    /// Get or upload a sync filter.
1429    ///
1430    /// This method will either get a filter ID from the store or upload the
1431    /// filter definition to the homeserver and return the new filter ID.
1432    ///
1433    /// # Arguments
1434    ///
1435    /// * `filter_name` - The unique name of the filter, this name will be used
1436    /// locally to store and identify the filter ID returned by the server.
1437    ///
1438    /// * `definition` - The filter definition that should be uploaded to the
1439    /// server if no filter ID can be found in the store.
1440    ///
1441    /// # Examples
1442    ///
1443    /// ```no_run
1444    /// # use matrix_sdk::{
1445    /// #    Client, config::SyncSettings,
1446    /// #    ruma::api::client::{
1447    /// #        filter::{
1448    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1449    /// #        },
1450    /// #        sync::sync_events::v3::Filter,
1451    /// #    }
1452    /// # };
1453    /// # use url::Url;
1454    /// # async {
1455    /// # let homeserver = Url::parse("http://example.com").unwrap();
1456    /// # let client = Client::new(homeserver).await.unwrap();
1457    /// let mut filter = FilterDefinition::default();
1458    ///
1459    /// // Let's enable member lazy loading.
1460    /// filter.room.state.lazy_load_options =
1461    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1462    ///
1463    /// let filter_id = client
1464    ///     .get_or_upload_filter("sync", filter)
1465    ///     .await
1466    ///     .unwrap();
1467    ///
1468    /// let sync_settings = SyncSettings::new()
1469    ///     .filter(Filter::FilterId(filter_id));
1470    ///
1471    /// let response = client.sync_once(sync_settings).await.unwrap();
1472    /// # };
1473    #[instrument(skip(self, definition))]
1474    pub async fn get_or_upload_filter(
1475        &self,
1476        filter_name: &str,
1477        definition: FilterDefinition,
1478    ) -> Result<String> {
1479        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1480            debug!("Found filter locally");
1481            Ok(filter)
1482        } else {
1483            debug!("Didn't find filter locally");
1484            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1485            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1486            let response = self.send(request).await?;
1487
1488            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1489
1490            Ok(response.filter_id)
1491        }
1492    }
1493
1494    /// Prepare to join a room by ID, by getting the current details about it
1495    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1496        let room = self.get_room(room_id)?;
1497
1498        let inviter = match room.invite_details().await {
1499            Ok(details) => details.inviter,
1500            Err(Error::WrongRoomState(_)) => None,
1501            Err(e) => {
1502                warn!("Error fetching invite details for room: {e:?}");
1503                None
1504            }
1505        };
1506
1507        Some(PreJoinRoomInfo { inviter })
1508    }
1509
1510    /// Finish joining a room.
1511    ///
1512    /// If the room was an invite that should be marked as a DM, will include it
1513    /// in the DM event after creating the joined room.
1514    ///
1515    /// If encrypted history sharing is enabled, will check to see if we have a
1516    /// key bundle, and import it if so.
1517    ///
1518    /// # Arguments
1519    ///
1520    /// * `room_id` - The `RoomId` of the room that was joined.
1521    /// * `pre_join_room_info` - Information about the room before we joined.
1522    async fn finish_join_room(
1523        &self,
1524        room_id: &RoomId,
1525        pre_join_room_info: Option<PreJoinRoomInfo>,
1526    ) -> Result<Room> {
1527        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1528            room.state() == RoomState::Invited
1529                && room.is_direct().await.unwrap_or_else(|e| {
1530                    warn!(%room_id, "is_direct() failed: {e}");
1531                    false
1532                })
1533        } else {
1534            false
1535        };
1536
1537        let base_room = self
1538            .base_client()
1539            .room_joined(
1540                room_id,
1541                pre_join_room_info
1542                    .as_ref()
1543                    .and_then(|info| info.inviter.as_ref())
1544                    .map(|i| i.user_id().to_owned()),
1545            )
1546            .await?;
1547        let room = Room::new(self.clone(), base_room);
1548
1549        if mark_as_dm {
1550            room.set_is_direct(true).await?;
1551        }
1552
1553        #[cfg(feature = "e2e-encryption")]
1554        if self.inner.enable_share_history_on_invite {
1555            if let Some(inviter) =
1556                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1557            {
1558                crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1559                    .await?;
1560            }
1561        }
1562
1563        // Suppress "unused variable" and "unused field" lints
1564        #[cfg(not(feature = "e2e-encryption"))]
1565        let _ = pre_join_room_info.map(|i| i.inviter);
1566
1567        Ok(room)
1568    }
1569
1570    /// Join a room by `RoomId`.
1571    ///
1572    /// Returns the `Room` in the joined state.
1573    ///
1574    /// # Arguments
1575    ///
1576    /// * `room_id` - The `RoomId` of the room to be joined.
1577    #[instrument(skip(self))]
1578    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1579        // See who invited us to this room, if anyone. Note we have to do this before
1580        // making the `/join` request, otherwise we could race against the sync.
1581        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1582
1583        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1584        let response = self.send(request).await?;
1585        self.finish_join_room(&response.room_id, pre_join_info).await
1586    }
1587
1588    /// Join a room by `RoomOrAliasId`.
1589    ///
1590    /// Returns the `Room` in the joined state.
1591    ///
1592    /// # Arguments
1593    ///
1594    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1595    ///   alias looks like `#name:example.com`.
1596    /// * `server_names` - The server names to be used for resolving the alias,
1597    ///   if needs be.
1598    pub async fn join_room_by_id_or_alias(
1599        &self,
1600        alias: &RoomOrAliasId,
1601        server_names: &[OwnedServerName],
1602    ) -> Result<Room> {
1603        let pre_join_info = {
1604            match alias.try_into() {
1605                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1606                Err(_) => {
1607                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1608                    // responding to an invitation to the room, and therefore don't need to handle
1609                    // things that happen as a result of invites.
1610                    None
1611                }
1612            }
1613        };
1614        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1615            via: server_names.to_owned(),
1616        });
1617        let response = self.send(request).await?;
1618        self.finish_join_room(&response.room_id, pre_join_info).await
1619    }
1620
1621    /// Search the homeserver's directory of public rooms.
1622    ///
1623    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1624    /// a `get_public_rooms::Response`.
1625    ///
1626    /// # Arguments
1627    ///
1628    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1629    ///
1630    /// * `since` - Pagination token from a previous request.
1631    ///
1632    /// * `server` - The name of the server, if `None` the requested server is
1633    ///   used.
1634    ///
1635    /// # Examples
1636    /// ```no_run
1637    /// use matrix_sdk::Client;
1638    /// # use url::Url;
1639    /// # let homeserver = Url::parse("http://example.com").unwrap();
1640    /// # let limit = Some(10);
1641    /// # let since = Some("since token");
1642    /// # let server = Some("servername.com".try_into().unwrap());
1643    /// # async {
1644    /// let mut client = Client::new(homeserver).await.unwrap();
1645    ///
1646    /// client.public_rooms(limit, since, server).await;
1647    /// # };
1648    /// ```
1649    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1650    pub async fn public_rooms(
1651        &self,
1652        limit: Option<u32>,
1653        since: Option<&str>,
1654        server: Option<&ServerName>,
1655    ) -> HttpResult<get_public_rooms::v3::Response> {
1656        let limit = limit.map(UInt::from);
1657
1658        let request = assign!(get_public_rooms::v3::Request::new(), {
1659            limit,
1660            since: since.map(ToOwned::to_owned),
1661            server: server.map(ToOwned::to_owned),
1662        });
1663        self.send(request).await
1664    }
1665
1666    /// Create a room with the given parameters.
1667    ///
1668    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1669    /// created room.
1670    ///
1671    /// If you want to create a direct message with one specific user, you can
1672    /// use [`create_dm`][Self::create_dm], which is more convenient than
1673    /// assembling the [`create_room::v3::Request`] yourself.
1674    ///
1675    /// If the `is_direct` field of the request is set to `true` and at least
1676    /// one user is invited, the room will be automatically added to the direct
1677    /// rooms in the account data.
1678    ///
1679    /// # Examples
1680    ///
1681    /// ```no_run
1682    /// use matrix_sdk::{
1683    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1684    ///     Client,
1685    /// };
1686    /// # use url::Url;
1687    /// #
1688    /// # async {
1689    /// # let homeserver = Url::parse("http://example.com").unwrap();
1690    /// let request = CreateRoomRequest::new();
1691    /// let client = Client::new(homeserver).await.unwrap();
1692    /// assert!(client.create_room(request).await.is_ok());
1693    /// # };
1694    /// ```
1695    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1696        let invite = request.invite.clone();
1697        let is_direct_room = request.is_direct;
1698        let response = self.send(request).await?;
1699
1700        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1701
1702        let joined_room = Room::new(self.clone(), base_room);
1703
1704        if is_direct_room && !invite.is_empty() {
1705            if let Err(error) =
1706                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1707            {
1708                // FIXME: Retry in the background
1709                error!("Failed to mark room as DM: {error}");
1710            }
1711        }
1712
1713        Ok(joined_room)
1714    }
1715
1716    /// Create a DM room.
1717    ///
1718    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1719    /// given user being invited, the room marked `is_direct` and both the
1720    /// creator and invitee getting the default maximum power level.
1721    ///
1722    /// If the `e2e-encryption` feature is enabled, the room will also be
1723    /// encrypted.
1724    ///
1725    /// # Arguments
1726    ///
1727    /// * `user_id` - The ID of the user to create a DM for.
1728    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1729        #[cfg(feature = "e2e-encryption")]
1730        let initial_state =
1731            vec![InitialStateEvent::new(RoomEncryptionEventContent::with_recommended_defaults())
1732                .to_raw_any()];
1733
1734        #[cfg(not(feature = "e2e-encryption"))]
1735        let initial_state = vec![];
1736
1737        let request = assign!(create_room::v3::Request::new(), {
1738            invite: vec![user_id.to_owned()],
1739            is_direct: true,
1740            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1741            initial_state,
1742        });
1743
1744        self.create_room(request).await
1745    }
1746
1747    /// Search the homeserver's directory for public rooms with a filter.
1748    ///
1749    /// # Arguments
1750    ///
1751    /// * `room_search` - The easiest way to create this request is using the
1752    ///   `get_public_rooms_filtered::Request` itself.
1753    ///
1754    /// # Examples
1755    ///
1756    /// ```no_run
1757    /// # use url::Url;
1758    /// # use matrix_sdk::Client;
1759    /// # async {
1760    /// # let homeserver = Url::parse("http://example.com")?;
1761    /// use matrix_sdk::ruma::{
1762    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1763    /// };
1764    /// # let mut client = Client::new(homeserver).await?;
1765    ///
1766    /// let mut filter = Filter::new();
1767    /// filter.generic_search_term = Some("rust".to_owned());
1768    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1769    /// request.filter = filter;
1770    ///
1771    /// let response = client.public_rooms_filtered(request).await?;
1772    ///
1773    /// for room in response.chunk {
1774    ///     println!("Found room {room:?}");
1775    /// }
1776    /// # anyhow::Ok(()) };
1777    /// ```
1778    pub async fn public_rooms_filtered(
1779        &self,
1780        request: get_public_rooms_filtered::v3::Request,
1781    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1782        self.send(request).await
1783    }
1784
1785    /// Send an arbitrary request to the server, without updating client state.
1786    ///
1787    /// **Warning:** Because this method *does not* update the client state, it
1788    /// is important to make sure that you account for this yourself, and
1789    /// use wrapper methods where available.  This method should *only* be
1790    /// used if a wrapper method for the endpoint you'd like to use is not
1791    /// available.
1792    ///
1793    /// # Arguments
1794    ///
1795    /// * `request` - A filled out and valid request for the endpoint to be hit
1796    ///
1797    /// * `timeout` - An optional request timeout setting, this overrides the
1798    ///   default request setting if one was set.
1799    ///
1800    /// # Examples
1801    ///
1802    /// ```no_run
1803    /// # use matrix_sdk::{Client, config::SyncSettings};
1804    /// # use url::Url;
1805    /// # async {
1806    /// # let homeserver = Url::parse("http://localhost:8080")?;
1807    /// # let mut client = Client::new(homeserver).await?;
1808    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1809    ///
1810    /// // First construct the request you want to make
1811    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1812    /// // for all available Endpoints
1813    /// let user_id = user_id!("@example:localhost").to_owned();
1814    /// let request = profile::get_profile::v3::Request::new(user_id);
1815    ///
1816    /// // Start the request using Client::send()
1817    /// let response = client.send(request).await?;
1818    ///
1819    /// // Check the corresponding Response struct to find out what types are
1820    /// // returned
1821    /// # anyhow::Ok(()) };
1822    /// ```
1823    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1824    where
1825        Request: OutgoingRequest + Clone + Debug,
1826        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1827    {
1828        SendRequest {
1829            client: self.clone(),
1830            request,
1831            config: None,
1832            send_progress: Default::default(),
1833        }
1834    }
1835
1836    pub(crate) async fn send_inner<Request>(
1837        &self,
1838        request: Request,
1839        config: Option<RequestConfig>,
1840        send_progress: SharedObservable<TransmissionProgress>,
1841    ) -> HttpResult<Request::IncomingResponse>
1842    where
1843        Request: OutgoingRequest + Debug,
1844        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1845    {
1846        let homeserver = self.homeserver().to_string();
1847        let access_token = self.access_token();
1848
1849        self.inner
1850            .http_client
1851            .send(
1852                request,
1853                config,
1854                homeserver,
1855                access_token.as_deref(),
1856                &self.supported_versions().await?,
1857                send_progress,
1858            )
1859            .await
1860    }
1861
1862    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1863        _ = self
1864            .inner
1865            .auth_ctx
1866            .session_change_sender
1867            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1868    }
1869
1870    /// Fetches server versions from network; no caching.
1871    pub async fn fetch_server_versions(
1872        &self,
1873        request_config: Option<RequestConfig>,
1874    ) -> HttpResult<get_supported_versions::Response> {
1875        let server_versions = self
1876            .inner
1877            .http_client
1878            .send(
1879                get_supported_versions::Request::new(),
1880                request_config,
1881                self.homeserver().to_string(),
1882                None,
1883                &SupportedVersions {
1884                    versions: [MatrixVersion::V1_0].into(),
1885                    features: Default::default(),
1886                },
1887                Default::default(),
1888            )
1889            .await?;
1890
1891        Ok(server_versions)
1892    }
1893
1894    /// Fetches client well_known from network; no caching.
1895    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1896        let server_url_string = self
1897            .server()
1898            .unwrap_or(
1899                // Sometimes people configure their well-known directly on the homeserver so use
1900                // this as a fallback when the server name is unknown.
1901                &self.homeserver(),
1902            )
1903            .to_string();
1904
1905        let well_known = self
1906            .inner
1907            .http_client
1908            .send(
1909                discover_homeserver::Request::new(),
1910                Some(RequestConfig::short_retry()),
1911                server_url_string,
1912                None,
1913                &SupportedVersions {
1914                    versions: [MatrixVersion::V1_0].into(),
1915                    features: Default::default(),
1916                },
1917                Default::default(),
1918            )
1919            .await;
1920
1921        match well_known {
1922            Ok(well_known) => Some(well_known),
1923            Err(http_error) => {
1924                // It is perfectly valid to not have a well-known file.
1925                // Maybe we should check for a specific error code to be sure?
1926                warn!("Failed to fetch client well-known: {http_error}");
1927                None
1928            }
1929        }
1930    }
1931
1932    /// Load server info from storage, or fetch them from network and cache
1933    /// them.
1934    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1935        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1936            Ok(Some(stored)) => {
1937                if let Some(server_info) =
1938                    stored.into_server_info().and_then(|info| info.maybe_decode())
1939                {
1940                    return Ok(server_info);
1941                }
1942            }
1943            Ok(None) => {
1944                // fallthrough: cache is empty
1945            }
1946            Err(err) => {
1947                warn!("error when loading cached server info: {err}");
1948                // fallthrough to network.
1949            }
1950        }
1951
1952        let server_versions = self.fetch_server_versions(None).await?;
1953        let well_known = self.fetch_client_well_known().await;
1954        let server_info = ServerInfo::new(
1955            server_versions.versions.clone(),
1956            server_versions.unstable_features.clone(),
1957            well_known.map(Into::into),
1958        );
1959
1960        // Attempt to cache the result in storage.
1961        {
1962            if let Err(err) = self
1963                .state_store()
1964                .set_kv_data(
1965                    StateStoreDataKey::ServerInfo,
1966                    StateStoreDataValue::ServerInfo(server_info.clone()),
1967                )
1968                .await
1969            {
1970                warn!("error when caching server info: {err}");
1971            }
1972        }
1973
1974        Ok(server_info)
1975    }
1976
1977    async fn get_or_load_and_cache_server_info<
1978        Value,
1979        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
1980    >(
1981        &self,
1982        map: MapFunction,
1983    ) -> HttpResult<Value> {
1984        let server_info = &self.inner.caches.server_info;
1985        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
1986            return Ok(val);
1987        }
1988
1989        let mut guarded_server_info = server_info.write().await;
1990        if let CachedValue::Cached(val) = map(&guarded_server_info) {
1991            return Ok(val);
1992        }
1993
1994        let server_info = self.load_or_fetch_server_info().await?;
1995
1996        // Fill both unstable features and server versions at once.
1997        let mut supported = server_info.supported_versions();
1998        if supported.versions.is_empty() {
1999            supported.versions = [MatrixVersion::V1_0].into();
2000        }
2001
2002        guarded_server_info.supported_versions = CachedValue::Cached(supported);
2003        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);
2004
2005        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
2006        // fetch an optional property), the function will always return some.
2007        Ok(map(&guarded_server_info).unwrap_cached_value())
2008    }
2009
2010    /// Get the Matrix versions and features supported by the homeserver by
2011    /// fetching them from the server or the cache.
2012    ///
2013    /// This is equivalent to calling both [`Client::server_versions()`] and
2014    /// [`Client::unstable_features()`]. To always fetch the result from the
2015    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2016    /// and then `.as_supported_versions()` on the response.
2017    ///
2018    /// # Examples
2019    ///
2020    /// ```no_run
2021    /// use ruma::api::{FeatureFlag, MatrixVersion};
2022    /// # use matrix_sdk::{Client, config::SyncSettings};
2023    /// # use url::Url;
2024    /// # async {
2025    /// # let homeserver = Url::parse("http://localhost:8080")?;
2026    /// # let mut client = Client::new(homeserver).await?;
2027    ///
2028    /// let supported = client.supported_versions().await?;
2029    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2030    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2031    ///
2032    /// let msc_x_feature = FeatureFlag::from("msc_x");
2033    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2034    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2035    /// # anyhow::Ok(()) };
2036    /// ```
2037    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2038        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2039            .await
2040    }
2041
2042    /// Get the Matrix versions supported by the homeserver by fetching them
2043    /// from the server or the cache.
2044    ///
2045    /// # Examples
2046    ///
2047    /// ```no_run
2048    /// use ruma::api::MatrixVersion;
2049    /// # use matrix_sdk::{Client, config::SyncSettings};
2050    /// # use url::Url;
2051    /// # async {
2052    /// # let homeserver = Url::parse("http://localhost:8080")?;
2053    /// # let mut client = Client::new(homeserver).await?;
2054    ///
2055    /// let server_versions = client.server_versions().await?;
2056    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2057    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2058    /// # anyhow::Ok(()) };
2059    /// ```
2060    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2061        self.get_or_load_and_cache_server_info(|server_info| {
2062            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2063        })
2064        .await
2065    }
2066
2067    /// Get the unstable features supported by the homeserver by fetching them
2068    /// from the server or the cache.
2069    ///
2070    /// # Examples
2071    ///
2072    /// ```no_run
2073    /// use matrix_sdk::ruma::api::FeatureFlag;
2074    /// # use matrix_sdk::{Client, config::SyncSettings};
2075    /// # use url::Url;
2076    /// # async {
2077    /// # let homeserver = Url::parse("http://localhost:8080")?;
2078    /// # let mut client = Client::new(homeserver).await?;
2079    ///
2080    /// let msc_x_feature = FeatureFlag::from("msc_x");
2081    /// let unstable_features = client.unstable_features().await?;
2082    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2083    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2084    /// # anyhow::Ok(()) };
2085    /// ```
2086    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2087        self.get_or_load_and_cache_server_info(|server_info| {
2088            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2089        })
2090        .await
2091    }
2092
2093    /// Get information about the homeserver's advertised RTC foci by fetching
2094    /// the well-known file from the server or the cache.
2095    ///
2096    /// # Examples
2097    /// ```no_run
2098    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2099    /// # use url::Url;
2100    /// # async {
2101    /// # let homeserver = Url::parse("http://localhost:8080")?;
2102    /// # let mut client = Client::new(homeserver).await?;
2103    /// let rtc_foci = client.rtc_foci().await?;
2104    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2105    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2106    ///     _ => None,
2107    /// });
2108    /// if let Some(info) = default_livekit_focus_info {
2109    ///     println!("Default LiveKit service URL: {}", info.service_url);
2110    /// }
2111    /// # anyhow::Ok(()) };
2112    /// ```
2113    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2114        let well_known = self
2115            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2116            .await?;
2117
2118        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2119    }
2120
2121    /// Empty the server version and unstable features cache.
2122    ///
2123    /// Since the SDK caches server info (versions, unstable features,
2124    /// well-known etc), it's possible to have a stale entry in the cache. This
2125    /// functions makes it possible to force reset it.
2126    pub async fn reset_server_info(&self) -> Result<()> {
2127        // Empty the in-memory caches.
2128        let mut guard = self.inner.caches.server_info.write().await;
2129        guard.supported_versions = CachedValue::NotSet;
2130
2131        // Empty the store cache.
2132        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2133    }
2134
2135    /// Check whether MSC 4028 is enabled on the homeserver.
2136    ///
2137    /// # Examples
2138    ///
2139    /// ```no_run
2140    /// # use matrix_sdk::{Client, config::SyncSettings};
2141    /// # use url::Url;
2142    /// # async {
2143    /// # let homeserver = Url::parse("http://localhost:8080")?;
2144    /// # let mut client = Client::new(homeserver).await?;
2145    /// let msc4028_enabled =
2146    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2147    /// # anyhow::Ok(()) };
2148    /// ```
2149    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2150        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2151    }
2152
2153    /// Get information of all our own devices.
2154    ///
2155    /// # Examples
2156    ///
2157    /// ```no_run
2158    /// # use matrix_sdk::{Client, config::SyncSettings};
2159    /// # use url::Url;
2160    /// # async {
2161    /// # let homeserver = Url::parse("http://localhost:8080")?;
2162    /// # let mut client = Client::new(homeserver).await?;
2163    /// let response = client.devices().await?;
2164    ///
2165    /// for device in response.devices {
2166    ///     println!(
2167    ///         "Device: {} {}",
2168    ///         device.device_id,
2169    ///         device.display_name.as_deref().unwrap_or("")
2170    ///     );
2171    /// }
2172    /// # anyhow::Ok(()) };
2173    /// ```
2174    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2175        let request = get_devices::v3::Request::new();
2176
2177        self.send(request).await
2178    }
2179
2180    /// Delete the given devices from the server.
2181    ///
2182    /// # Arguments
2183    ///
2184    /// * `devices` - The list of devices that should be deleted from the
2185    ///   server.
2186    ///
2187    /// * `auth_data` - This request requires user interactive auth, the first
2188    ///   request needs to set this to `None` and will always fail with an
2189    ///   `UiaaResponse`. The response will contain information for the
2190    ///   interactive auth and the same request needs to be made but this time
2191    ///   with some `auth_data` provided.
2192    ///
2193    /// ```no_run
2194    /// # use matrix_sdk::{
2195    /// #    ruma::{api::client::uiaa, device_id},
2196    /// #    Client, Error, config::SyncSettings,
2197    /// # };
2198    /// # use serde_json::json;
2199    /// # use url::Url;
2200    /// # use std::collections::BTreeMap;
2201    /// # async {
2202    /// # let homeserver = Url::parse("http://localhost:8080")?;
2203    /// # let mut client = Client::new(homeserver).await?;
2204    /// let devices = &[device_id!("DEVICEID").to_owned()];
2205    ///
2206    /// if let Err(e) = client.delete_devices(devices, None).await {
2207    ///     if let Some(info) = e.as_uiaa_response() {
2208    ///         let mut password = uiaa::Password::new(
2209    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2210    ///             "wordpass".to_owned(),
2211    ///         );
2212    ///         password.session = info.session.clone();
2213    ///
2214    ///         client
2215    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2216    ///             .await?;
2217    ///     }
2218    /// }
2219    /// # anyhow::Ok(()) };
2220    pub async fn delete_devices(
2221        &self,
2222        devices: &[OwnedDeviceId],
2223        auth_data: Option<uiaa::AuthData>,
2224    ) -> HttpResult<delete_devices::v3::Response> {
2225        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2226        request.auth = auth_data;
2227
2228        self.send(request).await
2229    }
2230
2231    /// Change the display name of a device owned by the current user.
2232    ///
2233    /// Returns a `update_device::Response` which specifies the result
2234    /// of the operation.
2235    ///
2236    /// # Arguments
2237    ///
2238    /// * `device_id` - The ID of the device to change the display name of.
2239    /// * `display_name` - The new display name to set.
2240    pub async fn rename_device(
2241        &self,
2242        device_id: &DeviceId,
2243        display_name: &str,
2244    ) -> HttpResult<update_device::v3::Response> {
2245        let mut request = update_device::v3::Request::new(device_id.to_owned());
2246        request.display_name = Some(display_name.to_owned());
2247
2248        self.send(request).await
2249    }
2250
2251    /// Synchronize the client's state with the latest state on the server.
2252    ///
2253    /// ## Syncing Events
2254    ///
2255    /// Messages or any other type of event need to be periodically fetched from
2256    /// the server, this is achieved by sending a `/sync` request to the server.
2257    ///
2258    /// The first sync is sent out without a [`token`]. The response of the
2259    /// first sync will contain a [`next_batch`] field which should then be
2260    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2261    /// don't receive the same events multiple times.
2262    ///
2263    /// ## Long Polling
2264    ///
2265    /// A sync should in the usual case always be in flight. The
2266    /// [`SyncSettings`] have a  [`timeout`] option, which controls how
2267    /// long the server will wait for new events before it will respond.
2268    /// The server will respond immediately if some new events arrive before the
2269    /// timeout has expired. If no changes arrive and the timeout expires an
2270    /// empty sync response will be sent to the client.
2271    ///
2272    /// This method of sending a request that may not receive a response
2273    /// immediately is called long polling.
2274    ///
2275    /// ## Filtering Events
2276    ///
2277    /// The number or type of messages and events that the client should receive
2278    /// from the server can be altered using a [`Filter`].
2279    ///
2280    /// Filters can be non-trivial and, since they will be sent with every sync
2281    /// request, they may take up a bunch of unnecessary bandwidth.
2282    ///
2283    /// Luckily filters can be uploaded to the server and reused using an unique
2284    /// identifier, this can be achieved using the [`get_or_upload_filter()`]
2285    /// method.
2286    ///
2287    /// # Arguments
2288    ///
2289    /// * `sync_settings` - Settings for the sync call, this allows us to set
2290    /// various options to configure the sync:
2291    ///     * [`filter`] - To configure which events we receive and which get
2292    ///       [filtered] by the server
2293    ///     * [`timeout`] - To configure our [long polling] setup.
2294    ///     * [`token`] - To tell the server which events we already received
2295    ///       and where we wish to continue syncing.
2296    ///     * [`full_state`] - To tell the server that we wish to receive all
2297    ///       state events, regardless of our configured [`token`].
2298    ///     * [`set_presence`] - To tell the server to set the presence and to
2299    ///       which state.
2300    ///
2301    /// # Examples
2302    ///
2303    /// ```no_run
2304    /// # use url::Url;
2305    /// # async {
2306    /// # let homeserver = Url::parse("http://localhost:8080")?;
2307    /// # let username = "";
2308    /// # let password = "";
2309    /// use matrix_sdk::{
2310    ///     config::SyncSettings,
2311    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2312    /// };
2313    ///
2314    /// let client = Client::new(homeserver).await?;
2315    /// client.matrix_auth().login_username(username, password).send().await?;
2316    ///
2317    /// // Sync once so we receive the client state and old messages.
2318    /// client.sync_once(SyncSettings::default()).await?;
2319    ///
2320    /// // Register our handler so we start responding once we receive a new
2321    /// // event.
2322    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2323    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2324    /// });
2325    ///
2326    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2327    /// // from our `sync_once()` call automatically.
2328    /// client.sync(SyncSettings::default()).await;
2329    /// # anyhow::Ok(()) };
2330    /// ```
2331    ///
2332    /// [`sync`]: #method.sync
2333    /// [`SyncSettings`]: crate::config::SyncSettings
2334    /// [`token`]: crate::config::SyncSettings#method.token
2335    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2336    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2337    /// [`set_presence`]: ruma::presence::PresenceState
2338    /// [`filter`]: crate::config::SyncSettings#method.filter
2339    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2340    /// [`next_batch`]: SyncResponse#structfield.next_batch
2341    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2342    /// [long polling]: #long-polling
2343    /// [filtered]: #filtering-events
2344    #[instrument(skip(self))]
2345    pub async fn sync_once(
2346        &self,
2347        sync_settings: crate::config::SyncSettings,
2348    ) -> Result<SyncResponse> {
2349        // The sync might not return for quite a while due to the timeout.
2350        // We'll see if there's anything crypto related to send out before we
2351        // sync, i.e. if we closed our client after a sync but before the
2352        // crypto requests were sent out.
2353        //
2354        // This will mostly be a no-op.
2355        #[cfg(feature = "e2e-encryption")]
2356        if let Err(e) = self.send_outgoing_requests().await {
2357            error!(error = ?e, "Error while sending outgoing E2EE requests");
2358        }
2359
2360        let token = match sync_settings.token {
2361            SyncToken::Specific(token) => Some(token),
2362            SyncToken::NoToken => None,
2363            SyncToken::ReusePrevious => self.sync_token().await,
2364        };
2365
2366        let request = assign!(sync_events::v3::Request::new(), {
2367            filter: sync_settings.filter.map(|f| *f),
2368            since: token,
2369            full_state: sync_settings.full_state,
2370            set_presence: sync_settings.set_presence,
2371            timeout: sync_settings.timeout,
2372            use_state_after: true,
2373        });
2374        let mut request_config = self.request_config();
2375        if let Some(timeout) = sync_settings.timeout {
2376            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2377            request_config.timeout = Some(base_timeout + timeout);
2378        }
2379
2380        let response = self.send(request).with_request_config(request_config).await?;
2381        let next_batch = response.next_batch.clone();
2382        let response = self.process_sync(response).await?;
2383
2384        #[cfg(feature = "e2e-encryption")]
2385        if let Err(e) = self.send_outgoing_requests().await {
2386            error!(error = ?e, "Error while sending outgoing E2EE requests");
2387        }
2388
2389        self.inner.sync_beat.notify(usize::MAX);
2390
2391        Ok(SyncResponse::new(next_batch, response))
2392    }
2393
2394    /// Repeatedly synchronize the client state with the server.
2395    ///
2396    /// This method will only return on error, if cancellation is needed
2397    /// the method should be wrapped in a cancelable task or the
2398    /// [`Client::sync_with_callback`] method can be used or
2399    /// [`Client::sync_with_result_callback`] if you want to handle error
2400    /// cases in the loop, too.
2401    ///
2402    /// This method will internally call [`Client::sync_once`] in a loop.
2403    ///
2404    /// This method can be used with the [`Client::add_event_handler`]
2405    /// method to react to individual events. If you instead wish to handle
2406    /// events in a bulk manner the [`Client::sync_with_callback`],
2407    /// [`Client::sync_with_result_callback`] and
2408    /// [`Client::sync_stream`] methods can be used instead. Those methods
2409    /// repeatedly return the whole sync response.
2410    ///
2411    /// # Arguments
2412    ///
2413    /// * `sync_settings` - Settings for the sync call. *Note* that those
2414    ///   settings will be only used for the first sync call. See the argument
2415    ///   docs for [`Client::sync_once`] for more info.
2416    ///
2417    /// # Return
2418    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2419    /// up to the user of the API to check the error and decide whether the sync
2420    /// should continue or not.
2421    ///
2422    /// # Examples
2423    ///
2424    /// ```no_run
2425    /// # use url::Url;
2426    /// # async {
2427    /// # let homeserver = Url::parse("http://localhost:8080")?;
2428    /// # let username = "";
2429    /// # let password = "";
2430    /// use matrix_sdk::{
2431    ///     config::SyncSettings,
2432    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent, Client,
2433    /// };
2434    ///
2435    /// let client = Client::new(homeserver).await?;
2436    /// client.matrix_auth().login_username(&username, &password).send().await?;
2437    ///
2438    /// // Register our handler so we start responding once we receive a new
2439    /// // event.
2440    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2441    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2442    /// });
2443    ///
2444    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2445    /// // automatically.
2446    /// client.sync(SyncSettings::default()).await?;
2447    /// # anyhow::Ok(()) };
2448    /// ```
2449    ///
2450    /// [argument docs]: #method.sync_once
2451    /// [`sync_with_callback`]: #method.sync_with_callback
2452    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2453        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2454    }
2455
2456    /// Repeatedly call sync to synchronize the client state with the server.
2457    ///
2458    /// # Arguments
2459    ///
2460    /// * `sync_settings` - Settings for the sync call. *Note* that those
2461    ///   settings will be only used for the first sync call. See the argument
2462    ///   docs for [`Client::sync_once`] for more info.
2463    ///
2464    /// * `callback` - A callback that will be called every time a successful
2465    ///   response has been fetched from the server. The callback must return a
2466    ///   boolean which signalizes if the method should stop syncing. If the
2467    ///   callback returns `LoopCtrl::Continue` the sync will continue, if the
2468    ///   callback returns `LoopCtrl::Break` the sync will be stopped.
2469    ///
2470    /// # Return
2471    /// The sync runs until an error occurs or the
2472    /// callback indicates that the Loop should stop. If the callback asked for
2473    /// a regular stop, the result will be `Ok(())` otherwise the
2474    /// `Err(Error)` is returned.
2475    ///
2476    /// # Examples
2477    ///
2478    /// The following example demonstrates how to sync forever while sending all
2479    /// the interesting events through a mpsc channel to another thread e.g. a
2480    /// UI thread.
2481    ///
2482    /// ```no_run
2483    /// # use std::time::Duration;
2484    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2485    /// # use url::Url;
2486    /// # async {
2487    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2488    /// # let mut client = Client::new(homeserver).await.unwrap();
2489    ///
2490    /// use tokio::sync::mpsc::channel;
2491    ///
2492    /// let (tx, rx) = channel(100);
2493    ///
2494    /// let sync_channel = &tx;
2495    /// let sync_settings = SyncSettings::new()
2496    ///     .timeout(Duration::from_secs(30));
2497    ///
2498    /// client
2499    ///     .sync_with_callback(sync_settings, |response| async move {
2500    ///         let channel = sync_channel;
2501    ///         for (room_id, room) in response.rooms.joined {
2502    ///             for event in room.timeline.events {
2503    ///                 channel.send(event).await.unwrap();
2504    ///             }
2505    ///         }
2506    ///
2507    ///         LoopCtrl::Continue
2508    ///     })
2509    ///     .await;
2510    /// };
2511    /// ```
2512    #[instrument(skip_all)]
2513    pub async fn sync_with_callback<C>(
2514        &self,
2515        sync_settings: crate::config::SyncSettings,
2516        callback: impl Fn(SyncResponse) -> C,
2517    ) -> Result<(), Error>
2518    where
2519        C: Future<Output = LoopCtrl>,
2520    {
2521        self.sync_with_result_callback(sync_settings, |result| async {
2522            Ok(callback(result?).await)
2523        })
2524        .await
2525    }
2526
2527    /// Repeatedly call sync to synchronize the client state with the server.
2528    ///
2529    /// # Arguments
2530    ///
2531    /// * `sync_settings` - Settings for the sync call. *Note* that those
2532    ///   settings will be only used for the first sync call. See the argument
2533    ///   docs for [`Client::sync_once`] for more info.
2534    ///
2535    /// * `callback` - A callback that will be called every time after a
2536    ///   response has been received, failure or not. The callback returns a
2537    ///   `Result<LoopCtrl, Error>`, too. When returning
2538    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2539    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2540    ///   function returns `Ok(())`. In case the callback can't handle the
2541    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2542    ///   which results in the sync ending and the `Err(Error)` being returned.
2543    ///
2544    /// # Return
2545    /// The sync runs until an error occurs that the callback can't handle or
2546    /// the callback indicates that the Loop should stop. If the callback
2547    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2548    /// `Err(Error)` is returned.
2549    ///
2550    /// _Note_: Lower-level configuration (e.g. for retries) are not changed by
2551    /// this, and are handled first without sending the result to the
2552    /// callback. Only after they have exceeded is the `Result` handed to
2553    /// the callback.
2554    ///
2555    /// # Examples
2556    ///
2557    /// The following example demonstrates how to sync forever while sending all
2558    /// the interesting events through a mpsc channel to another thread e.g. a
2559    /// UI thread.
2560    ///
2561    /// ```no_run
2562    /// # use std::time::Duration;
2563    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2564    /// # use url::Url;
2565    /// # async {
2566    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2567    /// # let mut client = Client::new(homeserver).await.unwrap();
2568    /// #
2569    /// use tokio::sync::mpsc::channel;
2570    ///
2571    /// let (tx, rx) = channel(100);
2572    ///
2573    /// let sync_channel = &tx;
2574    /// let sync_settings = SyncSettings::new()
2575    ///     .timeout(Duration::from_secs(30));
2576    ///
2577    /// client
2578    ///     .sync_with_result_callback(sync_settings, |response| async move {
2579    ///         let channel = sync_channel;
2580    ///         let sync_response = response?;
2581    ///         for (room_id, room) in sync_response.rooms.joined {
2582    ///              for event in room.timeline.events {
2583    ///                  channel.send(event).await.unwrap();
2584    ///               }
2585    ///         }
2586    ///
2587    ///         Ok(LoopCtrl::Continue)
2588    ///     })
2589    ///     .await;
2590    /// };
2591    /// ```
2592    #[instrument(skip(self, callback))]
2593    pub async fn sync_with_result_callback<C>(
2594        &self,
2595        sync_settings: crate::config::SyncSettings,
2596        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2597    ) -> Result<(), Error>
2598    where
2599        C: Future<Output = Result<LoopCtrl, Error>>,
2600    {
2601        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2602
2603        while let Some(result) = sync_stream.next().await {
2604            trace!("Running callback");
2605            if callback(result).await? == LoopCtrl::Break {
2606                trace!("Callback told us to stop");
2607                break;
2608            }
2609            trace!("Done running callback");
2610        }
2611
2612        Ok(())
2613    }
2614
2615    //// Repeatedly synchronize the client state with the server.
2616    ///
2617    /// This method will internally call [`Client::sync_once`] in a loop and is
2618    /// equivalent to the [`Client::sync`] method but the responses are provided
2619    /// as an async stream.
2620    ///
2621    /// # Arguments
2622    ///
2623    /// * `sync_settings` - Settings for the sync call. *Note* that those
2624    ///   settings will be only used for the first sync call. See the argument
2625    ///   docs for [`Client::sync_once`] for more info.
2626    ///
2627    /// # Examples
2628    ///
2629    /// ```no_run
2630    /// # use url::Url;
2631    /// # async {
2632    /// # let homeserver = Url::parse("http://localhost:8080")?;
2633    /// # let username = "";
2634    /// # let password = "";
2635    /// use futures_util::StreamExt;
2636    /// use matrix_sdk::{config::SyncSettings, Client};
2637    ///
2638    /// let client = Client::new(homeserver).await?;
2639    /// client.matrix_auth().login_username(&username, &password).send().await?;
2640    ///
2641    /// let mut sync_stream =
2642    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2643    ///
2644    /// while let Some(Ok(response)) = sync_stream.next().await {
2645    ///     for room in response.rooms.joined.values() {
2646    ///         for e in &room.timeline.events {
2647    ///             if let Ok(event) = e.raw().deserialize() {
2648    ///                 println!("Received event {:?}", event);
2649    ///             }
2650    ///         }
2651    ///     }
2652    /// }
2653    ///
2654    /// # anyhow::Ok(()) };
2655    /// ```
2656    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2657    #[instrument(skip(self))]
2658    pub async fn sync_stream(
2659        &self,
2660        mut sync_settings: crate::config::SyncSettings,
2661    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2662        let mut is_first_sync = true;
2663        let mut timeout = None;
2664        let mut last_sync_time: Option<Instant> = None;
2665
2666        let parent_span = Span::current();
2667
2668        async_stream::stream!({
2669            loop {
2670                trace!("Syncing");
2671
2672                if sync_settings.ignore_timeout_on_first_sync {
2673                    if is_first_sync {
2674                        timeout = sync_settings.timeout.take();
2675                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
2676                        sync_settings.timeout = timeout.take();
2677                    }
2678
2679                    is_first_sync = false;
2680                }
2681
2682                yield self
2683                    .sync_loop_helper(&mut sync_settings)
2684                    .instrument(parent_span.clone())
2685                    .await;
2686
2687                Client::delay_sync(&mut last_sync_time).await
2688            }
2689        })
2690    }
2691
2692    /// Get the current, if any, sync token of the client.
2693    /// This will be None if the client didn't sync at least once.
2694    pub(crate) async fn sync_token(&self) -> Option<String> {
2695        self.inner.base_client.sync_token().await
2696    }
2697
2698    /// Gets information about the owner of a given access token.
2699    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2700        let request = whoami::v3::Request::new();
2701        self.send(request).await
2702    }
2703
2704    /// Subscribes a new receiver to client SessionChange broadcasts.
2705    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2706        let broadcast = &self.auth_ctx().session_change_sender;
2707        broadcast.subscribe()
2708    }
2709
2710    /// Sets the save/restore session callbacks.
2711    ///
2712    /// This is another mechanism to get synchronous updates to session tokens,
2713    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2714    pub fn set_session_callbacks(
2715        &self,
2716        reload_session_callback: Box<ReloadSessionCallback>,
2717        save_session_callback: Box<SaveSessionCallback>,
2718    ) -> Result<()> {
2719        self.inner
2720            .auth_ctx
2721            .reload_session_callback
2722            .set(reload_session_callback)
2723            .map_err(|_| Error::MultipleSessionCallbacks)?;
2724
2725        self.inner
2726            .auth_ctx
2727            .save_session_callback
2728            .set(save_session_callback)
2729            .map_err(|_| Error::MultipleSessionCallbacks)?;
2730
2731        Ok(())
2732    }
2733
2734    /// Get the notification settings of the current owner of the client.
2735    pub async fn notification_settings(&self) -> NotificationSettings {
2736        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2737        NotificationSettings::new(self.clone(), ruleset)
2738    }
2739
2740    /// Create a new specialized `Client` that can process notifications.
2741    ///
2742    /// See [`CrossProcessStoreLock::new`] to learn more about
2743    /// `cross_process_store_locks_holder_name`.
2744    ///
2745    /// [`CrossProcessStoreLock::new`]: matrix_sdk_common::store_locks::CrossProcessStoreLock::new
2746    pub async fn notification_client(
2747        &self,
2748        cross_process_store_locks_holder_name: String,
2749    ) -> Result<Client> {
2750        let client = Client {
2751            inner: ClientInner::new(
2752                self.inner.auth_ctx.clone(),
2753                self.server().cloned(),
2754                self.homeserver(),
2755                self.sliding_sync_version(),
2756                self.inner.http_client.clone(),
2757                self.inner
2758                    .base_client
2759                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2760                    .await?,
2761                self.inner.caches.server_info.read().await.clone(),
2762                self.inner.respect_login_well_known,
2763                self.inner.event_cache.clone(),
2764                self.inner.send_queue_data.clone(),
2765                self.inner.latest_events.clone(),
2766                #[cfg(feature = "e2e-encryption")]
2767                self.inner.e2ee.encryption_settings,
2768                #[cfg(feature = "e2e-encryption")]
2769                self.inner.enable_share_history_on_invite,
2770                cross_process_store_locks_holder_name,
2771                #[cfg(feature = "experimental-search")]
2772                self.inner.search_index.clone(),
2773            )
2774            .await,
2775        };
2776
2777        Ok(client)
2778    }
2779
2780    /// The [`EventCache`] instance for this [`Client`].
2781    pub fn event_cache(&self) -> &EventCache {
2782        // SAFETY: always initialized in the `Client` ctor.
2783        self.inner.event_cache.get().unwrap()
2784    }
2785
2786    /// The [`LatestEvents`] instance for this [`Client`].
2787    pub async fn latest_events(&self) -> &LatestEvents {
2788        self.inner
2789            .latest_events
2790            .get_or_init(|| async {
2791                LatestEvents::new(
2792                    WeakClient::from_client(self),
2793                    self.event_cache().clone(),
2794                    SendQueue::new(self.clone()),
2795                )
2796            })
2797            .await
2798    }
2799
2800    /// Waits until an at least partially synced room is received, and returns
2801    /// it.
2802    ///
2803    /// **Note: this function will loop endlessly until either it finds the room
2804    /// or an externally set timeout happens.**
2805    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2806        loop {
2807            if let Some(room) = self.get_room(room_id) {
2808                if room.is_state_partially_or_fully_synced() {
2809                    debug!("Found just created room!");
2810                    return room;
2811                }
2812                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2813            } else {
2814                debug!("Room wasn't found, waiting for sync beat to try again");
2815            }
2816            self.inner.sync_beat.listen().await;
2817        }
2818    }
2819
2820    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2821    /// join it.
2822    pub async fn knock(
2823        &self,
2824        room_id_or_alias: OwnedRoomOrAliasId,
2825        reason: Option<String>,
2826        server_names: Vec<OwnedServerName>,
2827    ) -> Result<Room> {
2828        let request =
2829            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2830        let response = self.send(request).await?;
2831        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2832        Ok(Room::new(self.clone(), base_room))
2833    }
2834
2835    /// Checks whether the provided `user_id` belongs to an ignored user.
2836    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2837        self.base_client().is_user_ignored(user_id).await
2838    }
2839
2840    /// Gets the `max_upload_size` value from the homeserver, getting either a
2841    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2842    /// missing.
2843    ///
2844    /// Check the spec for more info:
2845    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2846    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2847        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2848        if let Some(data) = max_upload_size_lock.get() {
2849            return Ok(data.to_owned());
2850        }
2851
2852        // Use the authenticated endpoint when the server supports it.
2853        let supported_versions = self.supported_versions().await?;
2854        let use_auth =
2855            authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2856
2857        let upload_size = if use_auth {
2858            self.send(authenticated_media::get_media_config::v1::Request::default())
2859                .await?
2860                .upload_size
2861        } else {
2862            #[allow(deprecated)]
2863            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2864        };
2865
2866        match max_upload_size_lock.set(upload_size) {
2867            Ok(_) => Ok(upload_size),
2868            Err(error) => {
2869                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2870            }
2871        }
2872    }
2873
2874    /// The settings to use for decrypting events.
2875    #[cfg(feature = "e2e-encryption")]
2876    pub fn decryption_settings(&self) -> &DecryptionSettings {
2877        &self.base_client().decryption_settings
2878    }
2879
2880    #[cfg(feature = "experimental-search")]
2881    pub(crate) fn search_index(&self) -> &SearchIndex {
2882        &self.inner.search_index
2883    }
2884
2885    /// Whether the client is configured to take thread subscriptions (MSC4306
2886    /// and MSC4308) into account.
2887    ///
2888    /// This may cause filtering out of thread subscriptions, and loading the
2889    /// thread subscriptions via the sliding sync extension, when the room
2890    /// list service is being used.
2891    pub fn enabled_thread_subscriptions(&self) -> bool {
2892        match self.base_client().threading_support {
2893            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2894            ThreadingSupport::Disabled => false,
2895        }
2896    }
2897
2898    /// Fetch thread subscriptions changes between `from` and up to `to`.
2899    ///
2900    /// The `limit` optional parameter can be used to limit the number of
2901    /// entries in a response. It can also be overridden by the server, if
2902    /// it's deemed too large.
2903    pub async fn fetch_thread_subscriptions(
2904        &self,
2905        from: Option<String>,
2906        to: Option<String>,
2907        limit: Option<UInt>,
2908    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2909        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2910            from,
2911            to,
2912            limit,
2913        });
2914        Ok(self.send(request).await?)
2915    }
2916}
2917
2918#[cfg(any(feature = "testing", test))]
2919impl Client {
2920    /// Test helper to mark users as tracked by the crypto layer.
2921    #[cfg(feature = "e2e-encryption")]
2922    pub async fn update_tracked_users_for_testing(
2923        &self,
2924        user_ids: impl IntoIterator<Item = &UserId>,
2925    ) {
2926        let olm = self.olm_machine().await;
2927        let olm = olm.as_ref().unwrap();
2928        olm.update_tracked_users(user_ids).await.unwrap();
2929    }
2930}
2931
2932/// A weak reference to the inner client, useful when trying to get a handle
2933/// on the owning client.
2934#[derive(Clone, Debug)]
2935pub(crate) struct WeakClient {
2936    client: Weak<ClientInner>,
2937}
2938
2939impl WeakClient {
2940    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
2941    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
2942        Self { client: Arc::downgrade(client) }
2943    }
2944
2945    /// Construct a [`WeakClient`] from a [`Client`].
2946    pub fn from_client(client: &Client) -> Self {
2947        Self::from_inner(&client.inner)
2948    }
2949
2950    /// Attempts to get a [`Client`] from this [`WeakClient`].
2951    pub fn get(&self) -> Option<Client> {
2952        self.client.upgrade().map(|inner| Client { inner })
2953    }
2954
2955    /// Gets the number of strong (`Arc`) pointers still pointing to this
2956    /// client.
2957    #[allow(dead_code)]
2958    pub fn strong_count(&self) -> usize {
2959        self.client.strong_count()
2960    }
2961}
2962
2963#[derive(Clone)]
2964struct ClientServerInfo {
2965    /// The Matrix versions and unstable features the server supports (known
2966    /// ones only).
2967    supported_versions: CachedValue<SupportedVersions>,
2968
2969    /// The server's well-known file, if any.
2970    well_known: CachedValue<Option<WellKnownResponse>>,
2971}
2972
2973/// A cached value that can either be set or not set, used to avoid confusion
2974/// between a value that is set to `None` (because it doesn't exist) and a value
2975/// that has not been cached yet.
2976#[derive(Clone)]
2977enum CachedValue<Value> {
2978    /// A value has been cached.
2979    Cached(Value),
2980    /// Nothing has been cached yet.
2981    NotSet,
2982}
2983
2984impl<Value> CachedValue<Value> {
2985    /// Unwraps the cached value, returning it if it exists.
2986    ///
2987    /// # Panics
2988    ///
2989    /// If the cached value is not set, this will panic.
2990    fn unwrap_cached_value(self) -> Value {
2991        match self {
2992            CachedValue::Cached(value) => value,
2993            CachedValue::NotSet => panic!("Tried to unwrap a cached value that wasn't set"),
2994        }
2995    }
2996
2997    /// Converts from `&CachedValue<Value>` to `CachedValue<&Value>`.
2998    fn as_ref(&self) -> CachedValue<&Value> {
2999        match self {
3000            Self::Cached(value) => CachedValue::Cached(value),
3001            Self::NotSet => CachedValue::NotSet,
3002        }
3003    }
3004
3005    /// Maps a `CachedValue<Value>` to `CachedValue<Other>` by applying a
3006    /// function to a contained value (if `Cached`) or returns `NotSet` (if
3007    /// `NotSet`).
3008    fn map<Other, F>(self, f: F) -> CachedValue<Other>
3009    where
3010        F: FnOnce(Value) -> Other,
3011    {
3012        match self {
3013            Self::Cached(value) => CachedValue::Cached(f(value)),
3014            Self::NotSet => CachedValue::NotSet,
3015        }
3016    }
3017}
3018
3019/// Information about the state of a room before we joined it.
3020#[derive(Debug, Clone, Default)]
3021struct PreJoinRoomInfo {
3022    /// The user who invited us to the room, if any.
3023    pub inviter: Option<RoomMember>,
3024}
3025
3026// The http mocking library is not supported for wasm32
3027#[cfg(all(test, not(target_family = "wasm")))]
3028pub(crate) mod tests {
3029    use std::{sync::Arc, time::Duration};
3030
3031    use assert_matches::assert_matches;
3032    use assert_matches2::assert_let;
3033    use eyeball::SharedObservable;
3034    use futures_util::{pin_mut, FutureExt, StreamExt};
3035    use js_int::{uint, UInt};
3036    use matrix_sdk_base::{
3037        store::{MemoryStore, StoreConfig},
3038        RoomState,
3039    };
3040    use matrix_sdk_test::{
3041        async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
3042        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
3043    };
3044    #[cfg(target_family = "wasm")]
3045    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3046
3047    use ruma::{
3048        api::{
3049            client::{
3050                discovery::discover_homeserver::RtcFocusInfo,
3051                room::create_room::v3::Request as CreateRoomRequest,
3052            },
3053            FeatureFlag, MatrixVersion,
3054        },
3055        assign,
3056        events::{
3057            ignored_user_list::IgnoredUserListEventContent,
3058            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3059        },
3060        owned_room_id, owned_user_id, room_alias_id, room_id, RoomId, ServerName, UserId,
3061    };
3062    use serde_json::json;
3063    use stream_assert::{assert_next_matches, assert_pending};
3064    use tokio::{
3065        spawn,
3066        time::{sleep, timeout},
3067    };
3068    use url::Url;
3069
3070    use super::Client;
3071    use crate::{
3072        client::{futures::SendMediaUploadRequest, WeakClient},
3073        config::{RequestConfig, SyncSettings},
3074        futures::SendRequest,
3075        media::MediaError,
3076        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3077        Error, TransmissionProgress,
3078    };
3079
3080    #[async_test]
3081    async fn test_account_data() {
3082        let server = MatrixMockServer::new().await;
3083        let client = server.client_builder().build().await;
3084
3085        let f = EventFactory::new();
3086        server
3087            .mock_sync()
3088            .ok_and_run(&client, |builder| {
3089                builder.add_global_account_data(
3090                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3091                );
3092            })
3093            .await;
3094
3095        let content = client
3096            .account()
3097            .account_data::<IgnoredUserListEventContent>()
3098            .await
3099            .unwrap()
3100            .unwrap()
3101            .deserialize()
3102            .unwrap();
3103
3104        assert_eq!(content.ignored_users.len(), 1);
3105    }
3106
3107    #[async_test]
3108    async fn test_successful_discovery() {
3109        // Imagine this is `matrix.org`.
3110        let server = MatrixMockServer::new().await;
3111        let server_url = server.uri();
3112
3113        // Imagine this is `matrix-client.matrix.org`.
3114        let homeserver = MatrixMockServer::new().await;
3115        let homeserver_url = homeserver.uri();
3116
3117        // Imagine Alice has the user ID `@alice:matrix.org`.
3118        let domain = server_url.strip_prefix("http://").unwrap();
3119        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3120
3121        // The `.well-known` is on the server (e.g. `matrix.org`).
3122        server
3123            .mock_well_known()
3124            .ok_with_homeserver_url(&homeserver_url)
3125            .mock_once()
3126            .named("well-known")
3127            .mount()
3128            .await;
3129
3130        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3131        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3132
3133        let client = Client::builder()
3134            .insecure_server_name_no_tls(alice.server_name())
3135            .build()
3136            .await
3137            .unwrap();
3138
3139        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3140        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3141        client.server_versions().await.unwrap();
3142    }
3143
3144    #[async_test]
3145    async fn test_discovery_broken_server() {
3146        let server = MatrixMockServer::new().await;
3147        let server_url = server.uri();
3148        let domain = server_url.strip_prefix("http://").unwrap();
3149        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3150
3151        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3152
3153        assert!(
3154            Client::builder()
3155                .insecure_server_name_no_tls(alice.server_name())
3156                .build()
3157                .await
3158                .is_err(),
3159            "Creating a client from a user ID should fail when the .well-known request fails."
3160        );
3161    }
3162
3163    #[async_test]
3164    async fn test_room_creation() {
3165        let server = MatrixMockServer::new().await;
3166        let client = server.client_builder().build().await;
3167
3168        server
3169            .mock_sync()
3170            .ok_and_run(&client, |builder| {
3171                builder.add_joined_room(
3172                    JoinedRoomBuilder::default()
3173                        .add_state_event(StateTestEvent::Member)
3174                        .add_state_event(StateTestEvent::PowerLevels),
3175                );
3176            })
3177            .await;
3178
3179        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3180        assert_eq!(room.state(), RoomState::Joined);
3181    }
3182
3183    #[async_test]
3184    async fn test_retry_limit_http_requests() {
3185        let server = MatrixMockServer::new().await;
3186        let client = server
3187            .client_builder()
3188            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3189            .build()
3190            .await;
3191
3192        assert!(client.request_config().retry_limit.unwrap() == 3);
3193
3194        server.mock_login().error500().expect(3).mount().await;
3195
3196        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3197    }
3198
3199    #[async_test]
3200    async fn test_retry_timeout_http_requests() {
3201        // Keep this timeout small so that the test doesn't take long
3202        let retry_timeout = Duration::from_secs(5);
3203        let server = MatrixMockServer::new().await;
3204        let client = server
3205            .client_builder()
3206            .on_builder(|builder| {
3207                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3208            })
3209            .build()
3210            .await;
3211
3212        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3213
3214        server.mock_login().error500().expect(2..).mount().await;
3215
3216        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3217    }
3218
3219    #[async_test]
3220    async fn test_short_retry_initial_http_requests() {
3221        let server = MatrixMockServer::new().await;
3222        let client = server
3223            .client_builder()
3224            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3225            .build()
3226            .await;
3227
3228        server.mock_login().error500().expect(3..).mount().await;
3229
3230        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3231    }
3232
3233    #[async_test]
3234    async fn test_no_retry_http_requests() {
3235        let server = MatrixMockServer::new().await;
3236        let client = server.client_builder().build().await;
3237
3238        server.mock_devices().error500().mock_once().mount().await;
3239
3240        client.devices().await.unwrap_err();
3241    }
3242
3243    #[async_test]
3244    async fn test_set_homeserver() {
3245        let client = MockClientBuilder::new(None).build().await;
3246        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3247
3248        let homeserver = Url::parse("http://example.com/").unwrap();
3249        client.set_homeserver(homeserver.clone());
3250        assert_eq!(client.homeserver(), homeserver);
3251    }
3252
3253    #[async_test]
3254    async fn test_search_user_request() {
3255        let server = MatrixMockServer::new().await;
3256        let client = server.client_builder().build().await;
3257
3258        server.mock_user_directory().ok().mock_once().mount().await;
3259
3260        let response = client.search_users("test", 50).await.unwrap();
3261        assert_eq!(response.results.len(), 1);
3262        let result = &response.results[0];
3263        assert_eq!(result.user_id.to_string(), "@test:example.me");
3264        assert_eq!(result.display_name.clone().unwrap(), "Test");
3265        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3266        assert!(!response.limited);
3267    }
3268
3269    #[async_test]
3270    async fn test_request_unstable_features() {
3271        let server = MatrixMockServer::new().await;
3272        let client = server.client_builder().no_server_versions().build().await;
3273
3274        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3275
3276        let unstable_features = client.unstable_features().await.unwrap();
3277        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3278        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3279    }
3280
3281    #[async_test]
3282    async fn test_can_homeserver_push_encrypted_event_to_device() {
3283        let server = MatrixMockServer::new().await;
3284        let client = server.client_builder().no_server_versions().build().await;
3285
3286        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3287
3288        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3289        assert!(msc4028_enabled);
3290    }
3291
3292    #[async_test]
3293    async fn test_recently_visited_rooms() {
3294        // Tracking recently visited rooms requires authentication
3295        let client = MockClientBuilder::new(None).unlogged().build().await;
3296        assert_matches!(
3297            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3298            Err(Error::AuthenticationRequired)
3299        );
3300
3301        let client = MockClientBuilder::new(None).build().await;
3302        let account = client.account();
3303
3304        // We should start off with an empty list
3305        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3306
3307        // Tracking a valid room id should add it to the list
3308        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3309        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3310        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3311
3312        // And the existing list shouldn't be changed
3313        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3314        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3315
3316        // Tracking the same room again shouldn't change the list
3317        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3318        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3319        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3320
3321        // Tracking a second room should add it to the front of the list
3322        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3323        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3324        assert_eq!(
3325            account.get_recently_visited_rooms().await.unwrap(),
3326            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3327        );
3328
3329        // Tracking the first room yet again should move it to the front of the list
3330        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3331        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3332        assert_eq!(
3333            account.get_recently_visited_rooms().await.unwrap(),
3334            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3335        );
3336
3337        // Tracking should be capped at 20
3338        for n in 0..20 {
3339            account
3340                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3341                .await
3342                .unwrap();
3343        }
3344
3345        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3346
3347        // And the initial rooms should've been pushed out
3348        let rooms = account.get_recently_visited_rooms().await.unwrap();
3349        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3350        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3351
3352        // And the last tracked room should be the first
3353        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3354    }
3355
3356    #[async_test]
3357    async fn test_client_no_cycle_with_event_cache() {
3358        let client = MockClientBuilder::new(None).build().await;
3359
3360        // Wait for the init tasks to die.
3361        sleep(Duration::from_secs(1)).await;
3362
3363        let weak_client = WeakClient::from_client(&client);
3364        assert_eq!(weak_client.strong_count(), 1);
3365
3366        {
3367            let room_id = room_id!("!room:example.org");
3368
3369            // Have the client know the room.
3370            let response = SyncResponseBuilder::default()
3371                .add_joined_room(JoinedRoomBuilder::new(room_id))
3372                .build_sync_response();
3373            client.inner.base_client.receive_sync_response(response).await.unwrap();
3374
3375            client.event_cache().subscribe().unwrap();
3376
3377            let (_room_event_cache, _drop_handles) =
3378                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3379        }
3380
3381        drop(client);
3382
3383        // Give a bit of time for background tasks to die.
3384        sleep(Duration::from_secs(1)).await;
3385
3386        // The weak client must be the last reference to the client now.
3387        assert_eq!(weak_client.strong_count(), 0);
3388        let client = weak_client.get();
3389        assert!(
3390            client.is_none(),
3391            "too many strong references to the client: {}",
3392            Arc::strong_count(&client.unwrap().inner)
3393        );
3394    }
3395
3396    #[async_test]
3397    async fn test_server_info_caching() {
3398        let server = MatrixMockServer::new().await;
3399        let server_url = server.uri();
3400        let domain = server_url.strip_prefix("http://").unwrap();
3401        let server_name = <&ServerName>::try_from(domain).unwrap();
3402        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3403
3404        let well_known_mock = server
3405            .mock_well_known()
3406            .ok()
3407            .named("well known mock")
3408            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3409            .mount_as_scoped()
3410            .await;
3411
3412        let versions_mock = server
3413            .mock_versions()
3414            .ok_with_unstable_features()
3415            .named("first versions mock")
3416            .expect(1)
3417            .mount_as_scoped()
3418            .await;
3419
3420        let memory_store = Arc::new(MemoryStore::new());
3421        let client = Client::builder()
3422            .insecure_server_name_no_tls(server_name)
3423            .store_config(
3424                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3425                    .state_store(memory_store.clone()),
3426            )
3427            .build()
3428            .await
3429            .unwrap();
3430
3431        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3432
3433        // These subsequent calls hit the in-memory cache.
3434        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3435        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3436
3437        drop(client);
3438
3439        let client = server
3440            .client_builder()
3441            .no_server_versions()
3442            .on_builder(|builder| {
3443                builder.store_config(
3444                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3445                        .state_store(memory_store.clone()),
3446                )
3447            })
3448            .build()
3449            .await;
3450
3451        // These calls to the new client hit the on-disk cache.
3452        assert!(client
3453            .unstable_features()
3454            .await
3455            .unwrap()
3456            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3457        let supported = client.supported_versions().await.unwrap();
3458        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3459        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3460
3461        // Then this call hits the in-memory cache.
3462        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3463
3464        drop(versions_mock);
3465        drop(well_known_mock);
3466
3467        // Now, reset the cache, and observe the endpoints being called again once.
3468        client.reset_server_info().await.unwrap();
3469
3470        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3471
3472        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3473
3474        // Hits network again.
3475        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3476        // Hits in-memory cache again.
3477        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3478        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3479    }
3480
3481    #[async_test]
3482    async fn test_server_info_without_a_well_known() {
3483        let server = MatrixMockServer::new().await;
3484        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3485
3486        let versions_mock = server
3487            .mock_versions()
3488            .ok_with_unstable_features()
3489            .named("first versions mock")
3490            .expect(1)
3491            .mount_as_scoped()
3492            .await;
3493
3494        let memory_store = Arc::new(MemoryStore::new());
3495        let client = server
3496            .client_builder()
3497            .no_server_versions()
3498            .on_builder(|builder| {
3499                builder.store_config(
3500                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3501                        .state_store(memory_store.clone()),
3502                )
3503            })
3504            .build()
3505            .await;
3506
3507        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3508
3509        // These subsequent calls hit the in-memory cache.
3510        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3511        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3512
3513        drop(client);
3514
3515        let client = server
3516            .client_builder()
3517            .no_server_versions()
3518            .on_builder(|builder| {
3519                builder.store_config(
3520                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3521                        .state_store(memory_store.clone()),
3522                )
3523            })
3524            .build()
3525            .await;
3526
3527        // This call to the new client hits the on-disk cache.
3528        assert!(client
3529            .unstable_features()
3530            .await
3531            .unwrap()
3532            .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3533
3534        // Then this call hits the in-memory cache.
3535        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3536
3537        drop(versions_mock);
3538
3539        // Now, reset the cache, and observe the endpoints being called again once.
3540        client.reset_server_info().await.unwrap();
3541
3542        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3543
3544        // Hits network again.
3545        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3546        // Hits in-memory cache again.
3547        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3548        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3549    }
3550
3551    #[async_test]
3552    async fn test_no_network_doesnt_cause_infinite_retries() {
3553        // We want infinite retries for transient errors.
3554        let client = MockClientBuilder::new(None)
3555            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3556            .build()
3557            .await;
3558
3559        // We don't define a mock server on purpose here, so that the error is really a
3560        // network error.
3561        client.whoami().await.unwrap_err();
3562    }
3563
3564    #[async_test]
3565    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3566        let server = MatrixMockServer::new().await;
3567        let client = server.client_builder().build().await;
3568
3569        let room_id = room_id!("!room:example.org");
3570
3571        server
3572            .mock_sync()
3573            .ok_and_run(&client, |builder| {
3574                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3575            })
3576            .await;
3577
3578        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3579        assert_eq!(room.room_id(), room_id);
3580    }
3581
3582    #[async_test]
3583    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3584        let server = MatrixMockServer::new().await;
3585        let client = server.client_builder().build().await;
3586
3587        let room_id = room_id!("!room:example.org");
3588
3589        let client = Arc::new(client);
3590
3591        // Perform the /sync request with a delay so it starts after the
3592        // `await_room_remote_echo` call has happened
3593        spawn({
3594            let client = client.clone();
3595            async move {
3596                sleep(Duration::from_millis(100)).await;
3597
3598                server
3599                    .mock_sync()
3600                    .ok_and_run(&client, |builder| {
3601                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3602                    })
3603                    .await;
3604            }
3605        });
3606
3607        let room =
3608            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3609        assert_eq!(room.room_id(), room_id);
3610    }
3611
3612    #[async_test]
3613    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3614        let client = MockClientBuilder::new(None).build().await;
3615
3616        let room_id = room_id!("!room:example.org");
3617        // Room is not present so the client won't be able to find it. The call will
3618        // timeout.
3619        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3620    }
3621
3622    #[async_test]
3623    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3624        let server = MatrixMockServer::new().await;
3625        let client = server.client_builder().build().await;
3626
3627        server.mock_create_room().ok().mount().await;
3628
3629        // Create a room in the internal store
3630        let room = client
3631            .create_room(assign!(CreateRoomRequest::new(), {
3632                invite: vec![],
3633                is_direct: false,
3634            }))
3635            .await
3636            .unwrap();
3637
3638        // Room is locally present, but not synced, the call will timeout
3639        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3640            .await
3641            .unwrap_err();
3642    }
3643
3644    #[async_test]
3645    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3646        let server = MatrixMockServer::new().await;
3647        let client = server.client_builder().build().await;
3648
3649        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3650
3651        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3652        assert_matches!(ret, Ok(true));
3653    }
3654
3655    #[async_test]
3656    async fn test_is_room_alias_available_if_alias_is_resolved() {
3657        let server = MatrixMockServer::new().await;
3658        let client = server.client_builder().build().await;
3659
3660        server
3661            .mock_room_directory_resolve_alias()
3662            .ok("!some_room_id:matrix.org", Vec::new())
3663            .expect(1)
3664            .mount()
3665            .await;
3666
3667        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3668        assert_matches!(ret, Ok(false));
3669    }
3670
3671    #[async_test]
3672    async fn test_is_room_alias_available_if_error_found() {
3673        let server = MatrixMockServer::new().await;
3674        let client = server.client_builder().build().await;
3675
3676        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3677
3678        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3679        assert_matches!(ret, Err(_));
3680    }
3681
3682    #[async_test]
3683    async fn test_create_room_alias() {
3684        let server = MatrixMockServer::new().await;
3685        let client = server.client_builder().build().await;
3686
3687        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3688
3689        let ret = client
3690            .create_room_alias(
3691                room_alias_id!("#some_alias:matrix.org"),
3692                room_id!("!some_room:matrix.org"),
3693            )
3694            .await;
3695        assert_matches!(ret, Ok(()));
3696    }
3697
3698    #[async_test]
3699    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3700        let server = MatrixMockServer::new().await;
3701        let client = server.client_builder().build().await;
3702
3703        let room_id = room_id!("!a-room:matrix.org");
3704
3705        // Make sure the summary endpoint is called once
3706        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3707
3708        // We create a locally cached invited room
3709        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3710
3711        // And we get a preview, the server endpoint was reached
3712        let preview = client
3713            .get_room_preview(room_id.into(), Vec::new())
3714            .await
3715            .expect("Room preview should be retrieved");
3716
3717        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3718    }
3719
3720    #[async_test]
3721    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3722        let server = MatrixMockServer::new().await;
3723        let client = server.client_builder().build().await;
3724
3725        let room_id = room_id!("!a-room:matrix.org");
3726
3727        // Make sure the summary endpoint is called once
3728        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3729
3730        // We create a locally cached left room
3731        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3732
3733        // And we get a preview, the server endpoint was reached
3734        let preview = client
3735            .get_room_preview(room_id.into(), Vec::new())
3736            .await
3737            .expect("Room preview should be retrieved");
3738
3739        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3740    }
3741
3742    #[async_test]
3743    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3744        let server = MatrixMockServer::new().await;
3745        let client = server.client_builder().build().await;
3746
3747        let room_id = room_id!("!a-room:matrix.org");
3748
3749        // Make sure the summary endpoint is called once
3750        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3751
3752        // We create a locally cached knocked room
3753        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3754
3755        // And we get a preview, the server endpoint was reached
3756        let preview = client
3757            .get_room_preview(room_id.into(), Vec::new())
3758            .await
3759            .expect("Room preview should be retrieved");
3760
3761        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3762    }
3763
3764    #[async_test]
3765    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3766        let server = MatrixMockServer::new().await;
3767        let client = server.client_builder().build().await;
3768
3769        let room_id = room_id!("!a-room:matrix.org");
3770
3771        // Make sure the summary endpoint is not called
3772        server.mock_room_summary().ok(room_id).never().mount().await;
3773
3774        // We create a locally cached joined room
3775        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3776
3777        // And we get a preview, no server endpoint was reached
3778        let preview = client
3779            .get_room_preview(room_id.into(), Vec::new())
3780            .await
3781            .expect("Room preview should be retrieved");
3782
3783        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3784    }
3785
3786    #[async_test]
3787    async fn test_media_preview_config() {
3788        let server = MatrixMockServer::new().await;
3789        let client = server.client_builder().build().await;
3790
3791        server
3792            .mock_sync()
3793            .ok_and_run(&client, |builder| {
3794                builder.add_custom_global_account_data(json!({
3795                    "content": {
3796                        "media_previews": "private",
3797                        "invite_avatars": "off"
3798                    },
3799                    "type": "m.media_preview_config"
3800                }));
3801            })
3802            .await;
3803
3804        let (initial_value, stream) =
3805            client.account().observe_media_preview_config().await.unwrap();
3806
3807        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3808        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3809        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3810        pin_mut!(stream);
3811        assert_pending!(stream);
3812
3813        server
3814            .mock_sync()
3815            .ok_and_run(&client, |builder| {
3816                builder.add_custom_global_account_data(json!({
3817                    "content": {
3818                        "media_previews": "off",
3819                        "invite_avatars": "on"
3820                    },
3821                    "type": "m.media_preview_config"
3822                }));
3823            })
3824            .await;
3825
3826        assert_next_matches!(
3827            stream,
3828            MediaPreviewConfigEventContent {
3829                media_previews: Some(MediaPreviews::Off),
3830                invite_avatars: Some(InviteAvatars::On),
3831                ..
3832            }
3833        );
3834        assert_pending!(stream);
3835    }
3836
3837    #[async_test]
3838    async fn test_unstable_media_preview_config() {
3839        let server = MatrixMockServer::new().await;
3840        let client = server.client_builder().build().await;
3841
3842        server
3843            .mock_sync()
3844            .ok_and_run(&client, |builder| {
3845                builder.add_custom_global_account_data(json!({
3846                    "content": {
3847                        "media_previews": "private",
3848                        "invite_avatars": "off"
3849                    },
3850                    "type": "io.element.msc4278.media_preview_config"
3851                }));
3852            })
3853            .await;
3854
3855        let (initial_value, stream) =
3856            client.account().observe_media_preview_config().await.unwrap();
3857
3858        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3859        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3860        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3861        pin_mut!(stream);
3862        assert_pending!(stream);
3863
3864        server
3865            .mock_sync()
3866            .ok_and_run(&client, |builder| {
3867                builder.add_custom_global_account_data(json!({
3868                    "content": {
3869                        "media_previews": "off",
3870                        "invite_avatars": "on"
3871                    },
3872                    "type": "io.element.msc4278.media_preview_config"
3873                }));
3874            })
3875            .await;
3876
3877        assert_next_matches!(
3878            stream,
3879            MediaPreviewConfigEventContent {
3880                media_previews: Some(MediaPreviews::Off),
3881                invite_avatars: Some(InviteAvatars::On),
3882                ..
3883            }
3884        );
3885        assert_pending!(stream);
3886    }
3887
3888    #[async_test]
3889    async fn test_media_preview_config_not_found() {
3890        let server = MatrixMockServer::new().await;
3891        let client = server.client_builder().build().await;
3892
3893        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3894
3895        assert!(initial_value.is_none());
3896    }
3897
3898    #[async_test]
3899    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3900        // The default Matrix version we use is 1.11 or higher, so authenticated media
3901        // is supported.
3902        let server = MatrixMockServer::new().await;
3903        let client = server.client_builder().build().await;
3904
3905        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3906
3907        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3908        client.load_or_fetch_max_upload_size().await.unwrap();
3909
3910        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3911    }
3912
3913    #[async_test]
3914    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3915        // The server must advertise support for the stable feature for authenticated
3916        // media support, so we mock the `GET /versions` response.
3917        let server = MatrixMockServer::new().await;
3918        let client = server.client_builder().no_server_versions().build().await;
3919
3920        server
3921            .mock_versions()
3922            .ok_custom(
3923                &["v1.7", "v1.8", "v1.9", "v1.10"],
3924                &[("org.matrix.msc3916.stable", true)].into(),
3925            )
3926            .named("versions")
3927            .expect(1)
3928            .mount()
3929            .await;
3930
3931        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3932
3933        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3934        client.load_or_fetch_max_upload_size().await.unwrap();
3935
3936        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3937    }
3938
3939    #[async_test]
3940    async fn test_load_or_fetch_max_upload_size_no_auth() {
3941        // The server must not support Matrix 1.11 or higher for unauthenticated
3942        // media requests, so we mock the `GET /versions` response.
3943        let server = MatrixMockServer::new().await;
3944        let client = server.client_builder().no_server_versions().build().await;
3945
3946        server
3947            .mock_versions()
3948            .ok_custom(&["v1.1"], &Default::default())
3949            .named("versions")
3950            .expect(1)
3951            .mount()
3952            .await;
3953
3954        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3955
3956        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
3957        client.load_or_fetch_max_upload_size().await.unwrap();
3958
3959        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3960    }
3961
3962    #[async_test]
3963    async fn test_uploading_a_too_large_media_file() {
3964        let server = MatrixMockServer::new().await;
3965        let client = server.client_builder().build().await;
3966
3967        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
3968        client.load_or_fetch_max_upload_size().await.unwrap();
3969        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
3970
3971        let data = vec![1, 2];
3972        let upload_request =
3973            ruma::api::client::media::create_content::v3::Request::new(data.clone());
3974        let request = SendRequest {
3975            client: client.clone(),
3976            request: upload_request,
3977            config: None,
3978            send_progress: SharedObservable::new(TransmissionProgress::default()),
3979        };
3980        let media_request = SendMediaUploadRequest::new(request);
3981
3982        let error = media_request.await.err();
3983        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
3984        assert_eq!(max, uint!(1));
3985        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
3986    }
3987
3988    #[async_test]
3989    async fn test_dont_ignore_timeout_on_first_sync() {
3990        let server = MatrixMockServer::new().await;
3991        let client = server.client_builder().build().await;
3992
3993        server
3994            .mock_sync()
3995            .timeout(Some(Duration::from_secs(30)))
3996            .ok(|_| {})
3997            .mock_once()
3998            .named("sync_with_timeout")
3999            .mount()
4000            .await;
4001
4002        // Call the endpoint once to check the timeout.
4003        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4004
4005        timeout(Duration::from_secs(1), async {
4006            stream.next().await.unwrap().unwrap();
4007        })
4008        .await
4009        .unwrap();
4010    }
4011
4012    #[async_test]
4013    async fn test_ignore_timeout_on_first_sync() {
4014        let server = MatrixMockServer::new().await;
4015        let client = server.client_builder().build().await;
4016
4017        server
4018            .mock_sync()
4019            .timeout(None)
4020            .ok(|_| {})
4021            .mock_once()
4022            .named("sync_no_timeout")
4023            .mount()
4024            .await;
4025        server
4026            .mock_sync()
4027            .timeout(Some(Duration::from_secs(30)))
4028            .ok(|_| {})
4029            .mock_once()
4030            .named("sync_with_timeout")
4031            .mount()
4032            .await;
4033
4034        // Call each version of the endpoint once to check the timeouts.
4035        let mut stream = Box::pin(
4036            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4037        );
4038
4039        timeout(Duration::from_secs(1), async {
4040            stream.next().await.unwrap().unwrap();
4041            stream.next().await.unwrap().unwrap();
4042        })
4043        .await
4044        .unwrap();
4045    }
4046}