matrix_sdk/client/
mod.rs

1// Copyright 2020 Damir Jelić
2// Copyright 2020 The Matrix.org Foundation C.I.C.
3// Copyright 2022 Famedly GmbH
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9//     http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::{
18    collections::{BTreeMap, BTreeSet, btree_map},
19    fmt::{self, Debug},
20    future::{Future, ready},
21    pin::Pin,
22    sync::{Arc, Mutex as StdMutex, RwLock as StdRwLock, Weak},
23    time::Duration,
24};
25
26use caches::ClientCaches;
27use eyeball::{SharedObservable, Subscriber};
28use eyeball_im::{Vector, VectorDiff};
29use futures_core::Stream;
30use futures_util::StreamExt;
31#[cfg(feature = "e2e-encryption")]
32use matrix_sdk_base::crypto::{DecryptionSettings, store::LockableCryptoStore};
33use matrix_sdk_base::{
34    BaseClient, RoomInfoNotableUpdate, RoomState, RoomStateFilter, SendOutsideWasm, SessionMeta,
35    StateStoreDataKey, StateStoreDataValue, SyncOutsideWasm, ThreadingSupport,
36    event_cache::store::EventCacheStoreLock,
37    media::store::MediaStoreLock,
38    store::{DynStateStore, RoomLoadSettings, ServerInfo, WellKnownResponse},
39    sync::{Notification, RoomUpdates},
40};
41use matrix_sdk_common::ttl_cache::TtlCache;
42#[cfg(feature = "e2e-encryption")]
43use ruma::events::{InitialStateEvent, room::encryption::RoomEncryptionEventContent};
44use ruma::{
45    DeviceId, OwnedDeviceId, OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName,
46    RoomAliasId, RoomId, RoomOrAliasId, ServerName, UInt, UserId,
47    api::{
48        FeatureFlag, MatrixVersion, OutgoingRequest, SupportedVersions,
49        client::{
50            account::whoami,
51            alias::{create_alias, delete_alias, get_alias},
52            authenticated_media,
53            device::{delete_devices, get_devices, update_device},
54            directory::{get_public_rooms, get_public_rooms_filtered},
55            discovery::{
56                discover_homeserver::{self, RtcFocusInfo},
57                get_capabilities::{self, v3::Capabilities},
58                get_supported_versions,
59            },
60            error::ErrorKind,
61            filter::{FilterDefinition, create_filter::v3::Request as FilterUploadRequest},
62            knock::knock_room,
63            media,
64            membership::{join_room_by_id, join_room_by_id_or_alias},
65            room::create_room,
66            session::login::v3::DiscoveryInfo,
67            sync::sync_events,
68            threads::get_thread_subscriptions_changes,
69            uiaa,
70            user_directory::search_users,
71        },
72        error::FromHttpResponseError,
73        federation::discovery::get_server_version,
74    },
75    assign,
76    push::Ruleset,
77    time::Instant,
78};
79use serde::de::DeserializeOwned;
80use tokio::sync::{Mutex, OnceCell, RwLock, RwLockReadGuard, broadcast};
81use tracing::{Instrument, Span, debug, error, instrument, trace, warn};
82use url::Url;
83
84use self::futures::SendRequest;
85use crate::{
86    Account, AuthApi, AuthSession, Error, HttpError, Media, Pusher, RefreshTokenError, Result,
87    Room, SessionTokens, TransmissionProgress,
88    authentication::{
89        AuthCtx, AuthData, ReloadSessionCallback, SaveSessionCallback, matrix::MatrixAuth,
90        oauth::OAuth,
91    },
92    client::thread_subscriptions::ThreadSubscriptionCatchup,
93    config::{RequestConfig, SyncToken},
94    deduplicating_handler::DeduplicatingHandler,
95    error::HttpResult,
96    event_cache::EventCache,
97    event_handler::{
98        EventHandler, EventHandlerContext, EventHandlerDropGuard, EventHandlerHandle,
99        EventHandlerStore, ObservableEventHandler, SyncEvent,
100    },
101    http_client::{HttpClient, SupportedAuthScheme},
102    latest_events::LatestEvents,
103    media::MediaError,
104    notification_settings::NotificationSettings,
105    room::RoomMember,
106    room_preview::RoomPreview,
107    send_queue::{SendQueue, SendQueueData},
108    sliding_sync::Version as SlidingSyncVersion,
109    sync::{RoomUpdate, SyncResponse},
110};
111#[cfg(feature = "e2e-encryption")]
112use crate::{
113    cross_process_lock::CrossProcessLock,
114    encryption::{Encryption, EncryptionData, EncryptionSettings, VerificationState},
115};
116
117mod builder;
118pub(crate) mod caches;
119pub(crate) mod futures;
120pub(crate) mod thread_subscriptions;
121
122pub use self::builder::{ClientBuildError, ClientBuilder, sanitize_server_name};
123#[cfg(feature = "experimental-search")]
124use crate::search_index::SearchIndex;
125
/// Boxed future produced by a notification handler.
///
/// On non-Wasm targets the future additionally has to be `Send`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed future produced by a notification handler (Wasm flavour, without
/// the `Send` bound).
#[cfg(target_family = "wasm")]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Type-erased notification handler, as stored in
/// `ClientInner::notification_handlers`.
///
/// On non-Wasm targets the handler additionally has to be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFn =
    Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut + Send + Sync>;
/// Type-erased notification handler (Wasm flavour, without the `Send + Sync`
/// bounds).
#[cfg(target_family = "wasm")]
type NotificationHandlerFn = Box<dyn Fn(Notification, Room, Client) -> NotificationHandlerFut>;
136
/// Enum controlling if a loop running callbacks should continue or abort.
///
/// This is mainly used in the [`sync_with_callback`] method: the value
/// returned by the provided callback decides whether the sync loop keeps
/// running or is exited.
///
/// [`sync_with_callback`]: crate::Client::sync_with_callback
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopCtrl {
    /// Continue running the loop.
    Continue,
    /// Break out of the loop.
    Break,
}
150
/// Represents changes that can occur to a `Client`'s `Session`.
///
/// Equality is total (the only payload is a `bool`), hence the type is `Eq`
/// in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionChange {
    /// The session's token is no longer valid.
    UnknownToken {
        /// Whether or not the session was soft logged out.
        soft_logout: bool,
    },
    /// The session's tokens have been refreshed.
    TokensRefreshed,
}
162
/// Information about the server vendor obtained from the federation API.
///
/// Values of this type are produced by `Client::server_vendor_info`, which
/// substitutes the string `"unknown"` for any field the server did not
/// report.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
pub struct ServerVendorInfo {
    /// The name of the server software, as reported by the server.
    pub server_name: String,
    /// The version of the server software, as reported by the server.
    pub version: String,
}
172
/// An async/await enabled Matrix client.
///
/// All of the state is held in an `Arc` so the `Client` can be cloned freely:
/// every clone points at the same shared `ClientInner`.
#[derive(Clone)]
pub struct Client {
    /// The shared state of the client.
    pub(crate) inner: Arc<ClientInner>,
}
180
/// Locks and request-deduplication handlers shared by the methods of a
/// `Client`.
#[derive(Default)]
pub(crate) struct ClientLocks {
    /// Lock ensuring that only a single room may be marked as a DM at once.
    /// Look at the [`Account::mark_as_dm()`] method for a more detailed
    /// explanation.
    pub(crate) mark_as_dm_lock: Mutex<()>,

    /// Lock ensuring that only a single secret store is getting opened at the
    /// same time.
    ///
    /// This is important so we don't accidentally create multiple different new
    /// default secret storage keys.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) open_secret_store_lock: Mutex<()>,

    /// Lock ensuring that we're only storing a single secret at a time.
    ///
    /// Take a look at the [`SecretStore::put_secret`] method for a more
    /// detailed explanation.
    ///
    /// [`SecretStore::put_secret`]: crate::encryption::secret_storage::SecretStore::put_secret
    #[cfg(feature = "e2e-encryption")]
    pub(crate) store_secret_lock: Mutex<()>,

    /// Lock ensuring that only one method at a time might modify our backup.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_modify_lock: Mutex<()>,

    /// Lock ensuring that we're going to attempt to upload backups for a single
    /// requester.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) backup_upload_lock: Mutex<()>,

    /// Handler making sure we only have one group session sharing request in
    /// flight per room.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) group_session_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Lock making sure we're only doing one key claim request at a time.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) key_claim_lock: Mutex<()>,

    /// Handler to ensure that only one members request is running at a time,
    /// given a room.
    pub(crate) members_request_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Handler to ensure that only one encryption state request is running at a
    /// time, given a room.
    pub(crate) encryption_state_deduplicated_handler: DeduplicatingHandler<OwnedRoomId>,

    /// Deduplicating handler for sending read receipts. The string is an
    /// internal implementation detail, see `Room::send_single_receipt`.
    // NOTE(review): the previous text linked to `Self::send_single_receipt`,
    // which does not exist on `ClientLocks`; presumably the method lives on
    // `Room` — confirm.
    pub(crate) read_receipt_deduplicated_handler: DeduplicatingHandler<(String, OwnedEventId)>,

    /// Cross-process lock over the crypto store.
    ///
    /// Kept in a `OnceCell`, so it starts out unset and is filled in at most
    /// once.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) cross_process_crypto_store_lock: OnceCell<CrossProcessLock<LockableCryptoStore>>,

    /// Latest "generation" of data known by the crypto store.
    ///
    /// This is a counter that only increments, set in the database (and can
    /// wrap). It's incremented whenever some process acquires a lock for the
    /// first time. *This assumes the crypto store lock is being held, to
    /// avoid data races on writing to this value in the store*.
    ///
    /// The current process will maintain this value in local memory and in the
    /// DB over time. Observing a different value than the one read in
    /// memory, when reading from the store indicates that somebody else has
    /// written into the database under our feet.
    ///
    /// TODO: this should live in the `OlmMachine`, since it's information
    /// related to the lock. As of today (2023-07-28), we blow up the entire
    /// olm machine when there's a generation mismatch. So storing the
    /// generation in the olm machine would make the client think there's
    /// *always* a mismatch, and that's why we need to store the generation
    /// outside the `OlmMachine`.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) crypto_store_generation: Arc<Mutex<Option<u64>>>,
}
259
/// The shared state behind a [`Client`].
///
/// A `Client` only holds an `Arc<ClientInner>`, so every clone of a `Client`
/// observes the very same `ClientInner`.
pub(crate) struct ClientInner {
    /// All the data related to authentication and authorization.
    pub(crate) auth_ctx: Arc<AuthCtx>,

    /// The URL of the server.
    ///
    /// Not to be confused with the `Self::homeserver`. `server` is usually
    /// the server part in a user ID, e.g. with `@mnt_io:matrix.org`, here
    /// `matrix.org` is the server, whilst `matrix-client.matrix.org` is the
    /// homeserver (at the time of writing — 2024-08-28).
    ///
    /// This value is optional depending on how the `Client` has been built.
    /// If it's been built from a homeserver URL directly, we don't know the
    /// server. However, if the `Client` has been built from a server URL or
    /// name, then the homeserver has been discovered, and we know both.
    server: Option<Url>,

    /// The URL of the homeserver to connect to.
    ///
    /// This is the URL for the client-server Matrix API.
    homeserver: StdRwLock<Url>,

    /// The sliding sync version.
    sliding_sync_version: StdRwLock<SlidingSyncVersion>,

    /// The underlying HTTP client.
    pub(crate) http_client: HttpClient,

    /// User session data.
    pub(super) base_client: BaseClient,

    /// Collection of in-memory caches for the [`Client`].
    pub(crate) caches: ClientCaches,

    /// Collection of locks individual client methods might want to use, either
    /// to ensure that only a single call to a method happens at once or to
    /// deduplicate multiple calls to a method.
    pub(crate) locks: ClientLocks,

    /// The cross-process store locks holder name.
    ///
    /// The SDK provides cross-process store locks (see
    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
    /// `holder_name` is the value used for all cross-process store locks
    /// used by this `Client`.
    ///
    /// If multiple `Client`s are running in different processes, this
    /// value MUST be different for each `Client`.
    cross_process_store_locks_holder_name: String,

    /// A mapping of the times at which the current user sent typing notices,
    /// keyed by room.
    pub(crate) typing_notice_times: StdRwLock<BTreeMap<OwnedRoomId, Instant>>,

    /// Event handlers. See `add_event_handler`.
    pub(crate) event_handlers: EventHandlerStore,

    /// Notification handlers. See `register_notification_handler`.
    notification_handlers: RwLock<Vec<NotificationHandlerFn>>,

    /// The sender-side of channels used to receive room updates, keyed by
    /// room.
    pub(crate) room_update_channels: StdMutex<BTreeMap<OwnedRoomId, broadcast::Sender<RoomUpdate>>>,

    /// The sender-side of a channel used to observe all the room updates of a
    /// sync response.
    pub(crate) room_updates_sender: broadcast::Sender<RoomUpdates>,

    /// Whether the client should update its homeserver URL with the discovery
    /// information present in the login response.
    respect_login_well_known: bool,

    /// An event that can be listened on to wait for a successful sync. The
    /// event will only be fired if a sync loop is running. Can be used for
    /// synchronization, e.g. if we send out a request to create a room, we can
    /// wait for the sync to get the data to fetch a room object from the state
    /// store.
    pub(crate) sync_beat: event_listener::Event,

    /// A central cache for events, inactive first.
    ///
    /// It becomes active when [`EventCache::subscribe`] is called.
    pub(crate) event_cache: OnceCell<EventCache>,

    /// End-to-end encryption related state.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) e2ee: EncryptionData,

    /// The verification state of our own device.
    #[cfg(feature = "e2e-encryption")]
    pub(crate) verification_state: SharedObservable<VerificationState>,

    /// Whether to enable the experimental support for sending and receiving
    /// encrypted room history on invite, per [MSC4268].
    ///
    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
    #[cfg(feature = "e2e-encryption")]
    pub(crate) enable_share_history_on_invite: bool,

    /// Data related to the [`SendQueue`].
    ///
    /// [`SendQueue`]: crate::send_queue::SendQueue
    pub(crate) send_queue_data: Arc<SendQueueData>,

    /// The `max_upload_size` value of the homeserver, it contains the max
    /// request size you can send.
    pub(crate) server_max_upload_size: Mutex<OnceCell<UInt>>,

    /// The entry point to get the [`LatestEvent`] of rooms and threads.
    ///
    /// [`LatestEvent`]: crate::latest_event::LatestEvent
    latest_events: OnceCell<LatestEvents>,

    /// Service handling the catching up of thread subscriptions in the
    /// background.
    thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,

    #[cfg(feature = "experimental-search")]
    /// Handler for the [`RoomIndex`] of each room.
    search_index: SearchIndex,
}
380
impl ClientInner {
    /// Create a new `ClientInner`.
    ///
    /// All the fields passed as parameters here are those that must be cloned
    /// upon instantiation of a sub-client, e.g. a client specialized for
    /// notifications.
    ///
    /// Every other field is default-initialised. The returned value is
    /// already wrapped in an `Arc`; before returning, the `event_cache` and
    /// `thread_subscription_catchup` cells are filled in if the caller passed
    /// them empty.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        auth_ctx: Arc<AuthCtx>,
        server: Option<Url>,
        homeserver: Url,
        sliding_sync_version: SlidingSyncVersion,
        http_client: HttpClient,
        base_client: BaseClient,
        server_info: ClientServerInfo,
        respect_login_well_known: bool,
        event_cache: OnceCell<EventCache>,
        send_queue: Arc<SendQueueData>,
        latest_events: OnceCell<LatestEvents>,
        #[cfg(feature = "e2e-encryption")] encryption_settings: EncryptionSettings,
        #[cfg(feature = "e2e-encryption")] enable_share_history_on_invite: bool,
        cross_process_store_locks_holder_name: String,
        #[cfg(feature = "experimental-search")] search_index_handler: SearchIndex,
        thread_subscription_catchup: OnceCell<Arc<ThreadSubscriptionCatchup>>,
    ) -> Arc<Self> {
        // The caches start from the provided server info and an empty
        // server-metadata TTL cache.
        let caches = ClientCaches {
            server_info: server_info.into(),
            server_metadata: Mutex::new(TtlCache::new()),
        };

        let client = Self {
            server,
            homeserver: StdRwLock::new(homeserver),
            auth_ctx,
            sliding_sync_version: StdRwLock::new(sliding_sync_version),
            http_client,
            base_client,
            caches,
            locks: Default::default(),
            cross_process_store_locks_holder_name,
            typing_notice_times: Default::default(),
            event_handlers: Default::default(),
            notification_handlers: Default::default(),
            room_update_channels: Default::default(),
            // A single `RoomUpdates` is sent once per sync, so we assume that 32 is sufficient
            // ballast for all observers to catch up.
            room_updates_sender: broadcast::Sender::new(32),
            respect_login_well_known,
            sync_beat: event_listener::Event::new(),
            event_cache,
            send_queue_data: send_queue,
            latest_events,
            #[cfg(feature = "e2e-encryption")]
            e2ee: EncryptionData::new(encryption_settings),
            #[cfg(feature = "e2e-encryption")]
            verification_state: SharedObservable::new(VerificationState::Unknown),
            #[cfg(feature = "e2e-encryption")]
            enable_share_history_on_invite,
            server_max_upload_size: Mutex::new(OnceCell::new()),
            #[cfg(feature = "experimental-search")]
            search_index: search_index_handler,
            thread_subscription_catchup,
        };

        // Everything below needs the shared (`Arc`) form of the client.
        #[allow(clippy::let_and_return)]
        let client = Arc::new(client);

        #[cfg(feature = "e2e-encryption")]
        client.e2ee.initialize_tasks(&client);

        // Only builds a fresh `EventCache` when the cell handed in by the
        // caller is still empty. The cache receives a `WeakClient`, not a
        // strong reference to `client`.
        let _ = client
            .event_cache
            .get_or_init(|| async {
                EventCache::new(
                    WeakClient::from_inner(&client),
                    client.base_client.event_cache_store().clone(),
                )
            })
            .await;

        // Same lazy initialisation for the thread subscription catch-up
        // service.
        let _ = client
            .thread_subscription_catchup
            .get_or_init(|| async {
                ThreadSubscriptionCatchup::new(Client { inner: client.clone() })
            })
            .await;

        client
    }
}
471
472#[cfg(not(tarpaulin_include))]
473impl Debug for Client {
474    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
475        write!(fmt, "Client")
476    }
477}
478
479impl Client {
480    /// Create a new [`Client`] that will use the given homeserver.
481    ///
482    /// # Arguments
483    ///
484    /// * `homeserver_url` - The homeserver that the client should connect to.
485    pub async fn new(homeserver_url: Url) -> Result<Self, ClientBuildError> {
486        Self::builder().homeserver_url(homeserver_url).build().await
487    }
488
489    /// Returns a subscriber that publishes an event every time the ignore user
490    /// list changes.
491    pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
492        self.inner.base_client.subscribe_to_ignore_user_list_changes()
493    }
494
495    /// Create a new [`ClientBuilder`].
496    pub fn builder() -> ClientBuilder {
497        ClientBuilder::new()
498    }
499
500    pub(crate) fn base_client(&self) -> &BaseClient {
501        &self.inner.base_client
502    }
503
504    /// The underlying HTTP client.
505    pub fn http_client(&self) -> &reqwest::Client {
506        &self.inner.http_client.inner
507    }
508
509    pub(crate) fn locks(&self) -> &ClientLocks {
510        &self.inner.locks
511    }
512
513    pub(crate) fn auth_ctx(&self) -> &AuthCtx {
514        &self.inner.auth_ctx
515    }
516
517    /// The cross-process store locks holder name.
518    ///
519    /// The SDK provides cross-process store locks (see
520    /// [`matrix_sdk_common::cross_process_lock::CrossProcessLock`]). The
521    /// `holder_name` is the value used for all cross-process store locks
522    /// used by this `Client`.
523    pub fn cross_process_store_locks_holder_name(&self) -> &str {
524        &self.inner.cross_process_store_locks_holder_name
525    }
526
527    /// Change the homeserver URL used by this client.
528    ///
529    /// # Arguments
530    ///
531    /// * `homeserver_url` - The new URL to use.
532    fn set_homeserver(&self, homeserver_url: Url) {
533        *self.inner.homeserver.write().unwrap() = homeserver_url;
534    }
535
536    /// Change to a different homeserver and re-resolve well-known.
537    #[cfg(feature = "e2e-encryption")]
538    pub(crate) async fn switch_homeserver_and_re_resolve_well_known(
539        &self,
540        homeserver_url: Url,
541    ) -> Result<()> {
542        self.set_homeserver(homeserver_url);
543        self.reset_server_info().await?;
544        if let ServerInfo { well_known: Some(well_known), .. } =
545            self.load_or_fetch_server_info().await?
546        {
547            self.set_homeserver(Url::parse(&well_known.homeserver.base_url)?);
548        }
549        Ok(())
550    }
551
552    /// Get the capabilities of the homeserver.
553    ///
554    /// This method should be used to check what features are supported by the
555    /// homeserver.
556    ///
557    /// # Examples
558    ///
559    /// ```no_run
560    /// # use matrix_sdk::Client;
561    /// # use url::Url;
562    /// # async {
563    /// # let homeserver = Url::parse("http://example.com")?;
564    /// let client = Client::new(homeserver).await?;
565    ///
566    /// let capabilities = client.get_capabilities().await?;
567    ///
568    /// if capabilities.change_password.enabled {
569    ///     // Change password
570    /// }
571    /// # anyhow::Ok(()) };
572    /// ```
573    pub async fn get_capabilities(&self) -> HttpResult<Capabilities> {
574        let res = self.send(get_capabilities::v3::Request::new()).await?;
575        Ok(res.capabilities)
576    }
577
578    /// Get the server vendor information from the federation API.
579    ///
580    /// This method calls the `/_matrix/federation/v1/version` endpoint to get
581    /// both the server's software name and version.
582    ///
583    /// # Examples
584    ///
585    /// ```no_run
586    /// # use matrix_sdk::Client;
587    /// # use url::Url;
588    /// # async {
589    /// # let homeserver = Url::parse("http://example.com")?;
590    /// let client = Client::new(homeserver).await?;
591    ///
592    /// let server_info = client.server_vendor_info(None).await?;
593    /// println!(
594    ///     "Server: {}, Version: {}",
595    ///     server_info.server_name, server_info.version
596    /// );
597    /// # anyhow::Ok(()) };
598    /// ```
599    pub async fn server_vendor_info(
600        &self,
601        request_config: Option<RequestConfig>,
602    ) -> HttpResult<ServerVendorInfo> {
603        let res = self
604            .send_inner(get_server_version::v1::Request::new(), request_config, Default::default())
605            .await?;
606
607        // Extract server info, using defaults if fields are missing.
608        let server = res.server.unwrap_or_default();
609        let server_name_str = server.name.unwrap_or_else(|| "unknown".to_owned());
610        let version = server.version.unwrap_or_else(|| "unknown".to_owned());
611
612        Ok(ServerVendorInfo { server_name: server_name_str, version })
613    }
614
615    /// Get a copy of the default request config.
616    ///
617    /// The default request config is what's used when sending requests if no
618    /// `RequestConfig` is explicitly passed to [`send`][Self::send] or another
619    /// function with such a parameter.
620    ///
621    /// If the default request config was not customized through
622    /// [`ClientBuilder`] when creating this `Client`, the returned value will
623    /// be equivalent to [`RequestConfig::default()`].
624    pub fn request_config(&self) -> RequestConfig {
625        self.inner.http_client.request_config
626    }
627
628    /// Check whether the client has been activated.
629    ///
630    /// A client is considered active when:
631    ///
632    /// 1. It has a `SessionMeta` (user ID, device ID and access token), i.e. it
633    ///    is logged in,
634    /// 2. Has loaded cached data from storage,
635    /// 3. If encryption is enabled, it also initialized or restored its
636    ///    `OlmMachine`.
637    pub fn is_active(&self) -> bool {
638        self.inner.base_client.is_active()
639    }
640
641    /// The server used by the client.
642    ///
643    /// See `Self::server` to learn more.
644    pub fn server(&self) -> Option<&Url> {
645        self.inner.server.as_ref()
646    }
647
648    /// The homeserver of the client.
649    pub fn homeserver(&self) -> Url {
650        self.inner.homeserver.read().unwrap().clone()
651    }
652
653    /// Get the sliding sync version.
654    pub fn sliding_sync_version(&self) -> SlidingSyncVersion {
655        self.inner.sliding_sync_version.read().unwrap().clone()
656    }
657
658    /// Override the sliding sync version.
659    pub fn set_sliding_sync_version(&self, version: SlidingSyncVersion) {
660        let mut lock = self.inner.sliding_sync_version.write().unwrap();
661        *lock = version;
662    }
663
664    /// Get the Matrix user session meta information.
665    ///
666    /// If the client is currently logged in, this will return a
667    /// [`SessionMeta`] object which contains the user ID and device ID.
668    /// Otherwise it returns `None`.
669    pub fn session_meta(&self) -> Option<&SessionMeta> {
670        self.base_client().session_meta()
671    }
672
673    /// Returns a receiver that gets events for each room info update. To watch
674    /// for new events, use `receiver.resubscribe()`.
675    pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
676        self.base_client().room_info_notable_update_receiver()
677    }
678
679    /// Performs a search for users.
680    /// The search is performed case-insensitively on user IDs and display names
681    ///
682    /// # Arguments
683    ///
684    /// * `search_term` - The search term for the search
685    /// * `limit` - The maximum number of results to return. Defaults to 10.
686    ///
687    /// [user directory]: https://spec.matrix.org/v1.6/client-server-api/#user-directory
688    pub async fn search_users(
689        &self,
690        search_term: &str,
691        limit: u64,
692    ) -> HttpResult<search_users::v3::Response> {
693        let mut request = search_users::v3::Request::new(search_term.to_owned());
694
695        if let Some(limit) = UInt::new(limit) {
696            request.limit = limit;
697        }
698
699        self.send(request).await
700    }
701
702    /// Get the user id of the current owner of the client.
703    pub fn user_id(&self) -> Option<&UserId> {
704        self.session_meta().map(|s| s.user_id.as_ref())
705    }
706
707    /// Get the device ID that identifies the current session.
708    pub fn device_id(&self) -> Option<&DeviceId> {
709        self.session_meta().map(|s| s.device_id.as_ref())
710    }
711
712    /// Get the current access token for this session.
713    ///
714    /// Will be `None` if the client has not been logged in.
715    pub fn access_token(&self) -> Option<String> {
716        self.auth_ctx().access_token()
717    }
718
719    /// Get the current tokens for this session.
720    ///
721    /// To be notified of changes in the session tokens, use
722    /// [`Client::subscribe_to_session_changes()`] or
723    /// [`Client::set_session_callbacks()`].
724    ///
725    /// Returns `None` if the client has not been logged in.
726    pub fn session_tokens(&self) -> Option<SessionTokens> {
727        self.auth_ctx().session_tokens()
728    }
729
730    /// Access the authentication API used to log in this client.
731    ///
732    /// Will be `None` if the client has not been logged in.
733    pub fn auth_api(&self) -> Option<AuthApi> {
734        match self.auth_ctx().auth_data.get()? {
735            AuthData::Matrix => Some(AuthApi::Matrix(self.matrix_auth())),
736            AuthData::OAuth(_) => Some(AuthApi::OAuth(self.oauth())),
737        }
738    }
739
740    /// Get the whole session info of this client.
741    ///
742    /// Will be `None` if the client has not been logged in.
743    ///
744    /// Can be used with [`Client::restore_session`] to restore a previously
745    /// logged-in session.
746    pub fn session(&self) -> Option<AuthSession> {
747        match self.auth_api()? {
748            AuthApi::Matrix(api) => api.session().map(Into::into),
749            AuthApi::OAuth(api) => api.full_session().map(Into::into),
750        }
751    }
752
753    /// Get a reference to the state store.
754    pub fn state_store(&self) -> &DynStateStore {
755        self.base_client().state_store()
756    }
757
758    /// Get a reference to the event cache store.
759    pub fn event_cache_store(&self) -> &EventCacheStoreLock {
760        self.base_client().event_cache_store()
761    }
762
763    /// Get a reference to the media store.
764    pub fn media_store(&self) -> &MediaStoreLock {
765        self.base_client().media_store()
766    }
767
768    /// Access the native Matrix authentication API with this client.
769    pub fn matrix_auth(&self) -> MatrixAuth {
770        MatrixAuth::new(self.clone())
771    }
772
773    /// Get the account of the current owner of the client.
774    pub fn account(&self) -> Account {
775        Account::new(self.clone())
776    }
777
778    /// Get the encryption manager of the client.
779    #[cfg(feature = "e2e-encryption")]
780    pub fn encryption(&self) -> Encryption {
781        Encryption::new(self.clone())
782    }
783
784    /// Get the media manager of the client.
785    pub fn media(&self) -> Media {
786        Media::new(self.clone())
787    }
788
789    /// Get the pusher manager of the client.
790    pub fn pusher(&self) -> Pusher {
791        Pusher::new(self.clone())
792    }
793
794    /// Access the OAuth 2.0 API of the client.
795    pub fn oauth(&self) -> OAuth {
796        OAuth::new(self.clone())
797    }
798
799    /// Register a handler for a specific event type.
800    ///
801    /// The handler is a function or closure with one or more arguments. The
802    /// first argument is the event itself. All additional arguments are
803    /// "context" arguments: They have to implement [`EventHandlerContext`].
804    /// This trait is named that way because most of the types implementing it
805    /// give additional context about an event: The room it was in, its raw form
806    /// and other similar things. As two exceptions to this,
807    /// [`Client`] and [`EventHandlerHandle`] also implement the
808    /// `EventHandlerContext` trait so you don't have to clone your client
809    /// into the event handler manually and a handler can decide to remove
810    /// itself.
811    ///
812    /// Some context arguments are not universally applicable. A context
813    /// argument that isn't available for the given event type will result in
814    /// the event handler being skipped and an error being logged. The following
815    /// context argument types are only available for a subset of event types:
816    ///
817    /// * [`Room`] is only available for room-specific events, i.e. not for
818    ///   events like global account data events or presence events.
819    ///
820    /// You can provide custom context via
821    /// [`add_event_handler_context`](Client::add_event_handler_context) and
822    /// then use [`Ctx<T>`](crate::event_handler::Ctx) to extract the context
823    /// into the event handler.
824    ///
825    /// [`EventHandlerContext`]: crate::event_handler::EventHandlerContext
826    ///
827    /// # Examples
828    ///
829    /// ```no_run
830    /// use matrix_sdk::{
831    ///     deserialized_responses::EncryptionInfo,
832    ///     event_handler::Ctx,
833    ///     ruma::{
834    ///         events::{
835    ///             macros::EventContent,
836    ///             push_rules::PushRulesEvent,
837    ///             room::{
838    ///                 message::SyncRoomMessageEvent,
839    ///                 topic::SyncRoomTopicEvent,
840    ///                 member::{StrippedRoomMemberEvent, SyncRoomMemberEvent},
841    ///             },
842    ///         },
843    ///         push::Action,
844    ///         Int, MilliSecondsSinceUnixEpoch,
845    ///     },
846    ///     Client, Room,
847    /// };
848    /// use serde::{Deserialize, Serialize};
849    ///
850    /// # async fn example(client: Client) {
851    /// client.add_event_handler(
852    ///     |ev: SyncRoomMessageEvent, room: Room, client: Client| async move {
853    ///         // Common usage: Room event plus room and client.
854    ///     },
855    /// );
856    /// client.add_event_handler(
857    ///     |ev: SyncRoomMessageEvent, room: Room, encryption_info: Option<EncryptionInfo>| {
858    ///         async move {
859    ///             // An `Option<EncryptionInfo>` parameter lets you distinguish between
860    ///             // unencrypted events and events that were decrypted by the SDK.
861    ///         }
862    ///     },
863    /// );
864    /// client.add_event_handler(
865    ///     |ev: SyncRoomMessageEvent, room: Room, push_actions: Vec<Action>| {
866    ///         async move {
867    ///             // A `Vec<Action>` parameter allows you to know which push actions
868    ///             // are applicable for an event. For example, an event with
869    ///             // `Action::SetTweak(Tweak::Highlight(true))` should be highlighted
870    ///             // in the timeline.
871    ///         }
872    ///     },
873    /// );
874    /// client.add_event_handler(|ev: SyncRoomTopicEvent| async move {
875    ///     // You can omit any or all arguments after the first.
876    /// });
877    ///
878    /// // Registering a temporary event handler:
879    /// let handle = client.add_event_handler(|ev: SyncRoomMessageEvent| async move {
880    ///     /* Event handler */
881    /// });
882    /// client.remove_event_handler(handle);
883    ///
884    /// // Registering custom event handler context:
885    /// #[derive(Debug, Clone)] // The context will be cloned for event handler.
886    /// struct MyContext {
887    ///     number: usize,
888    /// }
889    /// client.add_event_handler_context(MyContext { number: 5 });
890    /// client.add_event_handler(|ev: SyncRoomMessageEvent, context: Ctx<MyContext>| async move {
891    ///     // Use the context
892    /// });
893    ///
894    /// // This will handle membership events in joined rooms. Invites are special, see below.
895    /// client.add_event_handler(
896    ///     |ev: SyncRoomMemberEvent| async move {},
897    /// );
898    ///
899    /// // To handle state events in invited rooms (including invite membership events),
900    /// // `StrippedRoomMemberEvent` should be used.
901    /// // https://spec.matrix.org/v1.16/client-server-api/#stripped-state
902    /// client.add_event_handler(
903    ///     |ev: StrippedRoomMemberEvent| async move {},
904    /// );
905    ///
906    /// // Custom events work exactly the same way, you just need to declare
907    /// // the content struct and use the EventContent derive macro on it.
908    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
909    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
910    /// struct TokenEventContent {
911    ///     token: String,
912    ///     #[serde(rename = "exp")]
913    ///     expires_at: MilliSecondsSinceUnixEpoch,
914    /// }
915    ///
916    /// client.add_event_handler(async |ev: SyncTokenEvent, room: Room| -> () {
917    ///     todo!("Display the token");
918    /// });
919    ///
920    /// // Event handler closures can also capture local variables.
921    /// // Make sure they are cheap to clone though, because they will be cloned
922    /// // every time the closure is called.
923    /// let data: std::sync::Arc<str> = "MyCustomIdentifier".into();
924    ///
925    /// client.add_event_handler(move |ev: SyncRoomMessageEvent | async move {
926    ///     println!("Calling the handler with identifier {data}");
927    /// });
928    /// # }
929    /// ```
930    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
931    where
932        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
933        H: EventHandler<Ev, Ctx>,
934    {
935        self.add_event_handler_impl(handler, None)
936    }
937
938    /// Register a handler for a specific room, and event type.
939    ///
940    /// This method works the same way as
941    /// [`add_event_handler`][Self::add_event_handler], except that the handler
942    /// will only be called for events in the room with the specified ID. See
943    /// that method for more details on event handler functions.
944    ///
945    /// `client.add_room_event_handler(room_id, hdl)` is equivalent to
946    /// `room.add_event_handler(hdl)`. Use whichever one is more convenient in
947    /// your use case.
948    pub fn add_room_event_handler<Ev, Ctx, H>(
949        &self,
950        room_id: &RoomId,
951        handler: H,
952    ) -> EventHandlerHandle
953    where
954        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
955        H: EventHandler<Ev, Ctx>,
956    {
957        self.add_event_handler_impl(handler, Some(room_id.to_owned()))
958    }
959
960    /// Observe a specific event type.
961    ///
962    /// `Ev` represents the kind of event that will be observed. `Ctx`
963    /// represents the context that will come with the event. It relies on the
964    /// same mechanism as [`Client::add_event_handler`]. The main difference is
965    /// that it returns an [`ObservableEventHandler`] and doesn't require a
966    /// user-defined closure. It is possible to subscribe to the
967    /// [`ObservableEventHandler`] to get an [`EventHandlerSubscriber`], which
968    /// implements a [`Stream`]. The `Stream::Item` will be of type `(Ev,
969    /// Ctx)`.
970    ///
971    /// Be careful that only the most recent value can be observed. Subscribers
972    /// are notified when a new value is sent, but there is no guarantee
973    /// that they will see all values.
974    ///
975    /// # Example
976    ///
977    /// Let's see a classical usage:
978    ///
979    /// ```
980    /// use futures_util::StreamExt as _;
981    /// use matrix_sdk::{
982    ///     Client, Room,
983    ///     ruma::{events::room::message::SyncRoomMessageEvent, push::Action},
984    /// };
985    ///
986    /// # async fn example(client: Client) -> Option<()> {
987    /// let observer =
988    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
989    ///
990    /// let mut subscriber = observer.subscribe();
991    ///
992    /// let (event, (room, push_actions)) = subscriber.next().await?;
993    /// # Some(())
994    /// # }
995    /// ```
996    ///
997    /// Now let's see how to get several contexts that can be useful for you:
998    ///
999    /// ```
1000    /// use matrix_sdk::{
1001    ///     Client, Room,
1002    ///     deserialized_responses::EncryptionInfo,
1003    ///     ruma::{
1004    ///         events::room::{
1005    ///             message::SyncRoomMessageEvent, topic::SyncRoomTopicEvent,
1006    ///         },
1007    ///         push::Action,
1008    ///     },
1009    /// };
1010    ///
1011    /// # async fn example(client: Client) {
1012    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `Client`.
1013    /// let _ = client.observe_events::<SyncRoomMessageEvent, (Room, Client)>();
1014    ///
1015    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + `EncryptionInfo`
1016    /// // to distinguish between unencrypted events and events that were decrypted
1017    /// // by the SDK.
1018    /// let _ = client
1019    ///     .observe_events::<SyncRoomMessageEvent, (Room, Option<EncryptionInfo>)>(
1020    ///     );
1021    ///
1022    /// // Observe `SyncRoomMessageEvent` and fetch `Room` + push actions.
1023    /// // For example, an event with `Action::SetTweak(Tweak::Highlight(true))`
1024    /// // should be highlighted in the timeline.
1025    /// let _ =
1026    ///     client.observe_events::<SyncRoomMessageEvent, (Room, Vec<Action>)>();
1027    ///
1028    /// // Observe `SyncRoomTopicEvent` and fetch nothing else.
1029    /// let _ = client.observe_events::<SyncRoomTopicEvent, ()>();
1030    /// # }
1031    /// ```
1032    ///
1033    /// [`EventHandlerSubscriber`]: crate::event_handler::EventHandlerSubscriber
1034    pub fn observe_events<Ev, Ctx>(&self) -> ObservableEventHandler<(Ev, Ctx)>
1035    where
1036        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1037        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1038    {
1039        self.observe_room_events_impl(None)
1040    }
1041
1042    /// Observe a specific room, and event type.
1043    ///
1044    /// This method works the same way as [`Client::observe_events`], except
1045    /// that the observability will only be applied for events in the room with
1046    /// the specified ID. See that method for more details.
1047    ///
1048    /// Be careful that only the most recent value can be observed. Subscribers
1049    /// are notified when a new value is sent, but there is no guarantee
1050    /// that they will see all values.
1051    pub fn observe_room_events<Ev, Ctx>(
1052        &self,
1053        room_id: &RoomId,
1054    ) -> ObservableEventHandler<(Ev, Ctx)>
1055    where
1056        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1057        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1058    {
1059        self.observe_room_events_impl(Some(room_id.to_owned()))
1060    }
1061
1062    /// Shared implementation for `Client::observe_events` and
1063    /// `Client::observe_room_events`.
1064    fn observe_room_events_impl<Ev, Ctx>(
1065        &self,
1066        room_id: Option<OwnedRoomId>,
1067    ) -> ObservableEventHandler<(Ev, Ctx)>
1068    where
1069        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + SyncOutsideWasm + 'static,
1070        Ctx: EventHandlerContext + SendOutsideWasm + SyncOutsideWasm + 'static,
1071    {
1072        // The default value is `None`. It becomes `Some((Ev, Ctx))` once it has a
1073        // new value.
1074        let shared_observable = SharedObservable::new(None);
1075
1076        ObservableEventHandler::new(
1077            shared_observable.clone(),
1078            self.event_handler_drop_guard(self.add_event_handler_impl(
1079                move |event: Ev, context: Ctx| {
1080                    shared_observable.set(Some((event, context)));
1081
1082                    ready(())
1083                },
1084                room_id,
1085            )),
1086        )
1087    }
1088
1089    /// Remove the event handler associated with the handle.
1090    ///
1091    /// Note that you **must not** call `remove_event_handler` from the
1092    /// non-async part of an event handler, that is:
1093    ///
1094    /// ```ignore
1095    /// client.add_event_handler(|ev: SomeEvent, client: Client, handle: EventHandlerHandle| {
1096    ///     // ⚠ this will cause a deadlock ⚠
1097    ///     client.remove_event_handler(handle);
1098    ///
1099    ///     async move {
1100    ///         // removing the event handler here is fine
1101    ///         client.remove_event_handler(handle);
1102    ///     }
1103    /// })
1104    /// ```
1105    ///
1106    /// Note also that handlers that remove themselves will still execute with
1107    /// events received in the same sync cycle.
1108    ///
1109    /// # Arguments
1110    ///
1111    /// `handle` - The [`EventHandlerHandle`] that is returned when
1112    /// registering the event handler with [`Client::add_event_handler`].
1113    ///
1114    /// # Examples
1115    ///
1116    /// ```no_run
1117    /// # use url::Url;
1118    /// # use tokio::sync::mpsc;
1119    /// #
1120    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
1121    /// #
1122    /// use matrix_sdk::{
1123    ///     Client, event_handler::EventHandlerHandle,
1124    ///     ruma::events::room::member::SyncRoomMemberEvent,
1125    /// };
1126    /// #
1127    /// # futures_executor::block_on(async {
1128    /// # let client = matrix_sdk::Client::builder()
1129    /// #     .homeserver_url(homeserver)
1130    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1131    /// #     .build()
1132    /// #     .await
1133    /// #     .unwrap();
1134    ///
1135    /// client.add_event_handler(
1136    ///     |ev: SyncRoomMemberEvent,
1137    ///      client: Client,
1138    ///      handle: EventHandlerHandle| async move {
1139    ///         // Common usage: Check arriving Event is the expected one
1140    ///         println!("Expected RoomMemberEvent received!");
1141    ///         client.remove_event_handler(handle);
1142    ///     },
1143    /// );
1144    /// # });
1145    /// ```
1146    pub fn remove_event_handler(&self, handle: EventHandlerHandle) {
1147        self.inner.event_handlers.remove(handle);
1148    }
1149
1150    /// Create an [`EventHandlerDropGuard`] for the event handler identified by
1151    /// the given handle.
1152    ///
1153    /// When the returned value is dropped, the event handler will be removed.
1154    pub fn event_handler_drop_guard(&self, handle: EventHandlerHandle) -> EventHandlerDropGuard {
1155        EventHandlerDropGuard::new(handle, self.clone())
1156    }
1157
1158    /// Add an arbitrary value for use as event handler context.
1159    ///
1160    /// The value can be obtained in an event handler by adding an argument of
1161    /// the type [`Ctx<T>`][crate::event_handler::Ctx].
1162    ///
1163    /// If a value of the same type has been added before, it will be
1164    /// overwritten.
1165    ///
1166    /// # Examples
1167    ///
1168    /// ```no_run
1169    /// use matrix_sdk::{
1170    ///     Room, event_handler::Ctx,
1171    ///     ruma::events::room::message::SyncRoomMessageEvent,
1172    /// };
1173    /// # #[derive(Clone)]
1174    /// # struct SomeType;
1175    /// # fn obtain_gui_handle() -> SomeType { SomeType }
1176    /// # let homeserver = url::Url::parse("http://localhost:8080").unwrap();
1177    /// # futures_executor::block_on(async {
1178    /// # let client = matrix_sdk::Client::builder()
1179    /// #     .homeserver_url(homeserver)
1180    /// #     .server_versions([ruma::api::MatrixVersion::V1_0])
1181    /// #     .build()
1182    /// #     .await
1183    /// #     .unwrap();
1184    ///
1185    /// // Handle used to send messages to the UI part of the app
1186    /// let my_gui_handle: SomeType = obtain_gui_handle();
1187    ///
1188    /// client.add_event_handler_context(my_gui_handle.clone());
1189    /// client.add_event_handler(
1190    ///     |ev: SyncRoomMessageEvent, room: Room, gui_handle: Ctx<SomeType>| {
1191    ///         async move {
1192    ///             // gui_handle.send(DisplayMessage { message: ev });
1193    ///         }
1194    ///     },
1195    /// );
1196    /// # });
1197    /// ```
1198    pub fn add_event_handler_context<T>(&self, ctx: T)
1199    where
1200        T: Clone + Send + Sync + 'static,
1201    {
1202        self.inner.event_handlers.add_context(ctx);
1203    }
1204
1205    /// Register a handler for a notification.
1206    ///
1207    /// Similar to [`Client::add_event_handler`], but only allows functions
1208    /// or closures with exactly the three arguments [`Notification`], [`Room`],
1209    /// [`Client`] for now.
1210    pub async fn register_notification_handler<H, Fut>(&self, handler: H) -> &Self
1211    where
1212        H: Fn(Notification, Room, Client) -> Fut + SendOutsideWasm + SyncOutsideWasm + 'static,
1213        Fut: Future<Output = ()> + SendOutsideWasm + 'static,
1214    {
1215        self.inner.notification_handlers.write().await.push(Box::new(
1216            move |notification, room, client| Box::pin((handler)(notification, room, client)),
1217        ));
1218
1219        self
1220    }
1221
1222    /// Subscribe to all updates for the room with the given ID.
1223    ///
1224    /// The returned receiver will receive a new message for each sync response
1225    /// that contains updates for that room.
1226    pub fn subscribe_to_room_updates(&self, room_id: &RoomId) -> broadcast::Receiver<RoomUpdate> {
1227        match self.inner.room_update_channels.lock().unwrap().entry(room_id.to_owned()) {
1228            btree_map::Entry::Vacant(entry) => {
1229                let (tx, rx) = broadcast::channel(8);
1230                entry.insert(tx);
1231                rx
1232            }
1233            btree_map::Entry::Occupied(entry) => entry.get().subscribe(),
1234        }
1235    }
1236
1237    /// Subscribe to all updates to all rooms, whenever any has been received in
1238    /// a sync response.
1239    pub fn subscribe_to_all_room_updates(&self) -> broadcast::Receiver<RoomUpdates> {
1240        self.inner.room_updates_sender.subscribe()
1241    }
1242
1243    pub(crate) async fn notification_handlers(
1244        &self,
1245    ) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
1246        self.inner.notification_handlers.read().await
1247    }
1248
1249    /// Get all the rooms the client knows about.
1250    ///
1251    /// This will return the list of joined, invited, and left rooms.
1252    pub fn rooms(&self) -> Vec<Room> {
1253        self.base_client().rooms().into_iter().map(|room| Room::new(self.clone(), room)).collect()
1254    }
1255
1256    /// Get all the rooms the client knows about, filtered by room state.
1257    pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
1258        self.base_client()
1259            .rooms_filtered(filter)
1260            .into_iter()
1261            .map(|room| Room::new(self.clone(), room))
1262            .collect()
1263    }
1264
1265    /// Get a stream of all the rooms, in addition to the existing rooms.
1266    pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
1267        let (rooms, stream) = self.base_client().rooms_stream();
1268
1269        let map_room = |room| Room::new(self.clone(), room);
1270
1271        (
1272            rooms.into_iter().map(map_room).collect(),
1273            stream.map(move |diffs| diffs.into_iter().map(|diff| diff.map(map_room)).collect()),
1274        )
1275    }
1276
1277    /// Returns the joined rooms this client knows about.
1278    pub fn joined_rooms(&self) -> Vec<Room> {
1279        self.rooms_filtered(RoomStateFilter::JOINED)
1280    }
1281
1282    /// Returns the invited rooms this client knows about.
1283    pub fn invited_rooms(&self) -> Vec<Room> {
1284        self.rooms_filtered(RoomStateFilter::INVITED)
1285    }
1286
1287    /// Returns the left rooms this client knows about.
1288    pub fn left_rooms(&self) -> Vec<Room> {
1289        self.rooms_filtered(RoomStateFilter::LEFT)
1290    }
1291
1292    /// Returns the joined space rooms this client knows about.
1293    pub fn joined_space_rooms(&self) -> Vec<Room> {
1294        self.base_client()
1295            .rooms_filtered(RoomStateFilter::JOINED)
1296            .into_iter()
1297            .flat_map(|room| room.is_space().then_some(Room::new(self.clone(), room)))
1298            .collect()
1299    }
1300
1301    /// Get a room with the given room id.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// `room_id` - The unique id of the room that should be fetched.
1306    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1307        self.base_client().get_room(room_id).map(|room| Room::new(self.clone(), room))
1308    }
1309
1310    /// Gets the preview of a room, whether the current user has joined it or
1311    /// not.
1312    pub async fn get_room_preview(
1313        &self,
1314        room_or_alias_id: &RoomOrAliasId,
1315        via: Vec<OwnedServerName>,
1316    ) -> Result<RoomPreview> {
1317        let room_id = match <&RoomId>::try_from(room_or_alias_id) {
1318            Ok(room_id) => room_id.to_owned(),
1319            Err(alias) => self.resolve_room_alias(alias).await?.room_id,
1320        };
1321
1322        if let Some(room) = self.get_room(&room_id) {
1323            // The cached data can only be trusted if the room state is joined or
1324            // banned: for invite and knock rooms, no updates will be received
1325            // for the rooms after the invite/knock action took place so we may
1326            // have very out to date data for important fields such as
1327            // `join_rule`. For left rooms, the homeserver should return the latest info.
1328            match room.state() {
1329                RoomState::Joined | RoomState::Banned => {
1330                    return Ok(RoomPreview::from_known_room(&room).await);
1331                }
1332                RoomState::Left | RoomState::Invited | RoomState::Knocked => {}
1333            }
1334        }
1335
1336        RoomPreview::from_remote_room(self, room_id, room_or_alias_id, via).await
1337    }
1338
1339    /// Resolve a room alias to a room id and a list of servers which know
1340    /// about it.
1341    ///
1342    /// # Arguments
1343    ///
1344    /// `room_alias` - The room alias to be resolved.
1345    pub async fn resolve_room_alias(
1346        &self,
1347        room_alias: &RoomAliasId,
1348    ) -> HttpResult<get_alias::v3::Response> {
1349        let request = get_alias::v3::Request::new(room_alias.to_owned());
1350        self.send(request).await
1351    }
1352
1353    /// Checks if a room alias is not in use yet.
1354    ///
1355    /// Returns:
    /// - `Ok(true)` if the room alias is available (the resolve alias request
    ///   returned a `404` status code).
    /// - `Ok(false)` if it's not (the room alias was successfully resolved).
1359    /// - An `Err` otherwise.
1360    pub async fn is_room_alias_available(&self, alias: &RoomAliasId) -> HttpResult<bool> {
1361        match self.resolve_room_alias(alias).await {
1362            // The room alias was resolved, so it's already in use.
1363            Ok(_) => Ok(false),
1364            Err(error) => {
1365                match error.client_api_error_kind() {
1366                    // The room alias wasn't found, so it's available.
1367                    Some(ErrorKind::NotFound) => Ok(true),
1368                    _ => Err(error),
1369                }
1370            }
1371        }
1372    }
1373
1374    /// Adds a new room alias associated with a room to the room directory.
1375    pub async fn create_room_alias(&self, alias: &RoomAliasId, room_id: &RoomId) -> HttpResult<()> {
1376        let request = create_alias::v3::Request::new(alias.to_owned(), room_id.to_owned());
1377        self.send(request).await?;
1378        Ok(())
1379    }
1380
1381    /// Removes a room alias from the room directory.
1382    pub async fn remove_room_alias(&self, alias: &RoomAliasId) -> HttpResult<()> {
1383        let request = delete_alias::v3::Request::new(alias.to_owned());
1384        self.send(request).await?;
1385        Ok(())
1386    }
1387
1388    /// Update the homeserver from the login response well-known if needed.
1389    ///
1390    /// # Arguments
1391    ///
1392    /// * `login_well_known` - The `well_known` field from a successful login
1393    ///   response.
1394    pub(crate) fn maybe_update_login_well_known(&self, login_well_known: Option<&DiscoveryInfo>) {
1395        if self.inner.respect_login_well_known
1396            && let Some(well_known) = login_well_known
1397            && let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url)
1398        {
1399            self.set_homeserver(homeserver);
1400        }
1401    }
1402
1403    /// Similar to [`Client::restore_session_with`], with
1404    /// [`RoomLoadSettings::default()`].
1405    ///
1406    /// # Panics
1407    ///
1408    /// Panics if a session was already restored or logged in.
1409    #[instrument(skip_all)]
1410    pub async fn restore_session(&self, session: impl Into<AuthSession>) -> Result<()> {
1411        self.restore_session_with(session, RoomLoadSettings::default()).await
1412    }
1413
1414    /// Restore a session previously logged-in using one of the available
1415    /// authentication APIs. The number of rooms to restore is controlled by
1416    /// [`RoomLoadSettings`].
1417    ///
1418    /// See the documentation of the corresponding authentication API's
1419    /// `restore_session` method for more information.
1420    ///
1421    /// # Panics
1422    ///
1423    /// Panics if a session was already restored or logged in.
1424    #[instrument(skip_all)]
1425    pub async fn restore_session_with(
1426        &self,
1427        session: impl Into<AuthSession>,
1428        room_load_settings: RoomLoadSettings,
1429    ) -> Result<()> {
1430        let session = session.into();
1431        match session {
1432            AuthSession::Matrix(session) => {
1433                Box::pin(self.matrix_auth().restore_session(session, room_load_settings)).await
1434            }
1435            AuthSession::OAuth(session) => {
1436                Box::pin(self.oauth().restore_session(*session, room_load_settings)).await
1437            }
1438        }
1439    }
1440
1441    /// Refresh the access token using the authentication API used to log into
1442    /// this session.
1443    ///
1444    /// See the documentation of the authentication API's `refresh_access_token`
1445    /// method for more information.
1446    pub async fn refresh_access_token(&self) -> Result<(), RefreshTokenError> {
1447        let Some(auth_api) = self.auth_api() else {
1448            return Err(RefreshTokenError::RefreshTokenRequired);
1449        };
1450
1451        match auth_api {
1452            AuthApi::Matrix(api) => {
1453                trace!("Token refresh: Using the homeserver.");
1454                Box::pin(api.refresh_access_token()).await?;
1455            }
1456            AuthApi::OAuth(api) => {
1457                trace!("Token refresh: Using OAuth 2.0.");
1458                Box::pin(api.refresh_access_token()).await?;
1459            }
1460        }
1461
1462        Ok(())
1463    }
1464
1465    /// Log out the current session using the proper authentication API.
1466    ///
1467    /// # Errors
1468    ///
1469    /// Returns an error if the session is not authenticated or if an error
1470    /// occurred while making the request to the server.
1471    pub async fn logout(&self) -> Result<(), Error> {
1472        let auth_api = self.auth_api().ok_or(Error::AuthenticationRequired)?;
1473        match auth_api {
1474            AuthApi::Matrix(matrix_auth) => {
1475                matrix_auth.logout().await?;
1476                Ok(())
1477            }
1478            AuthApi::OAuth(oauth) => Ok(oauth.logout().await?),
1479        }
1480    }
1481
1482    /// Get or upload a sync filter.
1483    ///
1484    /// This method will either get a filter ID from the store or upload the
1485    /// filter definition to the homeserver and return the new filter ID.
1486    ///
1487    /// # Arguments
1488    ///
1489    /// * `filter_name` - The unique name of the filter, this name will be used
1490    /// locally to store and identify the filter ID returned by the server.
1491    ///
1492    /// * `definition` - The filter definition that should be uploaded to the
1493    /// server if no filter ID can be found in the store.
1494    ///
1495    /// # Examples
1496    ///
1497    /// ```no_run
1498    /// # use matrix_sdk::{
1499    /// #    Client, config::SyncSettings,
1500    /// #    ruma::api::client::{
1501    /// #        filter::{
1502    /// #           FilterDefinition, LazyLoadOptions, RoomEventFilter, RoomFilter,
1503    /// #        },
1504    /// #        sync::sync_events::v3::Filter,
1505    /// #    }
1506    /// # };
1507    /// # use url::Url;
1508    /// # async {
1509    /// # let homeserver = Url::parse("http://example.com").unwrap();
1510    /// # let client = Client::new(homeserver).await.unwrap();
1511    /// let mut filter = FilterDefinition::default();
1512    ///
1513    /// // Let's enable member lazy loading.
1514    /// filter.room.state.lazy_load_options =
1515    ///     LazyLoadOptions::Enabled { include_redundant_members: false };
1516    ///
1517    /// let filter_id = client
1518    ///     .get_or_upload_filter("sync", filter)
1519    ///     .await
1520    ///     .unwrap();
1521    ///
1522    /// let sync_settings = SyncSettings::new()
1523    ///     .filter(Filter::FilterId(filter_id));
1524    ///
1525    /// let response = client.sync_once(sync_settings).await.unwrap();
    /// # };
    /// ```
1527    #[instrument(skip(self, definition))]
1528    pub async fn get_or_upload_filter(
1529        &self,
1530        filter_name: &str,
1531        definition: FilterDefinition,
1532    ) -> Result<String> {
1533        if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
1534            debug!("Found filter locally");
1535            Ok(filter)
1536        } else {
1537            debug!("Didn't find filter locally");
1538            let user_id = self.user_id().ok_or(Error::AuthenticationRequired)?;
1539            let request = FilterUploadRequest::new(user_id.to_owned(), definition);
1540            let response = self.send(request).await?;
1541
1542            self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
1543
1544            Ok(response.filter_id)
1545        }
1546    }
1547
1548    /// Prepare to join a room by ID, by getting the current details about it
1549    async fn prepare_join_room_by_id(&self, room_id: &RoomId) -> Option<PreJoinRoomInfo> {
1550        let room = self.get_room(room_id)?;
1551
1552        let inviter = match room.invite_details().await {
1553            Ok(details) => details.inviter,
1554            Err(Error::WrongRoomState(_)) => None,
1555            Err(e) => {
1556                warn!("Error fetching invite details for room: {e:?}");
1557                None
1558            }
1559        };
1560
1561        Some(PreJoinRoomInfo { inviter })
1562    }
1563
1564    /// Finish joining a room.
1565    ///
1566    /// If the room was an invite that should be marked as a DM, will include it
1567    /// in the DM event after creating the joined room.
1568    ///
1569    /// If encrypted history sharing is enabled, will check to see if we have a
1570    /// key bundle, and import it if so.
1571    ///
1572    /// # Arguments
1573    ///
1574    /// * `room_id` - The `RoomId` of the room that was joined.
1575    /// * `pre_join_room_info` - Information about the room before we joined.
1576    async fn finish_join_room(
1577        &self,
1578        room_id: &RoomId,
1579        pre_join_room_info: Option<PreJoinRoomInfo>,
1580    ) -> Result<Room> {
1581        let mark_as_dm = if let Some(room) = self.get_room(room_id) {
1582            room.state() == RoomState::Invited
1583                && room.is_direct().await.unwrap_or_else(|e| {
1584                    warn!(%room_id, "is_direct() failed: {e}");
1585                    false
1586                })
1587        } else {
1588            false
1589        };
1590
1591        let base_room = self
1592            .base_client()
1593            .room_joined(
1594                room_id,
1595                pre_join_room_info
1596                    .as_ref()
1597                    .and_then(|info| info.inviter.as_ref())
1598                    .map(|i| i.user_id().to_owned()),
1599            )
1600            .await?;
1601        let room = Room::new(self.clone(), base_room);
1602
1603        if mark_as_dm {
1604            room.set_is_direct(true).await?;
1605        }
1606
1607        #[cfg(feature = "e2e-encryption")]
1608        if self.inner.enable_share_history_on_invite
1609            && let Some(inviter) =
1610                pre_join_room_info.as_ref().and_then(|info| info.inviter.as_ref())
1611        {
1612            crate::room::shared_room_history::maybe_accept_key_bundle(&room, inviter.user_id())
1613                .await?;
1614        }
1615
1616        // Suppress "unused variable" and "unused field" lints
1617        #[cfg(not(feature = "e2e-encryption"))]
1618        let _ = pre_join_room_info.map(|i| i.inviter);
1619
1620        Ok(room)
1621    }
1622
1623    /// Join a room by `RoomId`.
1624    ///
1625    /// Returns the `Room` in the joined state.
1626    ///
1627    /// # Arguments
1628    ///
1629    /// * `room_id` - The `RoomId` of the room to be joined.
1630    #[instrument(skip(self))]
1631    pub async fn join_room_by_id(&self, room_id: &RoomId) -> Result<Room> {
1632        // See who invited us to this room, if anyone. Note we have to do this before
1633        // making the `/join` request, otherwise we could race against the sync.
1634        let pre_join_info = self.prepare_join_room_by_id(room_id).await;
1635
1636        let request = join_room_by_id::v3::Request::new(room_id.to_owned());
1637        let response = self.send(request).await?;
1638        self.finish_join_room(&response.room_id, pre_join_info).await
1639    }
1640
1641    /// Join a room by `RoomOrAliasId`.
1642    ///
1643    /// Returns the `Room` in the joined state.
1644    ///
1645    /// # Arguments
1646    ///
1647    /// * `alias` - The `RoomId` or `RoomAliasId` of the room to be joined. An
1648    ///   alias looks like `#name:example.com`.
1649    /// * `server_names` - The server names to be used for resolving the alias,
1650    ///   if needs be.
1651    pub async fn join_room_by_id_or_alias(
1652        &self,
1653        alias: &RoomOrAliasId,
1654        server_names: &[OwnedServerName],
1655    ) -> Result<Room> {
1656        let pre_join_info = {
1657            match alias.try_into() {
1658                Ok(room_id) => self.prepare_join_room_by_id(room_id).await,
1659                Err(_) => {
1660                    // The id is a room alias. We assume (possibly incorrectly?) that we are not
1661                    // responding to an invitation to the room, and therefore don't need to handle
1662                    // things that happen as a result of invites.
1663                    None
1664                }
1665            }
1666        };
1667        let request = assign!(join_room_by_id_or_alias::v3::Request::new(alias.to_owned()), {
1668            via: server_names.to_owned(),
1669        });
1670        let response = self.send(request).await?;
1671        self.finish_join_room(&response.room_id, pre_join_info).await
1672    }
1673
1674    /// Search the homeserver's directory of public rooms.
1675    ///
1676    /// Sends a request to "_matrix/client/r0/publicRooms", returns
1677    /// a `get_public_rooms::Response`.
1678    ///
1679    /// # Arguments
1680    ///
1681    /// * `limit` - The number of `PublicRoomsChunk`s in each response.
1682    ///
1683    /// * `since` - Pagination token from a previous request.
1684    ///
1685    /// * `server` - The name of the server, if `None` the requested server is
1686    ///   used.
1687    ///
1688    /// # Examples
1689    /// ```no_run
1690    /// use matrix_sdk::Client;
1691    /// # use url::Url;
1692    /// # let homeserver = Url::parse("http://example.com").unwrap();
1693    /// # let limit = Some(10);
1694    /// # let since = Some("since token");
1695    /// # let server = Some("servername.com".try_into().unwrap());
1696    /// # async {
1697    /// let mut client = Client::new(homeserver).await.unwrap();
1698    ///
1699    /// client.public_rooms(limit, since, server).await;
1700    /// # };
1701    /// ```
1702    #[cfg_attr(not(target_family = "wasm"), deny(clippy::future_not_send))]
1703    pub async fn public_rooms(
1704        &self,
1705        limit: Option<u32>,
1706        since: Option<&str>,
1707        server: Option<&ServerName>,
1708    ) -> HttpResult<get_public_rooms::v3::Response> {
1709        let limit = limit.map(UInt::from);
1710
1711        let request = assign!(get_public_rooms::v3::Request::new(), {
1712            limit,
1713            since: since.map(ToOwned::to_owned),
1714            server: server.map(ToOwned::to_owned),
1715        });
1716        self.send(request).await
1717    }
1718
1719    /// Create a room with the given parameters.
1720    ///
1721    /// Sends a request to `/_matrix/client/r0/createRoom` and returns the
1722    /// created room.
1723    ///
1724    /// If you want to create a direct message with one specific user, you can
1725    /// use [`create_dm`][Self::create_dm], which is more convenient than
1726    /// assembling the [`create_room::v3::Request`] yourself.
1727    ///
1728    /// If the `is_direct` field of the request is set to `true` and at least
1729    /// one user is invited, the room will be automatically added to the direct
1730    /// rooms in the account data.
1731    ///
1732    /// # Examples
1733    ///
1734    /// ```no_run
1735    /// use matrix_sdk::{
1736    ///     Client,
1737    ///     ruma::api::client::room::create_room::v3::Request as CreateRoomRequest,
1738    /// };
1739    /// # use url::Url;
1740    /// #
1741    /// # async {
1742    /// # let homeserver = Url::parse("http://example.com").unwrap();
1743    /// let request = CreateRoomRequest::new();
1744    /// let client = Client::new(homeserver).await.unwrap();
1745    /// assert!(client.create_room(request).await.is_ok());
1746    /// # };
1747    /// ```
1748    pub async fn create_room(&self, request: create_room::v3::Request) -> Result<Room> {
1749        let invite = request.invite.clone();
1750        let is_direct_room = request.is_direct;
1751        let response = self.send(request).await?;
1752
1753        let base_room = self.base_client().get_or_create_room(&response.room_id, RoomState::Joined);
1754
1755        let joined_room = Room::new(self.clone(), base_room);
1756
1757        if is_direct_room
1758            && !invite.is_empty()
1759            && let Err(error) =
1760                self.account().mark_as_dm(joined_room.room_id(), invite.as_slice()).await
1761        {
1762            // FIXME: Retry in the background
1763            error!("Failed to mark room as DM: {error}");
1764        }
1765
1766        Ok(joined_room)
1767    }
1768
1769    /// Create a DM room.
1770    ///
1771    /// Convenience shorthand for [`create_room`][Self::create_room] with the
1772    /// given user being invited, the room marked `is_direct` and both the
1773    /// creator and invitee getting the default maximum power level.
1774    ///
1775    /// If the `e2e-encryption` feature is enabled, the room will also be
1776    /// encrypted.
1777    ///
1778    /// # Arguments
1779    ///
1780    /// * `user_id` - The ID of the user to create a DM for.
1781    pub async fn create_dm(&self, user_id: &UserId) -> Result<Room> {
1782        #[cfg(feature = "e2e-encryption")]
1783        let initial_state = vec![
1784            InitialStateEvent::with_empty_state_key(
1785                RoomEncryptionEventContent::with_recommended_defaults(),
1786            )
1787            .to_raw_any(),
1788        ];
1789
1790        #[cfg(not(feature = "e2e-encryption"))]
1791        let initial_state = vec![];
1792
1793        let request = assign!(create_room::v3::Request::new(), {
1794            invite: vec![user_id.to_owned()],
1795            is_direct: true,
1796            preset: Some(create_room::v3::RoomPreset::TrustedPrivateChat),
1797            initial_state,
1798        });
1799
1800        self.create_room(request).await
1801    }
1802
1803    /// Search the homeserver's directory for public rooms with a filter.
1804    ///
1805    /// # Arguments
1806    ///
1807    /// * `room_search` - The easiest way to create this request is using the
1808    ///   `get_public_rooms_filtered::Request` itself.
1809    ///
1810    /// # Examples
1811    ///
1812    /// ```no_run
1813    /// # use url::Url;
1814    /// # use matrix_sdk::Client;
1815    /// # async {
1816    /// # let homeserver = Url::parse("http://example.com")?;
1817    /// use matrix_sdk::ruma::{
1818    ///     api::client::directory::get_public_rooms_filtered, directory::Filter,
1819    /// };
1820    /// # let mut client = Client::new(homeserver).await?;
1821    ///
1822    /// let mut filter = Filter::new();
1823    /// filter.generic_search_term = Some("rust".to_owned());
1824    /// let mut request = get_public_rooms_filtered::v3::Request::new();
1825    /// request.filter = filter;
1826    ///
1827    /// let response = client.public_rooms_filtered(request).await?;
1828    ///
1829    /// for room in response.chunk {
1830    ///     println!("Found room {room:?}");
1831    /// }
1832    /// # anyhow::Ok(()) };
1833    /// ```
1834    pub async fn public_rooms_filtered(
1835        &self,
1836        request: get_public_rooms_filtered::v3::Request,
1837    ) -> HttpResult<get_public_rooms_filtered::v3::Response> {
1838        self.send(request).await
1839    }
1840
1841    /// Send an arbitrary request to the server, without updating client state.
1842    ///
1843    /// **Warning:** Because this method *does not* update the client state, it
1844    /// is important to make sure that you account for this yourself, and
1845    /// use wrapper methods where available.  This method should *only* be
1846    /// used if a wrapper method for the endpoint you'd like to use is not
1847    /// available.
1848    ///
1849    /// # Arguments
1850    ///
1851    /// * `request` - A filled out and valid request for the endpoint to be hit
1852    ///
1853    /// * `timeout` - An optional request timeout setting, this overrides the
1854    ///   default request setting if one was set.
1855    ///
1856    /// # Examples
1857    ///
1858    /// ```no_run
1859    /// # use matrix_sdk::{Client, config::SyncSettings};
1860    /// # use url::Url;
1861    /// # async {
1862    /// # let homeserver = Url::parse("http://localhost:8080")?;
1863    /// # let mut client = Client::new(homeserver).await?;
1864    /// use matrix_sdk::ruma::{api::client::profile, user_id};
1865    ///
1866    /// // First construct the request you want to make
1867    /// // See https://docs.rs/ruma-client-api/latest/ruma_client_api/index.html
1868    /// // for all available Endpoints
1869    /// let user_id = user_id!("@example:localhost").to_owned();
1870    /// let request = profile::get_profile::v3::Request::new(user_id);
1871    ///
1872    /// // Start the request using Client::send()
1873    /// let response = client.send(request).await?;
1874    ///
1875    /// // Check the corresponding Response struct to find out what types are
1876    /// // returned
1877    /// # anyhow::Ok(()) };
1878    /// ```
1879    pub fn send<Request>(&self, request: Request) -> SendRequest<Request>
1880    where
1881        Request: OutgoingRequest + Clone + Debug,
1882        Request::Authentication: SupportedAuthScheme,
1883        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1884    {
1885        SendRequest {
1886            client: self.clone(),
1887            request,
1888            config: None,
1889            send_progress: Default::default(),
1890        }
1891    }
1892
1893    pub(crate) async fn send_inner<Request>(
1894        &self,
1895        request: Request,
1896        config: Option<RequestConfig>,
1897        send_progress: SharedObservable<TransmissionProgress>,
1898    ) -> HttpResult<Request::IncomingResponse>
1899    where
1900        Request: OutgoingRequest + Debug,
1901        Request::Authentication: SupportedAuthScheme,
1902        HttpError: From<FromHttpResponseError<Request::EndpointError>>,
1903    {
1904        let homeserver = self.homeserver().to_string();
1905        let access_token = self.access_token();
1906
1907        self.inner
1908            .http_client
1909            .send(
1910                request,
1911                config,
1912                homeserver,
1913                access_token.as_deref(),
1914                &self.supported_versions().await?,
1915                send_progress,
1916            )
1917            .await
1918    }
1919
1920    fn broadcast_unknown_token(&self, soft_logout: &bool) {
1921        _ = self
1922            .inner
1923            .auth_ctx
1924            .session_change_sender
1925            .send(SessionChange::UnknownToken { soft_logout: *soft_logout });
1926    }
1927
1928    /// Fetches server versions from network; no caching.
1929    pub async fn fetch_server_versions(
1930        &self,
1931        request_config: Option<RequestConfig>,
1932    ) -> HttpResult<get_supported_versions::Response> {
1933        let server_versions = self
1934            .inner
1935            .http_client
1936            .send(
1937                get_supported_versions::Request::new(),
1938                request_config,
1939                self.homeserver().to_string(),
1940                None,
1941                &SupportedVersions {
1942                    versions: [MatrixVersion::V1_0].into(),
1943                    features: Default::default(),
1944                },
1945                Default::default(),
1946            )
1947            .await?;
1948
1949        Ok(server_versions)
1950    }
1951
1952    /// Fetches client well_known from network; no caching.
1953    pub async fn fetch_client_well_known(&self) -> Option<discover_homeserver::Response> {
1954        let server_url_string = self
1955            .server()
1956            .unwrap_or(
1957                // Sometimes people configure their well-known directly on the homeserver so use
1958                // this as a fallback when the server name is unknown.
1959                &self.homeserver(),
1960            )
1961            .to_string();
1962
1963        let well_known = self
1964            .inner
1965            .http_client
1966            .send(
1967                discover_homeserver::Request::new(),
1968                Some(RequestConfig::short_retry()),
1969                server_url_string,
1970                None,
1971                &SupportedVersions {
1972                    versions: [MatrixVersion::V1_0].into(),
1973                    features: Default::default(),
1974                },
1975                Default::default(),
1976            )
1977            .await;
1978
1979        match well_known {
1980            Ok(well_known) => Some(well_known),
1981            Err(http_error) => {
1982                // It is perfectly valid to not have a well-known file.
1983                // Maybe we should check for a specific error code to be sure?
1984                warn!("Failed to fetch client well-known: {http_error}");
1985                None
1986            }
1987        }
1988    }
1989
1990    /// Load server info from storage, or fetch them from network and cache
1991    /// them.
1992    async fn load_or_fetch_server_info(&self) -> HttpResult<ServerInfo> {
1993        match self.state_store().get_kv_data(StateStoreDataKey::ServerInfo).await {
1994            Ok(Some(stored)) => {
1995                if let Some(server_info) =
1996                    stored.into_server_info().and_then(|info| info.maybe_decode())
1997                {
1998                    return Ok(server_info);
1999                }
2000            }
2001            Ok(None) => {
2002                // fallthrough: cache is empty
2003            }
2004            Err(err) => {
2005                warn!("error when loading cached server info: {err}");
2006                // fallthrough to network.
2007            }
2008        }
2009
2010        let server_versions = self.fetch_server_versions(None).await?;
2011        let well_known = self.fetch_client_well_known().await;
2012        let server_info = ServerInfo::new(
2013            server_versions.versions.clone(),
2014            server_versions.unstable_features.clone(),
2015            well_known.map(Into::into),
2016        );
2017
2018        // Attempt to cache the result in storage.
2019        {
2020            if let Err(err) = self
2021                .state_store()
2022                .set_kv_data(
2023                    StateStoreDataKey::ServerInfo,
2024                    StateStoreDataValue::ServerInfo(server_info.clone()),
2025                )
2026                .await
2027            {
2028                warn!("error when caching server info: {err}");
2029            }
2030        }
2031
2032        Ok(server_info)
2033    }
2034
    /// Read a value from the in-memory server info cache, filling the cache
    /// first if the value is not available yet.
    ///
    /// `map` projects the wanted `CachedValue` out of the cached
    /// `ClientServerInfo`. It may be called up to three times: under the read
    /// guard, again under the write guard, and once more after the cache has
    /// been filled.
    ///
    /// # Errors
    ///
    /// Propagates the error of `load_or_fetch_server_info()`.
    async fn get_or_load_and_cache_server_info<
        Value,
        MapFunction: Fn(&ClientServerInfo) -> CachedValue<Value>,
    >(
        &self,
        map: MapFunction,
    ) -> HttpResult<Value> {
        // Fast path: the value is already cached, a read guard is enough.
        let server_info = &self.inner.caches.server_info;
        if let CachedValue::Cached(val) = map(&*server_info.read().await) {
            return Ok(val);
        }

        // Check again now that the write guard is held; presumably another task
        // may have filled the cache in the meantime.
        let mut guarded_server_info = server_info.write().await;
        if let CachedValue::Cached(val) = map(&guarded_server_info) {
            return Ok(val);
        }

        // The write guard is still held while the server info is loaded or
        // fetched. Note that this binding shadows the cache reference above.
        let server_info = self.load_or_fetch_server_info().await?;

        // Fill both unstable features and server versions at once.
        let mut supported = server_info.supported_versions();
        if supported.versions.is_empty() {
            // No versions reported: fall back to Matrix 1.0.
            supported.versions = [MatrixVersion::V1_0].into();
        }

        guarded_server_info.supported_versions = CachedValue::Cached(supported);
        guarded_server_info.well_known = CachedValue::Cached(server_info.well_known);

        // SAFETY: all fields were set above, so (assuming the caller doesn't attempt to
        // fetch an optional property), the function will always return some.
        // NOTE(review): `unwrap_cached_value()` presumably panics if `map` still
        // yields a non-cached value — confirm.
        Ok(map(&guarded_server_info).unwrap_cached_value())
    }
2067
2068    /// Get the Matrix versions and features supported by the homeserver by
2069    /// fetching them from the server or the cache.
2070    ///
2071    /// This is equivalent to calling both [`Client::server_versions()`] and
2072    /// [`Client::unstable_features()`]. To always fetch the result from the
2073    /// homeserver, you can call [`Client::fetch_server_versions()`] instead,
2074    /// and then `.as_supported_versions()` on the response.
2075    ///
2076    /// # Examples
2077    ///
2078    /// ```no_run
2079    /// use ruma::api::{FeatureFlag, MatrixVersion};
2080    /// # use matrix_sdk::{Client, config::SyncSettings};
2081    /// # use url::Url;
2082    /// # async {
2083    /// # let homeserver = Url::parse("http://localhost:8080")?;
2084    /// # let mut client = Client::new(homeserver).await?;
2085    ///
2086    /// let supported = client.supported_versions().await?;
2087    /// let supports_1_1 = supported.versions.contains(&MatrixVersion::V1_1);
2088    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2089    ///
2090    /// let msc_x_feature = FeatureFlag::from("msc_x");
2091    /// let supports_msc_x = supported.features.contains(&msc_x_feature);
2092    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2093    /// # anyhow::Ok(()) };
2094    /// ```
2095    pub async fn supported_versions(&self) -> HttpResult<SupportedVersions> {
2096        self.get_or_load_and_cache_server_info(|server_info| server_info.supported_versions.clone())
2097            .await
2098    }
2099
2100    /// Get the Matrix versions supported by the homeserver by fetching them
2101    /// from the server or the cache.
2102    ///
2103    /// # Examples
2104    ///
2105    /// ```no_run
2106    /// use ruma::api::MatrixVersion;
2107    /// # use matrix_sdk::{Client, config::SyncSettings};
2108    /// # use url::Url;
2109    /// # async {
2110    /// # let homeserver = Url::parse("http://localhost:8080")?;
2111    /// # let mut client = Client::new(homeserver).await?;
2112    ///
2113    /// let server_versions = client.server_versions().await?;
2114    /// let supports_1_1 = server_versions.contains(&MatrixVersion::V1_1);
2115    /// println!("The homeserver supports Matrix 1.1: {supports_1_1:?}");
2116    /// # anyhow::Ok(()) };
2117    /// ```
2118    pub async fn server_versions(&self) -> HttpResult<BTreeSet<MatrixVersion>> {
2119        self.get_or_load_and_cache_server_info(|server_info| {
2120            server_info.supported_versions.as_ref().map(|supported| supported.versions.clone())
2121        })
2122        .await
2123    }
2124
2125    /// Get the unstable features supported by the homeserver by fetching them
2126    /// from the server or the cache.
2127    ///
2128    /// # Examples
2129    ///
2130    /// ```no_run
2131    /// use matrix_sdk::ruma::api::FeatureFlag;
2132    /// # use matrix_sdk::{Client, config::SyncSettings};
2133    /// # use url::Url;
2134    /// # async {
2135    /// # let homeserver = Url::parse("http://localhost:8080")?;
2136    /// # let mut client = Client::new(homeserver).await?;
2137    ///
2138    /// let msc_x_feature = FeatureFlag::from("msc_x");
2139    /// let unstable_features = client.unstable_features().await?;
2140    /// let supports_msc_x = unstable_features.contains(&msc_x_feature);
2141    /// println!("The homeserver supports msc X: {supports_msc_x:?}");
2142    /// # anyhow::Ok(()) };
2143    /// ```
2144    pub async fn unstable_features(&self) -> HttpResult<BTreeSet<FeatureFlag>> {
2145        self.get_or_load_and_cache_server_info(|server_info| {
2146            server_info.supported_versions.as_ref().map(|supported| supported.features.clone())
2147        })
2148        .await
2149    }
2150
2151    /// Get information about the homeserver's advertised RTC foci by fetching
2152    /// the well-known file from the server or the cache.
2153    ///
2154    /// # Examples
2155    /// ```no_run
2156    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::api::client::discovery::discover_homeserver::RtcFocusInfo};
2157    /// # use url::Url;
2158    /// # async {
2159    /// # let homeserver = Url::parse("http://localhost:8080")?;
2160    /// # let mut client = Client::new(homeserver).await?;
2161    /// let rtc_foci = client.rtc_foci().await?;
2162    /// let default_livekit_focus_info = rtc_foci.iter().find_map(|focus| match focus {
2163    ///     RtcFocusInfo::LiveKit(info) => Some(info),
2164    ///     _ => None,
2165    /// });
2166    /// if let Some(info) = default_livekit_focus_info {
2167    ///     println!("Default LiveKit service URL: {}", info.service_url);
2168    /// }
2169    /// # anyhow::Ok(()) };
2170    /// ```
2171    pub async fn rtc_foci(&self) -> HttpResult<Vec<RtcFocusInfo>> {
2172        let well_known = self
2173            .get_or_load_and_cache_server_info(|server_info| server_info.well_known.clone())
2174            .await?;
2175
2176        Ok(well_known.map(|well_known| well_known.rtc_foci).unwrap_or_default())
2177    }
2178
2179    /// Empty the server version and unstable features cache.
2180    ///
2181    /// Since the SDK caches server info (versions, unstable features,
2182    /// well-known etc), it's possible to have a stale entry in the cache. This
2183    /// functions makes it possible to force reset it.
2184    pub async fn reset_server_info(&self) -> Result<()> {
2185        // Empty the in-memory caches.
2186        let mut guard = self.inner.caches.server_info.write().await;
2187        guard.supported_versions = CachedValue::NotSet;
2188
2189        // Empty the store cache.
2190        Ok(self.state_store().remove_kv_data(StateStoreDataKey::ServerInfo).await?)
2191    }
2192
2193    /// Check whether MSC 4028 is enabled on the homeserver.
2194    ///
2195    /// # Examples
2196    ///
2197    /// ```no_run
2198    /// # use matrix_sdk::{Client, config::SyncSettings};
2199    /// # use url::Url;
2200    /// # async {
2201    /// # let homeserver = Url::parse("http://localhost:8080")?;
2202    /// # let mut client = Client::new(homeserver).await?;
2203    /// let msc4028_enabled =
2204    ///     client.can_homeserver_push_encrypted_event_to_device().await?;
2205    /// # anyhow::Ok(()) };
2206    /// ```
2207    pub async fn can_homeserver_push_encrypted_event_to_device(&self) -> HttpResult<bool> {
2208        Ok(self.unstable_features().await?.contains(&FeatureFlag::from("org.matrix.msc4028")))
2209    }
2210
2211    /// Get information of all our own devices.
2212    ///
2213    /// # Examples
2214    ///
2215    /// ```no_run
2216    /// # use matrix_sdk::{Client, config::SyncSettings};
2217    /// # use url::Url;
2218    /// # async {
2219    /// # let homeserver = Url::parse("http://localhost:8080")?;
2220    /// # let mut client = Client::new(homeserver).await?;
2221    /// let response = client.devices().await?;
2222    ///
2223    /// for device in response.devices {
2224    ///     println!(
2225    ///         "Device: {} {}",
2226    ///         device.device_id,
2227    ///         device.display_name.as_deref().unwrap_or("")
2228    ///     );
2229    /// }
2230    /// # anyhow::Ok(()) };
2231    /// ```
2232    pub async fn devices(&self) -> HttpResult<get_devices::v3::Response> {
2233        let request = get_devices::v3::Request::new();
2234
2235        self.send(request).await
2236    }
2237
2238    /// Delete the given devices from the server.
2239    ///
2240    /// # Arguments
2241    ///
2242    /// * `devices` - The list of devices that should be deleted from the
2243    ///   server.
2244    ///
2245    /// * `auth_data` - This request requires user interactive auth, the first
2246    ///   request needs to set this to `None` and will always fail with an
2247    ///   `UiaaResponse`. The response will contain information for the
2248    ///   interactive auth and the same request needs to be made but this time
2249    ///   with some `auth_data` provided.
2250    ///
2251    /// ```no_run
2252    /// # use matrix_sdk::{
2253    /// #    ruma::{api::client::uiaa, device_id},
2254    /// #    Client, Error, config::SyncSettings,
2255    /// # };
2256    /// # use serde_json::json;
2257    /// # use url::Url;
2258    /// # use std::collections::BTreeMap;
2259    /// # async {
2260    /// # let homeserver = Url::parse("http://localhost:8080")?;
2261    /// # let mut client = Client::new(homeserver).await?;
2262    /// let devices = &[device_id!("DEVICEID").to_owned()];
2263    ///
2264    /// if let Err(e) = client.delete_devices(devices, None).await {
2265    ///     if let Some(info) = e.as_uiaa_response() {
2266    ///         let mut password = uiaa::Password::new(
2267    ///             uiaa::UserIdentifier::UserIdOrLocalpart("example".to_owned()),
2268    ///             "wordpass".to_owned(),
2269    ///         );
2270    ///         password.session = info.session.clone();
2271    ///
2272    ///         client
2273    ///             .delete_devices(devices, Some(uiaa::AuthData::Password(password)))
2274    ///             .await?;
2275    ///     }
2276    /// }
2277    /// # anyhow::Ok(()) };
2278    pub async fn delete_devices(
2279        &self,
2280        devices: &[OwnedDeviceId],
2281        auth_data: Option<uiaa::AuthData>,
2282    ) -> HttpResult<delete_devices::v3::Response> {
2283        let mut request = delete_devices::v3::Request::new(devices.to_owned());
2284        request.auth = auth_data;
2285
2286        self.send(request).await
2287    }
2288
2289    /// Change the display name of a device owned by the current user.
2290    ///
2291    /// Returns a `update_device::Response` which specifies the result
2292    /// of the operation.
2293    ///
2294    /// # Arguments
2295    ///
2296    /// * `device_id` - The ID of the device to change the display name of.
2297    /// * `display_name` - The new display name to set.
2298    pub async fn rename_device(
2299        &self,
2300        device_id: &DeviceId,
2301        display_name: &str,
2302    ) -> HttpResult<update_device::v3::Response> {
2303        let mut request = update_device::v3::Request::new(device_id.to_owned());
2304        request.display_name = Some(display_name.to_owned());
2305
2306        self.send(request).await
2307    }
2308
2309    /// Synchronize the client's state with the latest state on the server.
2310    ///
2311    /// ## Syncing Events
2312    ///
2313    /// Messages or any other type of event need to be periodically fetched from
2314    /// the server, this is achieved by sending a `/sync` request to the server.
2315    ///
2316    /// The first sync is sent out without a [`token`]. The response of the
2317    /// first sync will contain a [`next_batch`] field which should then be
2318    /// used in the subsequent sync calls as the [`token`]. This ensures that we
2319    /// don't receive the same events multiple times.
2320    ///
2321    /// ## Long Polling
2322    ///
    /// A sync should in the usual case always be in flight. The
    /// [`SyncSettings`] have a [`timeout`] option, which controls how
    /// long the server will wait for new events before it will respond.
2326    /// The server will respond immediately if some new events arrive before the
2327    /// timeout has expired. If no changes arrive and the timeout expires an
2328    /// empty sync response will be sent to the client.
2329    ///
2330    /// This method of sending a request that may not receive a response
2331    /// immediately is called long polling.
2332    ///
2333    /// ## Filtering Events
2334    ///
2335    /// The number or type of messages and events that the client should receive
2336    /// from the server can be altered using a [`Filter`].
2337    ///
2338    /// Filters can be non-trivial and, since they will be sent with every sync
2339    /// request, they may take up a bunch of unnecessary bandwidth.
2340    ///
    /// Luckily, filters can be uploaded to the server and reused via a unique
    /// identifier; this can be achieved using the [`get_or_upload_filter()`]
    /// method.
2344    ///
2345    /// # Arguments
2346    ///
    /// * `sync_settings` - Settings for the sync call, this allows us to set
    ///   various options to configure the sync:
2349    ///     * [`filter`] - To configure which events we receive and which get
2350    ///       [filtered] by the server
2351    ///     * [`timeout`] - To configure our [long polling] setup.
2352    ///     * [`token`] - To tell the server which events we already received
2353    ///       and where we wish to continue syncing.
2354    ///     * [`full_state`] - To tell the server that we wish to receive all
2355    ///       state events, regardless of our configured [`token`].
2356    ///     * [`set_presence`] - To tell the server to set the presence and to
2357    ///       which state.
2358    ///
2359    /// # Examples
2360    ///
2361    /// ```no_run
2362    /// # use url::Url;
2363    /// # async {
2364    /// # let homeserver = Url::parse("http://localhost:8080")?;
2365    /// # let username = "";
2366    /// # let password = "";
2367    /// use matrix_sdk::{
2368    ///     Client, config::SyncSettings,
2369    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2370    /// };
2371    ///
2372    /// let client = Client::new(homeserver).await?;
2373    /// client.matrix_auth().login_username(username, password).send().await?;
2374    ///
2375    /// // Sync once so we receive the client state and old messages.
2376    /// client.sync_once(SyncSettings::default()).await?;
2377    ///
2378    /// // Register our handler so we start responding once we receive a new
2379    /// // event.
2380    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2381    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2382    /// });
2383    ///
2384    /// // Now keep on syncing forever. `sync()` will use the stored sync token
2385    /// // from our `sync_once()` call automatically.
2386    /// client.sync(SyncSettings::default()).await;
2387    /// # anyhow::Ok(()) };
2388    /// ```
2389    ///
2390    /// [`sync`]: #method.sync
2391    /// [`SyncSettings`]: crate::config::SyncSettings
2392    /// [`token`]: crate::config::SyncSettings#method.token
2393    /// [`timeout`]: crate::config::SyncSettings#method.timeout
2394    /// [`full_state`]: crate::config::SyncSettings#method.full_state
2395    /// [`set_presence`]: ruma::presence::PresenceState
2396    /// [`filter`]: crate::config::SyncSettings#method.filter
2397    /// [`Filter`]: ruma::api::client::sync::sync_events::v3::Filter
2398    /// [`next_batch`]: SyncResponse#structfield.next_batch
2399    /// [`get_or_upload_filter()`]: #method.get_or_upload_filter
2400    /// [long polling]: #long-polling
2401    /// [filtered]: #filtering-events
2402    #[instrument(skip(self))]
2403    pub async fn sync_once(
2404        &self,
2405        sync_settings: crate::config::SyncSettings,
2406    ) -> Result<SyncResponse> {
2407        // The sync might not return for quite a while due to the timeout.
2408        // We'll see if there's anything crypto related to send out before we
2409        // sync, i.e. if we closed our client after a sync but before the
2410        // crypto requests were sent out.
2411        //
2412        // This will mostly be a no-op.
2413        #[cfg(feature = "e2e-encryption")]
2414        if let Err(e) = self.send_outgoing_requests().await {
2415            error!(error = ?e, "Error while sending outgoing E2EE requests");
2416        }
2417
2418        let token = match sync_settings.token {
2419            SyncToken::Specific(token) => Some(token),
2420            SyncToken::NoToken => None,
2421            SyncToken::ReusePrevious => self.sync_token().await,
2422        };
2423
2424        let request = assign!(sync_events::v3::Request::new(), {
2425            filter: sync_settings.filter.map(|f| *f),
2426            since: token,
2427            full_state: sync_settings.full_state,
2428            set_presence: sync_settings.set_presence,
2429            timeout: sync_settings.timeout,
2430            use_state_after: true,
2431        });
2432        let mut request_config = self.request_config();
2433        if let Some(timeout) = sync_settings.timeout {
2434            let base_timeout = request_config.timeout.unwrap_or(Duration::from_secs(30));
2435            request_config.timeout = Some(base_timeout + timeout);
2436        }
2437
2438        let response = self.send(request).with_request_config(request_config).await?;
2439        let next_batch = response.next_batch.clone();
2440        let response = self.process_sync(response).await?;
2441
2442        #[cfg(feature = "e2e-encryption")]
2443        if let Err(e) = self.send_outgoing_requests().await {
2444            error!(error = ?e, "Error while sending outgoing E2EE requests");
2445        }
2446
2447        self.inner.sync_beat.notify(usize::MAX);
2448
2449        Ok(SyncResponse::new(next_batch, response))
2450    }
2451
2452    /// Repeatedly synchronize the client state with the server.
2453    ///
    /// This method will only return on error. If cancellation is needed,
    /// the method should be wrapped in a cancelable task, or the
    /// [`Client::sync_with_callback`] method can be used, or
    /// [`Client::sync_with_result_callback`] if you want to handle error
    /// cases in the loop, too.
2459    ///
2460    /// This method will internally call [`Client::sync_once`] in a loop.
2461    ///
2462    /// This method can be used with the [`Client::add_event_handler`]
2463    /// method to react to individual events. If you instead wish to handle
2464    /// events in a bulk manner the [`Client::sync_with_callback`],
2465    /// [`Client::sync_with_result_callback`] and
2466    /// [`Client::sync_stream`] methods can be used instead. Those methods
2467    /// repeatedly return the whole sync response.
2468    ///
2469    /// # Arguments
2470    ///
2471    /// * `sync_settings` - Settings for the sync call. *Note* that those
2472    ///   settings will be only used for the first sync call. See the argument
2473    ///   docs for [`Client::sync_once`] for more info.
2474    ///
2475    /// # Return
2476    /// The sync runs until an error occurs, returning with `Err(Error)`. It is
2477    /// up to the user of the API to check the error and decide whether the sync
2478    /// should continue or not.
2479    ///
2480    /// # Examples
2481    ///
2482    /// ```no_run
2483    /// # use url::Url;
2484    /// # async {
2485    /// # let homeserver = Url::parse("http://localhost:8080")?;
2486    /// # let username = "";
2487    /// # let password = "";
2488    /// use matrix_sdk::{
2489    ///     Client, config::SyncSettings,
2490    ///     ruma::events::room::message::OriginalSyncRoomMessageEvent,
2491    /// };
2492    ///
2493    /// let client = Client::new(homeserver).await?;
2494    /// client.matrix_auth().login_username(&username, &password).send().await?;
2495    ///
2496    /// // Register our handler so we start responding once we receive a new
2497    /// // event.
2498    /// client.add_event_handler(|ev: OriginalSyncRoomMessageEvent| async move {
2499    ///     println!("Received event {}: {:?}", ev.sender, ev.content);
2500    /// });
2501    ///
2502    /// // Now keep on syncing forever. `sync()` will use the latest sync token
2503    /// // automatically.
2504    /// client.sync(SyncSettings::default()).await?;
2505    /// # anyhow::Ok(()) };
2506    /// ```
2507    ///
2508    /// [argument docs]: #method.sync_once
2509    /// [`sync_with_callback`]: #method.sync_with_callback
2510    pub async fn sync(&self, sync_settings: crate::config::SyncSettings) -> Result<(), Error> {
2511        self.sync_with_callback(sync_settings, |_| async { LoopCtrl::Continue }).await
2512    }
2513
2514    /// Repeatedly call sync to synchronize the client state with the server.
2515    ///
2516    /// # Arguments
2517    ///
2518    /// * `sync_settings` - Settings for the sync call. *Note* that those
2519    ///   settings will be only used for the first sync call. See the argument
2520    ///   docs for [`Client::sync_once`] for more info.
2521    ///
    /// * `callback` - A callback that will be called every time a successful
    ///   response has been fetched from the server. The callback must return a
    ///   [`LoopCtrl`] which signals whether the method should keep syncing. If
    ///   the callback returns `LoopCtrl::Continue` the sync will continue, if
    ///   the callback returns `LoopCtrl::Break` the sync will be stopped.
2527    ///
2528    /// # Return
2529    /// The sync runs until an error occurs or the
2530    /// callback indicates that the Loop should stop. If the callback asked for
2531    /// a regular stop, the result will be `Ok(())` otherwise the
2532    /// `Err(Error)` is returned.
2533    ///
2534    /// # Examples
2535    ///
2536    /// The following example demonstrates how to sync forever while sending all
2537    /// the interesting events through a mpsc channel to another thread e.g. a
2538    /// UI thread.
2539    ///
2540    /// ```no_run
2541    /// # use std::time::Duration;
2542    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2543    /// # use url::Url;
2544    /// # async {
2545    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2546    /// # let mut client = Client::new(homeserver).await.unwrap();
2547    ///
2548    /// use tokio::sync::mpsc::channel;
2549    ///
2550    /// let (tx, rx) = channel(100);
2551    ///
2552    /// let sync_channel = &tx;
2553    /// let sync_settings = SyncSettings::new()
2554    ///     .timeout(Duration::from_secs(30));
2555    ///
2556    /// client
2557    ///     .sync_with_callback(sync_settings, |response| async move {
2558    ///         let channel = sync_channel;
2559    ///         for (room_id, room) in response.rooms.joined {
2560    ///             for event in room.timeline.events {
2561    ///                 channel.send(event).await.unwrap();
2562    ///             }
2563    ///         }
2564    ///
2565    ///         LoopCtrl::Continue
2566    ///     })
2567    ///     .await;
2568    /// };
2569    /// ```
2570    #[instrument(skip_all)]
2571    pub async fn sync_with_callback<C>(
2572        &self,
2573        sync_settings: crate::config::SyncSettings,
2574        callback: impl Fn(SyncResponse) -> C,
2575    ) -> Result<(), Error>
2576    where
2577        C: Future<Output = LoopCtrl>,
2578    {
2579        self.sync_with_result_callback(sync_settings, |result| async {
2580            Ok(callback(result?).await)
2581        })
2582        .await
2583    }
2584
2585    /// Repeatedly call sync to synchronize the client state with the server.
2586    ///
2587    /// # Arguments
2588    ///
2589    /// * `sync_settings` - Settings for the sync call. *Note* that those
2590    ///   settings will be only used for the first sync call. See the argument
2591    ///   docs for [`Client::sync_once`] for more info.
2592    ///
2593    /// * `callback` - A callback that will be called every time after a
2594    ///   response has been received, failure or not. The callback returns a
2595    ///   `Result<LoopCtrl, Error>`, too. When returning
2596    ///   `Ok(LoopCtrl::Continue)` the sync will continue, if the callback
2597    ///   returns `Ok(LoopCtrl::Break)` the sync will be stopped and the
2598    ///   function returns `Ok(())`. In case the callback can't handle the
2599    ///   `Error` or has a different malfunction, it can return an `Err(Error)`,
2600    ///   which results in the sync ending and the `Err(Error)` being returned.
2601    ///
2602    /// # Return
2603    /// The sync runs until an error occurs that the callback can't handle or
2604    /// the callback indicates that the Loop should stop. If the callback
2605    /// asked for a regular stop, the result will be `Ok(())` otherwise the
2606    /// `Err(Error)` is returned.
2607    ///
    /// _Note_: Lower-level configuration (e.g. for retries) is not changed by
    /// this, and is handled first without sending the result to the
    /// callback. Only after those have been exhausted is the `Result` handed
    /// to the callback.
2612    ///
2613    /// # Examples
2614    ///
2615    /// The following example demonstrates how to sync forever while sending all
2616    /// the interesting events through a mpsc channel to another thread e.g. a
2617    /// UI thread.
2618    ///
2619    /// ```no_run
2620    /// # use std::time::Duration;
2621    /// # use matrix_sdk::{Client, config::SyncSettings, LoopCtrl};
2622    /// # use url::Url;
2623    /// # async {
2624    /// # let homeserver = Url::parse("http://localhost:8080").unwrap();
2625    /// # let mut client = Client::new(homeserver).await.unwrap();
2626    /// #
2627    /// use tokio::sync::mpsc::channel;
2628    ///
2629    /// let (tx, rx) = channel(100);
2630    ///
2631    /// let sync_channel = &tx;
2632    /// let sync_settings = SyncSettings::new()
2633    ///     .timeout(Duration::from_secs(30));
2634    ///
2635    /// client
2636    ///     .sync_with_result_callback(sync_settings, |response| async move {
2637    ///         let channel = sync_channel;
2638    ///         let sync_response = response?;
2639    ///         for (room_id, room) in sync_response.rooms.joined {
2640    ///              for event in room.timeline.events {
2641    ///                  channel.send(event).await.unwrap();
2642    ///               }
2643    ///         }
2644    ///
2645    ///         Ok(LoopCtrl::Continue)
2646    ///     })
2647    ///     .await;
2648    /// };
2649    /// ```
2650    #[instrument(skip(self, callback))]
2651    pub async fn sync_with_result_callback<C>(
2652        &self,
2653        sync_settings: crate::config::SyncSettings,
2654        callback: impl Fn(Result<SyncResponse, Error>) -> C,
2655    ) -> Result<(), Error>
2656    where
2657        C: Future<Output = Result<LoopCtrl, Error>>,
2658    {
2659        let mut sync_stream = Box::pin(self.sync_stream(sync_settings).await);
2660
2661        while let Some(result) = sync_stream.next().await {
2662            trace!("Running callback");
2663            if callback(result).await? == LoopCtrl::Break {
2664                trace!("Callback told us to stop");
2665                break;
2666            }
2667            trace!("Done running callback");
2668        }
2669
2670        Ok(())
2671    }
2672
    /// Repeatedly synchronize the client state with the server.
2674    ///
2675    /// This method will internally call [`Client::sync_once`] in a loop and is
2676    /// equivalent to the [`Client::sync`] method but the responses are provided
2677    /// as an async stream.
2678    ///
2679    /// # Arguments
2680    ///
2681    /// * `sync_settings` - Settings for the sync call. *Note* that those
2682    ///   settings will be only used for the first sync call. See the argument
2683    ///   docs for [`Client::sync_once`] for more info.
2684    ///
2685    /// # Examples
2686    ///
2687    /// ```no_run
2688    /// # use url::Url;
2689    /// # async {
2690    /// # let homeserver = Url::parse("http://localhost:8080")?;
2691    /// # let username = "";
2692    /// # let password = "";
2693    /// use futures_util::StreamExt;
2694    /// use matrix_sdk::{Client, config::SyncSettings};
2695    ///
2696    /// let client = Client::new(homeserver).await?;
2697    /// client.matrix_auth().login_username(&username, &password).send().await?;
2698    ///
2699    /// let mut sync_stream =
2700    ///     Box::pin(client.sync_stream(SyncSettings::default()).await);
2701    ///
2702    /// while let Some(Ok(response)) = sync_stream.next().await {
2703    ///     for room in response.rooms.joined.values() {
2704    ///         for e in &room.timeline.events {
2705    ///             if let Ok(event) = e.raw().deserialize() {
2706    ///                 println!("Received event {:?}", event);
2707    ///             }
2708    ///         }
2709    ///     }
2710    /// }
2711    ///
2712    /// # anyhow::Ok(()) };
2713    /// ```
2714    #[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
2715    #[instrument(skip(self))]
2716    pub async fn sync_stream(
2717        &self,
2718        mut sync_settings: crate::config::SyncSettings,
2719    ) -> impl Stream<Item = Result<SyncResponse>> + '_ {
2720        let mut is_first_sync = true;
2721        let mut timeout = None;
2722        let mut last_sync_time: Option<Instant> = None;
2723
2724        let parent_span = Span::current();
2725
2726        async_stream::stream!({
2727            loop {
2728                trace!("Syncing");
2729
2730                if sync_settings.ignore_timeout_on_first_sync {
2731                    if is_first_sync {
2732                        timeout = sync_settings.timeout.take();
2733                    } else if sync_settings.timeout.is_none() && timeout.is_some() {
2734                        sync_settings.timeout = timeout.take();
2735                    }
2736
2737                    is_first_sync = false;
2738                }
2739
2740                yield self
2741                    .sync_loop_helper(&mut sync_settings)
2742                    .instrument(parent_span.clone())
2743                    .await;
2744
2745                Client::delay_sync(&mut last_sync_time).await
2746            }
2747        })
2748    }
2749
2750    /// Get the current, if any, sync token of the client.
2751    /// This will be None if the client didn't sync at least once.
2752    pub(crate) async fn sync_token(&self) -> Option<String> {
2753        self.inner.base_client.sync_token().await
2754    }
2755
2756    /// Gets information about the owner of a given access token.
2757    pub async fn whoami(&self) -> HttpResult<whoami::v3::Response> {
2758        let request = whoami::v3::Request::new();
2759        self.send(request).await
2760    }
2761
2762    /// Subscribes a new receiver to client SessionChange broadcasts.
2763    pub fn subscribe_to_session_changes(&self) -> broadcast::Receiver<SessionChange> {
2764        let broadcast = &self.auth_ctx().session_change_sender;
2765        broadcast.subscribe()
2766    }
2767
2768    /// Sets the save/restore session callbacks.
2769    ///
2770    /// This is another mechanism to get synchronous updates to session tokens,
2771    /// while [`Self::subscribe_to_session_changes`] provides an async update.
2772    pub fn set_session_callbacks(
2773        &self,
2774        reload_session_callback: Box<ReloadSessionCallback>,
2775        save_session_callback: Box<SaveSessionCallback>,
2776    ) -> Result<()> {
2777        self.inner
2778            .auth_ctx
2779            .reload_session_callback
2780            .set(reload_session_callback)
2781            .map_err(|_| Error::MultipleSessionCallbacks)?;
2782
2783        self.inner
2784            .auth_ctx
2785            .save_session_callback
2786            .set(save_session_callback)
2787            .map_err(|_| Error::MultipleSessionCallbacks)?;
2788
2789        Ok(())
2790    }
2791
2792    /// Get the notification settings of the current owner of the client.
2793    pub async fn notification_settings(&self) -> NotificationSettings {
2794        let ruleset = self.account().push_rules().await.unwrap_or_else(|_| Ruleset::new());
2795        NotificationSettings::new(self.clone(), ruleset)
2796    }
2797
2798    /// Create a new specialized `Client` that can process notifications.
2799    ///
2800    /// See [`CrossProcessLock::new`] to learn more about
2801    /// `cross_process_store_locks_holder_name`.
2802    ///
2803    /// [`CrossProcessLock::new`]: matrix_sdk_common::cross_process_lock::CrossProcessLock::new
2804    pub async fn notification_client(
2805        &self,
2806        cross_process_store_locks_holder_name: String,
2807    ) -> Result<Client> {
2808        let client = Client {
2809            inner: ClientInner::new(
2810                self.inner.auth_ctx.clone(),
2811                self.server().cloned(),
2812                self.homeserver(),
2813                self.sliding_sync_version(),
2814                self.inner.http_client.clone(),
2815                self.inner
2816                    .base_client
2817                    .clone_with_in_memory_state_store(&cross_process_store_locks_holder_name, false)
2818                    .await?,
2819                self.inner.caches.server_info.read().await.clone(),
2820                self.inner.respect_login_well_known,
2821                self.inner.event_cache.clone(),
2822                self.inner.send_queue_data.clone(),
2823                self.inner.latest_events.clone(),
2824                #[cfg(feature = "e2e-encryption")]
2825                self.inner.e2ee.encryption_settings,
2826                #[cfg(feature = "e2e-encryption")]
2827                self.inner.enable_share_history_on_invite,
2828                cross_process_store_locks_holder_name,
2829                #[cfg(feature = "experimental-search")]
2830                self.inner.search_index.clone(),
2831                self.inner.thread_subscription_catchup.clone(),
2832            )
2833            .await,
2834        };
2835
2836        Ok(client)
2837    }
2838
2839    /// The [`EventCache`] instance for this [`Client`].
2840    pub fn event_cache(&self) -> &EventCache {
2841        // SAFETY: always initialized in the `Client` ctor.
2842        self.inner.event_cache.get().unwrap()
2843    }
2844
2845    /// The [`LatestEvents`] instance for this [`Client`].
2846    pub async fn latest_events(&self) -> &LatestEvents {
2847        self.inner
2848            .latest_events
2849            .get_or_init(|| async {
2850                LatestEvents::new(
2851                    WeakClient::from_client(self),
2852                    self.event_cache().clone(),
2853                    SendQueue::new(self.clone()),
2854                )
2855            })
2856            .await
2857    }
2858
2859    /// Waits until an at least partially synced room is received, and returns
2860    /// it.
2861    ///
2862    /// **Note: this function will loop endlessly until either it finds the room
2863    /// or an externally set timeout happens.**
2864    pub async fn await_room_remote_echo(&self, room_id: &RoomId) -> Room {
2865        loop {
2866            if let Some(room) = self.get_room(room_id) {
2867                if room.is_state_partially_or_fully_synced() {
2868                    debug!("Found just created room!");
2869                    return room;
2870                }
2871                debug!("Room wasn't partially synced, waiting for sync beat to try again");
2872            } else {
2873                debug!("Room wasn't found, waiting for sync beat to try again");
2874            }
2875            self.inner.sync_beat.listen().await;
2876        }
2877    }
2878
2879    /// Knock on a room given its `room_id_or_alias` to ask for permission to
2880    /// join it.
2881    pub async fn knock(
2882        &self,
2883        room_id_or_alias: OwnedRoomOrAliasId,
2884        reason: Option<String>,
2885        server_names: Vec<OwnedServerName>,
2886    ) -> Result<Room> {
2887        let request =
2888            assign!(knock_room::v3::Request::new(room_id_or_alias), { reason, via: server_names });
2889        let response = self.send(request).await?;
2890        let base_room = self.inner.base_client.room_knocked(&response.room_id).await?;
2891        Ok(Room::new(self.clone(), base_room))
2892    }
2893
2894    /// Checks whether the provided `user_id` belongs to an ignored user.
2895    pub async fn is_user_ignored(&self, user_id: &UserId) -> bool {
2896        self.base_client().is_user_ignored(user_id).await
2897    }
2898
2899    /// Gets the `max_upload_size` value from the homeserver, getting either a
2900    /// cached value or with a `/_matrix/client/v1/media/config` request if it's
2901    /// missing.
2902    ///
2903    /// Check the spec for more info:
2904    /// <https://spec.matrix.org/v1.14/client-server-api/#get_matrixclientv1mediaconfig>
2905    pub async fn load_or_fetch_max_upload_size(&self) -> Result<UInt> {
2906        let max_upload_size_lock = self.inner.server_max_upload_size.lock().await;
2907        if let Some(data) = max_upload_size_lock.get() {
2908            return Ok(data.to_owned());
2909        }
2910
2911        // Use the authenticated endpoint when the server supports it.
2912        let supported_versions = self.supported_versions().await?;
2913        let use_auth =
2914            authenticated_media::get_media_config::v1::Request::is_supported(&supported_versions);
2915
2916        let upload_size = if use_auth {
2917            self.send(authenticated_media::get_media_config::v1::Request::default())
2918                .await?
2919                .upload_size
2920        } else {
2921            #[allow(deprecated)]
2922            self.send(media::get_media_config::v3::Request::default()).await?.upload_size
2923        };
2924
2925        match max_upload_size_lock.set(upload_size) {
2926            Ok(_) => Ok(upload_size),
2927            Err(error) => {
2928                Err(Error::Media(MediaError::FetchMaxUploadSizeFailed(error.to_string())))
2929            }
2930        }
2931    }
2932
2933    /// The settings to use for decrypting events.
2934    #[cfg(feature = "e2e-encryption")]
2935    pub fn decryption_settings(&self) -> &DecryptionSettings {
2936        &self.base_client().decryption_settings
2937    }
2938
2939    /// Returns the [`SearchIndex`] for this [`Client`].
2940    #[cfg(feature = "experimental-search")]
2941    pub fn search_index(&self) -> &SearchIndex {
2942        &self.inner.search_index
2943    }
2944
2945    /// Whether the client is configured to take thread subscriptions (MSC4306
2946    /// and MSC4308) into account.
2947    ///
2948    /// This may cause filtering out of thread subscriptions, and loading the
2949    /// thread subscriptions via the sliding sync extension, when the room
2950    /// list service is being used.
2951    pub fn enabled_thread_subscriptions(&self) -> bool {
2952        match self.base_client().threading_support {
2953            ThreadingSupport::Enabled { with_subscriptions } => with_subscriptions,
2954            ThreadingSupport::Disabled => false,
2955        }
2956    }
2957
2958    /// Fetch thread subscriptions changes between `from` and up to `to`.
2959    ///
2960    /// The `limit` optional parameter can be used to limit the number of
2961    /// entries in a response. It can also be overridden by the server, if
2962    /// it's deemed too large.
2963    pub async fn fetch_thread_subscriptions(
2964        &self,
2965        from: Option<String>,
2966        to: Option<String>,
2967        limit: Option<UInt>,
2968    ) -> Result<get_thread_subscriptions_changes::unstable::Response> {
2969        let request = assign!(get_thread_subscriptions_changes::unstable::Request::new(), {
2970            from,
2971            to,
2972            limit,
2973        });
2974        Ok(self.send(request).await?)
2975    }
2976
2977    pub(crate) fn thread_subscription_catchup(&self) -> &ThreadSubscriptionCatchup {
2978        self.inner.thread_subscription_catchup.get().unwrap()
2979    }
2980}
2981
#[cfg(any(feature = "testing", test))]
impl Client {
    /// Test helper that registers the given users as tracked by the crypto
    /// layer.
    ///
    /// # Panics
    ///
    /// Panics if no Olm machine is available, or if updating the tracked
    /// users fails.
    #[cfg(feature = "e2e-encryption")]
    pub async fn update_tracked_users_for_testing(
        &self,
        user_ids: impl IntoIterator<Item = &UserId>,
    ) {
        let machine_guard = self.olm_machine().await;
        let machine = machine_guard.as_ref().unwrap();
        machine.update_tracked_users(user_ids).await.unwrap();
    }
}
2995
2996/// A weak reference to the inner client, useful when trying to get a handle
2997/// on the owning client.
2998#[derive(Clone, Debug)]
2999pub(crate) struct WeakClient {
3000    client: Weak<ClientInner>,
3001}
3002
3003impl WeakClient {
3004    /// Construct a [`WeakClient`] from a `Arc<ClientInner>`.
3005    pub fn from_inner(client: &Arc<ClientInner>) -> Self {
3006        Self { client: Arc::downgrade(client) }
3007    }
3008
3009    /// Construct a [`WeakClient`] from a [`Client`].
3010    pub fn from_client(client: &Client) -> Self {
3011        Self::from_inner(&client.inner)
3012    }
3013
3014    /// Attempts to get a [`Client`] from this [`WeakClient`].
3015    pub fn get(&self) -> Option<Client> {
3016        self.client.upgrade().map(|inner| Client { inner })
3017    }
3018
3019    /// Gets the number of strong (`Arc`) pointers still pointing to this
3020    /// client.
3021    #[allow(dead_code)]
3022    pub fn strong_count(&self) -> usize {
3023        self.client.strong_count()
3024    }
3025}
3026
3027#[derive(Clone)]
3028struct ClientServerInfo {
3029    /// The Matrix versions and unstable features the server supports (known
3030    /// ones only).
3031    supported_versions: CachedValue<SupportedVersions>,
3032
3033    /// The server's well-known file, if any.
3034    well_known: CachedValue<Option<WellKnownResponse>>,
3035}
3036
/// A value that is either cached or not cached yet.
///
/// This avoids confusing a value that is set to `None` (because it doesn't
/// exist) with a value that has not been cached yet.
#[derive(Clone)]
enum CachedValue<Value> {
    /// The value is present in the cache.
    Cached(Value),
    /// No value has been cached so far.
    NotSet,
}

impl<Value> CachedValue<Value> {
    /// Consumes `self` and returns the cached value.
    ///
    /// # Panics
    ///
    /// Panics if no value has been cached.
    fn unwrap_cached_value(self) -> Value {
        let Self::Cached(inner) = self else {
            panic!("Tried to unwrap a cached value that wasn't set");
        };

        inner
    }

    /// Borrows the content: `&CachedValue<Value>` becomes
    /// `CachedValue<&Value>`.
    fn as_ref(&self) -> CachedValue<&Value> {
        if let Self::Cached(inner) = self {
            CachedValue::Cached(inner)
        } else {
            CachedValue::NotSet
        }
    }

    /// Transforms a `CachedValue<Value>` into a `CachedValue<Other>`: `f` is
    /// applied to the value when there is one, and `NotSet` is left untouched.
    fn map<Other, F>(self, f: F) -> CachedValue<Other>
    where
        F: FnOnce(Value) -> Other,
    {
        match self {
            Self::NotSet => CachedValue::NotSet,
            Self::Cached(inner) => CachedValue::Cached(f(inner)),
        }
    }
}
3082
3083/// Information about the state of a room before we joined it.
3084#[derive(Debug, Clone, Default)]
3085struct PreJoinRoomInfo {
3086    /// The user who invited us to the room, if any.
3087    pub inviter: Option<RoomMember>,
3088}
3089
3090// The http mocking library is not supported for wasm32
3091#[cfg(all(test, not(target_family = "wasm")))]
3092pub(crate) mod tests {
3093    use std::{sync::Arc, time::Duration};
3094
3095    use assert_matches::assert_matches;
3096    use assert_matches2::assert_let;
3097    use eyeball::SharedObservable;
3098    use futures_util::{FutureExt, StreamExt, pin_mut};
3099    use js_int::{UInt, uint};
3100    use matrix_sdk_base::{
3101        RoomState,
3102        store::{MemoryStore, StoreConfig},
3103    };
3104    use matrix_sdk_test::{
3105        DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
3106        event_factory::EventFactory,
3107    };
3108    #[cfg(target_family = "wasm")]
3109    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
3110
3111    use ruma::{
3112        RoomId, ServerName, UserId,
3113        api::{
3114            FeatureFlag, MatrixVersion,
3115            client::{
3116                discovery::discover_homeserver::RtcFocusInfo,
3117                room::create_room::v3::Request as CreateRoomRequest,
3118            },
3119        },
3120        assign,
3121        events::{
3122            ignored_user_list::IgnoredUserListEventContent,
3123            media_preview_config::{InviteAvatars, MediaPreviewConfigEventContent, MediaPreviews},
3124        },
3125        owned_room_id, owned_user_id, room_alias_id, room_id,
3126    };
3127    use serde_json::json;
3128    use stream_assert::{assert_next_matches, assert_pending};
3129    use tokio::{
3130        spawn,
3131        time::{sleep, timeout},
3132    };
3133    use url::Url;
3134
3135    use super::Client;
3136    use crate::{
3137        Error, TransmissionProgress,
3138        client::{WeakClient, futures::SendMediaUploadRequest},
3139        config::{RequestConfig, SyncSettings},
3140        futures::SendRequest,
3141        media::MediaError,
3142        test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
3143    };
3144
3145    #[async_test]
3146    async fn test_account_data() {
3147        let server = MatrixMockServer::new().await;
3148        let client = server.client_builder().build().await;
3149
3150        let f = EventFactory::new();
3151        server
3152            .mock_sync()
3153            .ok_and_run(&client, |builder| {
3154                builder.add_global_account_data(
3155                    f.ignored_user_list([owned_user_id!("@someone:example.org")]),
3156                );
3157            })
3158            .await;
3159
3160        let content = client
3161            .account()
3162            .account_data::<IgnoredUserListEventContent>()
3163            .await
3164            .unwrap()
3165            .unwrap()
3166            .deserialize()
3167            .unwrap();
3168
3169        assert_eq!(content.ignored_users.len(), 1);
3170    }
3171
3172    #[async_test]
3173    async fn test_successful_discovery() {
3174        // Imagine this is `matrix.org`.
3175        let server = MatrixMockServer::new().await;
3176        let server_url = server.uri();
3177
3178        // Imagine this is `matrix-client.matrix.org`.
3179        let homeserver = MatrixMockServer::new().await;
3180        let homeserver_url = homeserver.uri();
3181
3182        // Imagine Alice has the user ID `@alice:matrix.org`.
3183        let domain = server_url.strip_prefix("http://").unwrap();
3184        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3185
3186        // The `.well-known` is on the server (e.g. `matrix.org`).
3187        server
3188            .mock_well_known()
3189            .ok_with_homeserver_url(&homeserver_url)
3190            .mock_once()
3191            .named("well-known")
3192            .mount()
3193            .await;
3194
3195        // The `/versions` is on the homeserver (e.g. `matrix-client.matrix.org`).
3196        homeserver.mock_versions().ok().mock_once().named("versions").mount().await;
3197
3198        let client = Client::builder()
3199            .insecure_server_name_no_tls(alice.server_name())
3200            .build()
3201            .await
3202            .unwrap();
3203
3204        assert_eq!(client.server().unwrap(), &Url::parse(&server_url).unwrap());
3205        assert_eq!(client.homeserver(), Url::parse(&homeserver_url).unwrap());
3206        client.server_versions().await.unwrap();
3207    }
3208
3209    #[async_test]
3210    async fn test_discovery_broken_server() {
3211        let server = MatrixMockServer::new().await;
3212        let server_url = server.uri();
3213        let domain = server_url.strip_prefix("http://").unwrap();
3214        let alice = UserId::parse("@alice:".to_owned() + domain).unwrap();
3215
3216        server.mock_well_known().error404().mock_once().named("well-known").mount().await;
3217
3218        assert!(
3219            Client::builder()
3220                .insecure_server_name_no_tls(alice.server_name())
3221                .build()
3222                .await
3223                .is_err(),
3224            "Creating a client from a user ID should fail when the .well-known request fails."
3225        );
3226    }
3227
3228    #[async_test]
3229    async fn test_room_creation() {
3230        let server = MatrixMockServer::new().await;
3231        let client = server.client_builder().build().await;
3232
3233        server
3234            .mock_sync()
3235            .ok_and_run(&client, |builder| {
3236                builder.add_joined_room(
3237                    JoinedRoomBuilder::default()
3238                        .add_state_event(StateTestEvent::Member)
3239                        .add_state_event(StateTestEvent::PowerLevels),
3240                );
3241            })
3242            .await;
3243
3244        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
3245        assert_eq!(room.state(), RoomState::Joined);
3246    }
3247
3248    #[async_test]
3249    async fn test_retry_limit_http_requests() {
3250        let server = MatrixMockServer::new().await;
3251        let client = server
3252            .client_builder()
3253            .on_builder(|builder| builder.request_config(RequestConfig::new().retry_limit(3)))
3254            .build()
3255            .await;
3256
3257        assert!(client.request_config().retry_limit.unwrap() == 3);
3258
3259        server.mock_login().error500().expect(3).mount().await;
3260
3261        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3262    }
3263
3264    #[async_test]
3265    async fn test_retry_timeout_http_requests() {
3266        // Keep this timeout small so that the test doesn't take long
3267        let retry_timeout = Duration::from_secs(5);
3268        let server = MatrixMockServer::new().await;
3269        let client = server
3270            .client_builder()
3271            .on_builder(|builder| {
3272                builder.request_config(RequestConfig::new().max_retry_time(retry_timeout))
3273            })
3274            .build()
3275            .await;
3276
3277        assert!(client.request_config().max_retry_time.unwrap() == retry_timeout);
3278
3279        server.mock_login().error500().expect(2..).mount().await;
3280
3281        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3282    }
3283
3284    #[async_test]
3285    async fn test_short_retry_initial_http_requests() {
3286        let server = MatrixMockServer::new().await;
3287        let client = server
3288            .client_builder()
3289            .on_builder(|builder| builder.request_config(RequestConfig::short_retry()))
3290            .build()
3291            .await;
3292
3293        server.mock_login().error500().expect(3..).mount().await;
3294
3295        client.matrix_auth().login_username("example", "wordpass").send().await.unwrap_err();
3296    }
3297
3298    #[async_test]
3299    async fn test_no_retry_http_requests() {
3300        let server = MatrixMockServer::new().await;
3301        let client = server.client_builder().build().await;
3302
3303        server.mock_devices().error500().mock_once().mount().await;
3304
3305        client.devices().await.unwrap_err();
3306    }
3307
3308    #[async_test]
3309    async fn test_set_homeserver() {
3310        let client = MockClientBuilder::new(None).build().await;
3311        assert_eq!(client.homeserver().as_ref(), "http://localhost/");
3312
3313        let homeserver = Url::parse("http://example.com/").unwrap();
3314        client.set_homeserver(homeserver.clone());
3315        assert_eq!(client.homeserver(), homeserver);
3316    }
3317
3318    #[async_test]
3319    async fn test_search_user_request() {
3320        let server = MatrixMockServer::new().await;
3321        let client = server.client_builder().build().await;
3322
3323        server.mock_user_directory().ok().mock_once().mount().await;
3324
3325        let response = client.search_users("test", 50).await.unwrap();
3326        assert_eq!(response.results.len(), 1);
3327        let result = &response.results[0];
3328        assert_eq!(result.user_id.to_string(), "@test:example.me");
3329        assert_eq!(result.display_name.clone().unwrap(), "Test");
3330        assert_eq!(result.avatar_url.clone().unwrap().to_string(), "mxc://example.me/someid");
3331        assert!(!response.limited);
3332    }
3333
3334    #[async_test]
3335    async fn test_request_unstable_features() {
3336        let server = MatrixMockServer::new().await;
3337        let client = server.client_builder().no_server_versions().build().await;
3338
3339        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3340
3341        let unstable_features = client.unstable_features().await.unwrap();
3342        assert!(unstable_features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3343        assert!(!unstable_features.contains(&FeatureFlag::from("you.shall.pass")));
3344    }
3345
3346    #[async_test]
3347    async fn test_can_homeserver_push_encrypted_event_to_device() {
3348        let server = MatrixMockServer::new().await;
3349        let client = server.client_builder().no_server_versions().build().await;
3350
3351        server.mock_versions().ok_with_unstable_features().mock_once().mount().await;
3352
3353        let msc4028_enabled = client.can_homeserver_push_encrypted_event_to_device().await.unwrap();
3354        assert!(msc4028_enabled);
3355    }
3356
3357    #[async_test]
3358    async fn test_recently_visited_rooms() {
3359        // Tracking recently visited rooms requires authentication
3360        let client = MockClientBuilder::new(None).unlogged().build().await;
3361        assert_matches!(
3362            client.account().track_recently_visited_room(owned_room_id!("!alpha:localhost")).await,
3363            Err(Error::AuthenticationRequired)
3364        );
3365
3366        let client = MockClientBuilder::new(None).build().await;
3367        let account = client.account();
3368
3369        // We should start off with an empty list
3370        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 0);
3371
3372        // Tracking a valid room id should add it to the list
3373        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3374        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3375        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3376
3377        // And the existing list shouldn't be changed
3378        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3379        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3380
3381        // Tracking the same room again shouldn't change the list
3382        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3383        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 1);
3384        assert_eq!(account.get_recently_visited_rooms().await.unwrap(), ["!alpha:localhost"]);
3385
3386        // Tracking a second room should add it to the front of the list
3387        account.track_recently_visited_room(owned_room_id!("!beta:localhost")).await.unwrap();
3388        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3389        assert_eq!(
3390            account.get_recently_visited_rooms().await.unwrap(),
3391            [room_id!("!beta:localhost"), room_id!("!alpha:localhost")]
3392        );
3393
3394        // Tracking the first room yet again should move it to the front of the list
3395        account.track_recently_visited_room(owned_room_id!("!alpha:localhost")).await.unwrap();
3396        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 2);
3397        assert_eq!(
3398            account.get_recently_visited_rooms().await.unwrap(),
3399            [room_id!("!alpha:localhost"), room_id!("!beta:localhost")]
3400        );
3401
3402        // Tracking should be capped at 20
3403        for n in 0..20 {
3404            account
3405                .track_recently_visited_room(RoomId::parse(format!("!{n}:localhost")).unwrap())
3406                .await
3407                .unwrap();
3408        }
3409
3410        assert_eq!(account.get_recently_visited_rooms().await.unwrap().len(), 20);
3411
3412        // And the initial rooms should've been pushed out
3413        let rooms = account.get_recently_visited_rooms().await.unwrap();
3414        assert!(!rooms.contains(&owned_room_id!("!alpha:localhost")));
3415        assert!(!rooms.contains(&owned_room_id!("!beta:localhost")));
3416
3417        // And the last tracked room should be the first
3418        assert_eq!(rooms.first().unwrap(), room_id!("!19:localhost"));
3419    }
3420
3421    #[async_test]
3422    async fn test_client_no_cycle_with_event_cache() {
3423        let client = MockClientBuilder::new(None).build().await;
3424
3425        // Wait for the init tasks to die.
3426        sleep(Duration::from_secs(1)).await;
3427
3428        let weak_client = WeakClient::from_client(&client);
3429        assert_eq!(weak_client.strong_count(), 1);
3430
3431        {
3432            let room_id = room_id!("!room:example.org");
3433
3434            // Have the client know the room.
3435            let response = SyncResponseBuilder::default()
3436                .add_joined_room(JoinedRoomBuilder::new(room_id))
3437                .build_sync_response();
3438            client.inner.base_client.receive_sync_response(response).await.unwrap();
3439
3440            client.event_cache().subscribe().unwrap();
3441
3442            let (_room_event_cache, _drop_handles) =
3443                client.get_room(room_id).unwrap().event_cache().await.unwrap();
3444        }
3445
3446        drop(client);
3447
3448        // Give a bit of time for background tasks to die.
3449        sleep(Duration::from_secs(1)).await;
3450
3451        // The weak client must be the last reference to the client now.
3452        assert_eq!(weak_client.strong_count(), 0);
3453        let client = weak_client.get();
3454        assert!(
3455            client.is_none(),
3456            "too many strong references to the client: {}",
3457            Arc::strong_count(&client.unwrap().inner)
3458        );
3459    }
3460
3461    #[async_test]
3462    async fn test_server_info_caching() {
3463        let server = MatrixMockServer::new().await;
3464        let server_url = server.uri();
3465        let domain = server_url.strip_prefix("http://").unwrap();
3466        let server_name = <&ServerName>::try_from(domain).unwrap();
3467        let rtc_foci = vec![RtcFocusInfo::livekit("https://livekit.example.com".to_owned())];
3468
3469        let well_known_mock = server
3470            .mock_well_known()
3471            .ok()
3472            .named("well known mock")
3473            .expect(2) // One for ClientBuilder discovery, one for the ServerInfo cache.
3474            .mount_as_scoped()
3475            .await;
3476
3477        let versions_mock = server
3478            .mock_versions()
3479            .ok_with_unstable_features()
3480            .named("first versions mock")
3481            .expect(1)
3482            .mount_as_scoped()
3483            .await;
3484
3485        let memory_store = Arc::new(MemoryStore::new());
3486        let client = Client::builder()
3487            .insecure_server_name_no_tls(server_name)
3488            .store_config(
3489                StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3490                    .state_store(memory_store.clone()),
3491            )
3492            .build()
3493            .await
3494            .unwrap();
3495
3496        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3497
3498        // These subsequent calls hit the in-memory cache.
3499        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3500        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3501
3502        drop(client);
3503
3504        let client = server
3505            .client_builder()
3506            .no_server_versions()
3507            .on_builder(|builder| {
3508                builder.store_config(
3509                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3510                        .state_store(memory_store.clone()),
3511                )
3512            })
3513            .build()
3514            .await;
3515
3516        // These calls to the new client hit the on-disk cache.
3517        assert!(
3518            client
3519                .unstable_features()
3520                .await
3521                .unwrap()
3522                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3523        );
3524        let supported = client.supported_versions().await.unwrap();
3525        assert!(supported.versions.contains(&MatrixVersion::V1_0));
3526        assert!(supported.features.contains(&FeatureFlag::from("org.matrix.e2e_cross_signing")));
3527
3528        // Then this call hits the in-memory cache.
3529        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3530
3531        drop(versions_mock);
3532        drop(well_known_mock);
3533
3534        // Now, reset the cache, and observe the endpoints being called again once.
3535        client.reset_server_info().await.unwrap();
3536
3537        server.mock_well_known().ok().named("second well known mock").expect(1).mount().await;
3538
3539        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3540
3541        // Hits network again.
3542        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3543        // Hits in-memory cache again.
3544        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3545        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3546    }
3547
3548    #[async_test]
3549    async fn test_server_info_without_a_well_known() {
3550        let server = MatrixMockServer::new().await;
3551        let rtc_foci: Vec<RtcFocusInfo> = vec![];
3552
3553        let versions_mock = server
3554            .mock_versions()
3555            .ok_with_unstable_features()
3556            .named("first versions mock")
3557            .expect(1)
3558            .mount_as_scoped()
3559            .await;
3560
3561        let memory_store = Arc::new(MemoryStore::new());
3562        let client = server
3563            .client_builder()
3564            .no_server_versions()
3565            .on_builder(|builder| {
3566                builder.store_config(
3567                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3568                        .state_store(memory_store.clone()),
3569                )
3570            })
3571            .build()
3572            .await;
3573
3574        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3575
3576        // These subsequent calls hit the in-memory cache.
3577        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3578        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3579
3580        drop(client);
3581
3582        let client = server
3583            .client_builder()
3584            .no_server_versions()
3585            .on_builder(|builder| {
3586                builder.store_config(
3587                    StoreConfig::new("cross-process-store-locks-holder-name".to_owned())
3588                        .state_store(memory_store.clone()),
3589                )
3590            })
3591            .build()
3592            .await;
3593
3594        // This call to the new client hits the on-disk cache.
3595        assert!(
3596            client
3597                .unstable_features()
3598                .await
3599                .unwrap()
3600                .contains(&FeatureFlag::from("org.matrix.e2e_cross_signing"))
3601        );
3602
3603        // Then this call hits the in-memory cache.
3604        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3605
3606        drop(versions_mock);
3607
3608        // Now, reset the cache, and observe the endpoints being called again once.
3609        client.reset_server_info().await.unwrap();
3610
3611        server.mock_versions().ok().expect(1).named("second versions mock").mount().await;
3612
3613        // Hits network again.
3614        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3615        // Hits in-memory cache again.
3616        assert!(client.server_versions().await.unwrap().contains(&MatrixVersion::V1_0));
3617        assert_eq!(client.rtc_foci().await.unwrap(), rtc_foci);
3618    }
3619
3620    #[async_test]
3621    async fn test_no_network_doesnt_cause_infinite_retries() {
3622        // We want infinite retries for transient errors.
3623        let client = MockClientBuilder::new(None)
3624            .on_builder(|builder| builder.request_config(RequestConfig::new()))
3625            .build()
3626            .await;
3627
3628        // We don't define a mock server on purpose here, so that the error is really a
3629        // network error.
3630        client.whoami().await.unwrap_err();
3631    }
3632
3633    #[async_test]
3634    async fn test_await_room_remote_echo_returns_the_room_if_it_was_already_synced() {
3635        let server = MatrixMockServer::new().await;
3636        let client = server.client_builder().build().await;
3637
3638        let room_id = room_id!("!room:example.org");
3639
3640        server
3641            .mock_sync()
3642            .ok_and_run(&client, |builder| {
3643                builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3644            })
3645            .await;
3646
3647        let room = client.await_room_remote_echo(room_id).now_or_never().unwrap();
3648        assert_eq!(room.room_id(), room_id);
3649    }
3650
3651    #[async_test]
3652    async fn test_await_room_remote_echo_returns_the_room_when_it_is_ready() {
3653        let server = MatrixMockServer::new().await;
3654        let client = server.client_builder().build().await;
3655
3656        let room_id = room_id!("!room:example.org");
3657
3658        let client = Arc::new(client);
3659
3660        // Perform the /sync request with a delay so it starts after the
3661        // `await_room_remote_echo` call has happened
3662        spawn({
3663            let client = client.clone();
3664            async move {
3665                sleep(Duration::from_millis(100)).await;
3666
3667                server
3668                    .mock_sync()
3669                    .ok_and_run(&client, |builder| {
3670                        builder.add_joined_room(JoinedRoomBuilder::new(room_id));
3671                    })
3672                    .await;
3673            }
3674        });
3675
3676        let room =
3677            timeout(Duration::from_secs(10), client.await_room_remote_echo(room_id)).await.unwrap();
3678        assert_eq!(room.room_id(), room_id);
3679    }
3680
3681    #[async_test]
3682    async fn test_await_room_remote_echo_will_timeout_if_no_room_is_found() {
3683        let client = MockClientBuilder::new(None).build().await;
3684
3685        let room_id = room_id!("!room:example.org");
3686        // Room is not present so the client won't be able to find it. The call will
3687        // timeout.
3688        timeout(Duration::from_secs(1), client.await_room_remote_echo(room_id)).await.unwrap_err();
3689    }
3690
3691    #[async_test]
3692    async fn test_await_room_remote_echo_will_timeout_if_room_is_found_but_not_synced() {
3693        let server = MatrixMockServer::new().await;
3694        let client = server.client_builder().build().await;
3695
3696        server.mock_create_room().ok().mount().await;
3697
3698        // Create a room in the internal store
3699        let room = client
3700            .create_room(assign!(CreateRoomRequest::new(), {
3701                invite: vec![],
3702                is_direct: false,
3703            }))
3704            .await
3705            .unwrap();
3706
3707        // Room is locally present, but not synced, the call will timeout
3708        timeout(Duration::from_secs(1), client.await_room_remote_echo(room.room_id()))
3709            .await
3710            .unwrap_err();
3711    }
3712
3713    #[async_test]
3714    async fn test_is_room_alias_available_if_alias_is_not_resolved() {
3715        let server = MatrixMockServer::new().await;
3716        let client = server.client_builder().build().await;
3717
3718        server.mock_room_directory_resolve_alias().not_found().expect(1).mount().await;
3719
3720        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3721        assert_matches!(ret, Ok(true));
3722    }
3723
3724    #[async_test]
3725    async fn test_is_room_alias_available_if_alias_is_resolved() {
3726        let server = MatrixMockServer::new().await;
3727        let client = server.client_builder().build().await;
3728
3729        server
3730            .mock_room_directory_resolve_alias()
3731            .ok("!some_room_id:matrix.org", Vec::new())
3732            .expect(1)
3733            .mount()
3734            .await;
3735
3736        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3737        assert_matches!(ret, Ok(false));
3738    }
3739
3740    #[async_test]
3741    async fn test_is_room_alias_available_if_error_found() {
3742        let server = MatrixMockServer::new().await;
3743        let client = server.client_builder().build().await;
3744
3745        server.mock_room_directory_resolve_alias().error500().expect(1).mount().await;
3746
3747        let ret = client.is_room_alias_available(room_alias_id!("#some_alias:matrix.org")).await;
3748        assert_matches!(ret, Err(_));
3749    }
3750
3751    #[async_test]
3752    async fn test_create_room_alias() {
3753        let server = MatrixMockServer::new().await;
3754        let client = server.client_builder().build().await;
3755
3756        server.mock_room_directory_create_room_alias().ok().expect(1).mount().await;
3757
3758        let ret = client
3759            .create_room_alias(
3760                room_alias_id!("#some_alias:matrix.org"),
3761                room_id!("!some_room:matrix.org"),
3762            )
3763            .await;
3764        assert_matches!(ret, Ok(()));
3765    }
3766
3767    #[async_test]
3768    async fn test_room_preview_for_invited_room_hits_summary_endpoint() {
3769        let server = MatrixMockServer::new().await;
3770        let client = server.client_builder().build().await;
3771
3772        let room_id = room_id!("!a-room:matrix.org");
3773
3774        // Make sure the summary endpoint is called once
3775        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3776
3777        // We create a locally cached invited room
3778        let invited_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Invited);
3779
3780        // And we get a preview, the server endpoint was reached
3781        let preview = client
3782            .get_room_preview(room_id.into(), Vec::new())
3783            .await
3784            .expect("Room preview should be retrieved");
3785
3786        assert_eq!(invited_room.room_id().to_owned(), preview.room_id);
3787    }
3788
3789    #[async_test]
3790    async fn test_room_preview_for_left_room_hits_summary_endpoint() {
3791        let server = MatrixMockServer::new().await;
3792        let client = server.client_builder().build().await;
3793
3794        let room_id = room_id!("!a-room:matrix.org");
3795
3796        // Make sure the summary endpoint is called once
3797        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3798
3799        // We create a locally cached left room
3800        let left_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Left);
3801
3802        // And we get a preview, the server endpoint was reached
3803        let preview = client
3804            .get_room_preview(room_id.into(), Vec::new())
3805            .await
3806            .expect("Room preview should be retrieved");
3807
3808        assert_eq!(left_room.room_id().to_owned(), preview.room_id);
3809    }
3810
3811    #[async_test]
3812    async fn test_room_preview_for_knocked_room_hits_summary_endpoint() {
3813        let server = MatrixMockServer::new().await;
3814        let client = server.client_builder().build().await;
3815
3816        let room_id = room_id!("!a-room:matrix.org");
3817
3818        // Make sure the summary endpoint is called once
3819        server.mock_room_summary().ok(room_id).mock_once().mount().await;
3820
3821        // We create a locally cached knocked room
3822        let knocked_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Knocked);
3823
3824        // And we get a preview, the server endpoint was reached
3825        let preview = client
3826            .get_room_preview(room_id.into(), Vec::new())
3827            .await
3828            .expect("Room preview should be retrieved");
3829
3830        assert_eq!(knocked_room.room_id().to_owned(), preview.room_id);
3831    }
3832
3833    #[async_test]
3834    async fn test_room_preview_for_joined_room_retrieves_local_room_info() {
3835        let server = MatrixMockServer::new().await;
3836        let client = server.client_builder().build().await;
3837
3838        let room_id = room_id!("!a-room:matrix.org");
3839
3840        // Make sure the summary endpoint is not called
3841        server.mock_room_summary().ok(room_id).never().mount().await;
3842
3843        // We create a locally cached joined room
3844        let joined_room = client.inner.base_client.get_or_create_room(room_id, RoomState::Joined);
3845
3846        // And we get a preview, no server endpoint was reached
3847        let preview = client
3848            .get_room_preview(room_id.into(), Vec::new())
3849            .await
3850            .expect("Room preview should be retrieved");
3851
3852        assert_eq!(joined_room.room_id().to_owned(), preview.room_id);
3853    }
3854
3855    #[async_test]
3856    async fn test_media_preview_config() {
3857        let server = MatrixMockServer::new().await;
3858        let client = server.client_builder().build().await;
3859
3860        server
3861            .mock_sync()
3862            .ok_and_run(&client, |builder| {
3863                builder.add_custom_global_account_data(json!({
3864                    "content": {
3865                        "media_previews": "private",
3866                        "invite_avatars": "off"
3867                    },
3868                    "type": "m.media_preview_config"
3869                }));
3870            })
3871            .await;
3872
3873        let (initial_value, stream) =
3874            client.account().observe_media_preview_config().await.unwrap();
3875
3876        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3877        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3878        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3879        pin_mut!(stream);
3880        assert_pending!(stream);
3881
3882        server
3883            .mock_sync()
3884            .ok_and_run(&client, |builder| {
3885                builder.add_custom_global_account_data(json!({
3886                    "content": {
3887                        "media_previews": "off",
3888                        "invite_avatars": "on"
3889                    },
3890                    "type": "m.media_preview_config"
3891                }));
3892            })
3893            .await;
3894
3895        assert_next_matches!(
3896            stream,
3897            MediaPreviewConfigEventContent {
3898                media_previews: Some(MediaPreviews::Off),
3899                invite_avatars: Some(InviteAvatars::On),
3900                ..
3901            }
3902        );
3903        assert_pending!(stream);
3904    }
3905
3906    #[async_test]
3907    async fn test_unstable_media_preview_config() {
3908        let server = MatrixMockServer::new().await;
3909        let client = server.client_builder().build().await;
3910
3911        server
3912            .mock_sync()
3913            .ok_and_run(&client, |builder| {
3914                builder.add_custom_global_account_data(json!({
3915                    "content": {
3916                        "media_previews": "private",
3917                        "invite_avatars": "off"
3918                    },
3919                    "type": "io.element.msc4278.media_preview_config"
3920                }));
3921            })
3922            .await;
3923
3924        let (initial_value, stream) =
3925            client.account().observe_media_preview_config().await.unwrap();
3926
3927        let initial_value: MediaPreviewConfigEventContent = initial_value.unwrap();
3928        assert_eq!(initial_value.invite_avatars, Some(InviteAvatars::Off));
3929        assert_eq!(initial_value.media_previews, Some(MediaPreviews::Private));
3930        pin_mut!(stream);
3931        assert_pending!(stream);
3932
3933        server
3934            .mock_sync()
3935            .ok_and_run(&client, |builder| {
3936                builder.add_custom_global_account_data(json!({
3937                    "content": {
3938                        "media_previews": "off",
3939                        "invite_avatars": "on"
3940                    },
3941                    "type": "io.element.msc4278.media_preview_config"
3942                }));
3943            })
3944            .await;
3945
3946        assert_next_matches!(
3947            stream,
3948            MediaPreviewConfigEventContent {
3949                media_previews: Some(MediaPreviews::Off),
3950                invite_avatars: Some(InviteAvatars::On),
3951                ..
3952            }
3953        );
3954        assert_pending!(stream);
3955    }
3956
3957    #[async_test]
3958    async fn test_media_preview_config_not_found() {
3959        let server = MatrixMockServer::new().await;
3960        let client = server.client_builder().build().await;
3961
3962        let (initial_value, _) = client.account().observe_media_preview_config().await.unwrap();
3963
3964        assert!(initial_value.is_none());
3965    }
3966
3967    #[async_test]
3968    async fn test_load_or_fetch_max_upload_size_with_auth_matrix_version() {
3969        // The default Matrix version we use is 1.11 or higher, so authenticated media
3970        // is supported.
3971        let server = MatrixMockServer::new().await;
3972        let client = server.client_builder().build().await;
3973
3974        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
3975
3976        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
3977        client.load_or_fetch_max_upload_size().await.unwrap();
3978
3979        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
3980    }
3981
3982    #[async_test]
3983    async fn test_load_or_fetch_max_upload_size_with_auth_stable_feature() {
3984        // The server must advertise support for the stable feature for authenticated
3985        // media support, so we mock the `GET /versions` response.
3986        let server = MatrixMockServer::new().await;
3987        let client = server.client_builder().no_server_versions().build().await;
3988
3989        server
3990            .mock_versions()
3991            .ok_custom(
3992                &["v1.7", "v1.8", "v1.9", "v1.10"],
3993                &[("org.matrix.msc3916.stable", true)].into(),
3994            )
3995            .named("versions")
3996            .expect(1)
3997            .mount()
3998            .await;
3999
4000        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4001
4002        server.mock_authenticated_media_config().ok(uint!(2)).mock_once().mount().await;
4003        client.load_or_fetch_max_upload_size().await.unwrap();
4004
4005        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4006    }
4007
4008    #[async_test]
4009    async fn test_load_or_fetch_max_upload_size_no_auth() {
4010        // The server must not support Matrix 1.11 or higher for unauthenticated
4011        // media requests, so we mock the `GET /versions` response.
4012        let server = MatrixMockServer::new().await;
4013        let client = server.client_builder().no_server_versions().build().await;
4014
4015        server
4016            .mock_versions()
4017            .ok_custom(&["v1.1"], &Default::default())
4018            .named("versions")
4019            .expect(1)
4020            .mount()
4021            .await;
4022
4023        assert!(!client.inner.server_max_upload_size.lock().await.initialized());
4024
4025        server.mock_media_config().ok(uint!(2)).mock_once().mount().await;
4026        client.load_or_fetch_max_upload_size().await.unwrap();
4027
4028        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(2));
4029    }
4030
4031    #[async_test]
4032    async fn test_uploading_a_too_large_media_file() {
4033        let server = MatrixMockServer::new().await;
4034        let client = server.client_builder().build().await;
4035
4036        server.mock_authenticated_media_config().ok(uint!(1)).mock_once().mount().await;
4037        client.load_or_fetch_max_upload_size().await.unwrap();
4038        assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
4039
4040        let data = vec![1, 2];
4041        let upload_request =
4042            ruma::api::client::media::create_content::v3::Request::new(data.clone());
4043        let request = SendRequest {
4044            client: client.clone(),
4045            request: upload_request,
4046            config: None,
4047            send_progress: SharedObservable::new(TransmissionProgress::default()),
4048        };
4049        let media_request = SendMediaUploadRequest::new(request);
4050
4051        let error = media_request.await.err();
4052        assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);
4053        assert_eq!(max, uint!(1));
4054        assert_eq!(current, UInt::new_wrapping(data.len() as u64));
4055    }
4056
4057    #[async_test]
4058    async fn test_dont_ignore_timeout_on_first_sync() {
4059        let server = MatrixMockServer::new().await;
4060        let client = server.client_builder().build().await;
4061
4062        server
4063            .mock_sync()
4064            .timeout(Some(Duration::from_secs(30)))
4065            .ok(|_| {})
4066            .mock_once()
4067            .named("sync_with_timeout")
4068            .mount()
4069            .await;
4070
4071        // Call the endpoint once to check the timeout.
4072        let mut stream = Box::pin(client.sync_stream(SyncSettings::new()).await);
4073
4074        timeout(Duration::from_secs(1), async {
4075            stream.next().await.unwrap().unwrap();
4076        })
4077        .await
4078        .unwrap();
4079    }
4080
4081    #[async_test]
4082    async fn test_ignore_timeout_on_first_sync() {
4083        let server = MatrixMockServer::new().await;
4084        let client = server.client_builder().build().await;
4085
4086        server
4087            .mock_sync()
4088            .timeout(None)
4089            .ok(|_| {})
4090            .mock_once()
4091            .named("sync_no_timeout")
4092            .mount()
4093            .await;
4094        server
4095            .mock_sync()
4096            .timeout(Some(Duration::from_secs(30)))
4097            .ok(|_| {})
4098            .mock_once()
4099            .named("sync_with_timeout")
4100            .mount()
4101            .await;
4102
4103        // Call each version of the endpoint once to check the timeouts.
4104        let mut stream = Box::pin(
4105            client.sync_stream(SyncSettings::new().ignore_timeout_on_first_sync(true)).await,
4106        );
4107
4108        timeout(Duration::from_secs(1), async {
4109            stream.next().await.unwrap().unwrap();
4110            stream.next().await.unwrap().unwrap();
4111        })
4112        .await
4113        .unwrap();
4114    }
4115}